Implement CordInputStream in terms of ChunkIterator
PiperOrigin-RevId: 492051638
diff --git a/src/google/protobuf/io/zero_copy_stream.cc b/src/google/protobuf/io/zero_copy_stream.cc
index 83b7225..0e3ae16 100644
--- a/src/google/protobuf/io/zero_copy_stream.cc
+++ b/src/google/protobuf/io/zero_copy_stream.cc
@@ -34,13 +34,76 @@
#include "google/protobuf/io/zero_copy_stream.h"
+#include <utility>
+
#include "google/protobuf/stubs/logging.h"
#include "google/protobuf/stubs/common.h"
+#include "absl/strings/cord.h"
+#include "absl/strings/cord_buffer.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+
+// Must be included last.
+#include "google/protobuf/port_def.inc"
namespace google {
namespace protobuf {
namespace io {
+// Default ReadCord() implementation: appends up to `count` bytes read from
+// this stream to `*cord`, copying the data out of the buffers that Next()
+// returns.
+//
+// Returns true once `count` bytes have been appended, or immediately when
+// `count` is not positive. Returns false if Next() fails before `count` bytes
+// were read; the bytes copied up to that point are still appended to `*cord`.
+bool ZeroCopyInputStream::ReadCord(absl::Cord* cord, int count) {
+  if (count <= 0) return true;
+
+  // Ask the cord for an append buffer sized for `count` bytes. `out` always
+  // tracks the writable space still left in `cord_buffer`.
+  absl::CordBuffer cord_buffer = cord->GetAppendBuffer(count);
+  absl::Span<char> out = cord_buffer.available_up_to(count);
+
+  // Fetches the next non-empty chunk from the stream, trimmed to at most
+  // `count` bytes; any excess is handed back to the stream with BackUp().
+  // An empty span is returned only when Next() reports failure.
+  auto FetchNextChunk = [&]() -> absl::Span<const char> {
+    const void* buffer;
+    int size;
+    // Next() is allowed to succeed with a zero-sized buffer, so keep asking
+    // until it either fails or produces data. Without this, an empty buffer
+    // would be mistaken for end of stream by the caller below.
+    do {
+      if (!Next(&buffer, &size)) return {};
+    } while (size == 0);
+
+    if (size > count) {
+      BackUp(size - count);
+      size = count;
+    }
+    return absl::MakeConstSpan(static_cast<const char*>(buffer), size);
+  };
+
+  // Commits the current buffer to the cord and starts a fresh one sized for
+  // the bytes still outstanding. Returns the new buffer's writable space.
+  auto AppendFullBuffer = [&]() -> absl::Span<char> {
+    cord->Append(std::move(cord_buffer));
+    cord_buffer = absl::CordBuffer::CreateWithDefaultLimit(count);
+    return cord_buffer.available_up_to(count);
+  };
+
+  // Copies `bytes` bytes from the front of `src` to the front of `dst`, then
+  // advances both spans and updates the remaining count and buffer length.
+  auto CopyBytes = [&](absl::Span<char>& dst, absl::Span<const char>& src,
+                       size_t bytes) {
+    memcpy(dst.data(), src.data(), bytes);
+    dst.remove_prefix(bytes);
+    src.remove_prefix(bytes);
+    count -= static_cast<int>(bytes);
+    cord_buffer.IncreaseLengthBy(bytes);
+  };
+
+  do {
+    absl::Span<const char> in = FetchNextChunk();
+    if (in.empty()) {
+      // Append whatever we have pending so far.
+      cord->Append(std::move(cord_buffer));
+      return false;
+    }
+
+    if (out.empty()) out = AppendFullBuffer();
+
+    // The chunk does not fit: fill the current buffer, commit it, and carry
+    // on with a new one until the remainder fits.
+    while (in.size() > out.size()) {
+      CopyBytes(out, in, out.size());
+      out = AppendFullBuffer();
+    }
+
+    CopyBytes(out, in, in.size());
+  } while (count > 0);
+
+  cord->Append(std::move(cord_buffer));
+  return true;
+}
bool ZeroCopyOutputStream::WriteAliasedRaw(const void* /* data */,
int /* size */) {
diff --git a/src/google/protobuf/io/zero_copy_stream_unittest.cc b/src/google/protobuf/io/zero_copy_stream_unittest.cc
index 4b7d3fc..bfc4e01 100644
--- a/src/google/protobuf/io/zero_copy_stream_unittest.cc
+++ b/src/google/protobuf/io/zero_copy_stream_unittest.cc
@@ -46,6 +46,7 @@
// "parametized tests" so that one set of tests can be used on all the
// implementations.
+#include <algorithm>
#include <chrono>
#include <thread>
@@ -62,6 +63,7 @@
#include <memory>
#include <sstream>
#include <utility>
+#include <vector>
#include "google/protobuf/testing/file.h"
#include "google/protobuf/io/coded_stream.h"
@@ -79,6 +81,9 @@
#include "google/protobuf/testing/googletest.h"
#include <gtest/gtest.h>
+// Must be included last.
+#include "google/protobuf/port_def.inc"
+
namespace google {
namespace protobuf {
namespace io {
@@ -737,6 +742,89 @@
#endif // THREAD_SANITIZER
}
+// Reads the middle of a short string: one byte is skipped up front and the
+// read stops one byte short of the end.
+TEST(DefaultReadCordTest, ReadSmallCord) {
+  const std::string text = "hello world";
+  ArrayInputStream stream(text.data(), text.size());
+  EXPECT_TRUE(stream.Skip(1));
+
+  absl::Cord result;
+  const bool ok = stream.ReadCord(&result, text.size() - 2);
+  EXPECT_TRUE(ok);
+
+  EXPECT_EQ(result, "ello worl");
+}
+
+// Verifies that ReadCord() starts at the position left by a preceding
+// Next() + BackUp() pair, here one byte into the source.
+TEST(DefaultReadCordTest, ReadSmallCordAfterBackUp) {
+  std::string source = "hello world";
+  ArrayInputStream input(source.data(), source.size());
+
+  absl::Cord dest;
+  const void* buffer = nullptr;
+  int size = 0;
+  // Fatal assertion: `size` is only meaningful when Next() succeeded, and it
+  // is used by the BackUp() call right below.
+  ASSERT_TRUE(input.Next(&buffer, &size));
+  input.BackUp(size - 1);
+
+  EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));
+
+  EXPECT_EQ(dest, "ello worl");
+}
+
+// Same as the small-cord case, but with a source made of 1025 repetitions of
+// "hello world" so the read covers a much larger span.
+TEST(DefaultReadCordTest, ReadLargeCord) {
+  std::string text;
+  for (int rep = 0; rep <= 1024; ++rep) {
+    text += "hello world";
+  }
+
+  ArrayInputStream stream(text.data(), text.size());
+  absl::Cord result;
+  EXPECT_TRUE(stream.Skip(1));
+  EXPECT_TRUE(stream.ReadCord(&result, text.size() - 2));
+
+  // Expected content is the source minus its first and last byte.
+  absl::Cord want(text);
+  want.RemoveSuffix(1);
+  want.RemovePrefix(1);
+
+  EXPECT_EQ(want, result);
+}
+
+// Large-source variant of the BackUp() test. After the read, exactly the
+// final byte of the source must remain available from the stream.
+TEST(DefaultReadCordTest, ReadLargeCordAfterBackup) {
+  std::string source = "hello world";
+  for (int i = 0; i < 1024; i++) {
+    source.append("hello world");
+  }
+
+  absl::Cord dest;
+  ArrayInputStream input(source.data(), source.size());
+
+  const void* buffer = nullptr;
+  int size = 0;
+  // Fatal assertion: `size` is only meaningful when Next() succeeded, and it
+  // is used by the BackUp() call right below.
+  ASSERT_TRUE(input.Next(&buffer, &size));
+  input.BackUp(size - 1);
+
+  EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));
+
+  // Expected content is the source minus its first and last byte.
+  absl::Cord expected(source);
+  expected.RemovePrefix(1);
+  expected.RemoveSuffix(1);
+
+  EXPECT_EQ(expected, dest);
+
+  // Fatal assertion again: `buffer` and `size` are read on the next line.
+  ASSERT_TRUE(input.Next(&buffer, &size));
+  EXPECT_EQ("d", std::string(static_cast<const char*>(buffer), size));
+}
+
+// Requests more bytes than remain after the skip: ReadCord() must return
+// false, yet `dest` must still hold everything that could be read.
+TEST(DefaultReadCordTest, ReadCordEof) {
+  std::string source = "hello world";
+
+  absl::Cord dest;
+  ArrayInputStream input(source.data(), source.size());
+  // Check the skip like the sibling tests do; a failed skip would otherwise
+  // only show up as a confusing mismatch further down.
+  EXPECT_TRUE(input.Skip(1));
+  EXPECT_FALSE(input.ReadCord(&dest, source.size()));
+
+  // Expected content is the source minus the skipped first byte.
+  absl::Cord expected(source);
+  expected.RemovePrefix(1);
+  EXPECT_EQ(expected, dest);
+}
+
// To test files, we create a temporary file, write, read, truncate, repeat.
TEST_F(IoTest, FileIo) {