Implement CordInputStream in terms of ChunkIterator

PiperOrigin-RevId: 492051638
diff --git a/src/google/protobuf/io/zero_copy_stream.cc b/src/google/protobuf/io/zero_copy_stream.cc
index 83b7225..0e3ae16 100644
--- a/src/google/protobuf/io/zero_copy_stream.cc
+++ b/src/google/protobuf/io/zero_copy_stream.cc
@@ -34,13 +34,76 @@
 
 #include "google/protobuf/io/zero_copy_stream.h"
 
+#include <utility>
+
 #include "google/protobuf/stubs/logging.h"
 #include "google/protobuf/stubs/common.h"
+#include "absl/strings/cord.h"
+#include "absl/strings/cord_buffer.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+
+// Must be included last.
+#include "google/protobuf/port_def.inc"
 
 namespace google {
 namespace protobuf {
 namespace io {
 
+bool ZeroCopyInputStream::ReadCord(absl::Cord* cord, int count) {
+  if (count <= 0) return true;
+
+  absl::CordBuffer cord_buffer = cord->GetAppendBuffer(count);
+  absl::Span<char> out = cord_buffer.available_up_to(count);
+
+  auto FetchNextChunk = [&]() -> absl::Span<const char> {
+    const void* buffer;
+    int size;
+    if (!Next(&buffer, &size)) return {};
+
+    if (size > count) {
+      BackUp(size - count);
+      size = count;
+    }
+    return absl::MakeConstSpan(static_cast<const char*>(buffer), size);
+  };
+
+  auto AppendFullBuffer = [&]() -> absl::Span<char> {
+    cord->Append(std::move(cord_buffer));
+    cord_buffer = absl::CordBuffer::CreateWithDefaultLimit(count);
+    return cord_buffer.available_up_to(count);
+  };
+
+  auto CopyBytes = [&](absl::Span<char>& dst, absl::Span<const char>& src,
+                       size_t bytes) {
+    memcpy(dst.data(), src.data(), bytes);
+    dst.remove_prefix(bytes);
+    src.remove_prefix(bytes);
+    count -= bytes;
+    cord_buffer.IncreaseLengthBy(bytes);
+  };
+
+  do {
+    absl::Span<const char> in = FetchNextChunk();
+    if (in.empty()) {
+      // Append whatever we have pending so far.
+      cord->Append(std::move(cord_buffer));
+      return false;
+    }
+
+    if (out.empty()) out = AppendFullBuffer();
+
+    while (in.size() > out.size()) {
+      CopyBytes(out, in, out.size());
+      out = AppendFullBuffer();
+    }
+
+    CopyBytes(out, in, in.size());
+  } while (count > 0);
+
+  cord->Append(std::move(cord_buffer));
+  return true;
+}
 
 bool ZeroCopyOutputStream::WriteAliasedRaw(const void* /* data */,
                                            int /* size */) {
diff --git a/src/google/protobuf/io/zero_copy_stream_unittest.cc b/src/google/protobuf/io/zero_copy_stream_unittest.cc
index 4b7d3fc..bfc4e01 100644
--- a/src/google/protobuf/io/zero_copy_stream_unittest.cc
+++ b/src/google/protobuf/io/zero_copy_stream_unittest.cc
@@ -46,6 +46,7 @@
 //   "parametized tests" so that one set of tests can be used on all the
 //   implementations.
 
+#include <algorithm>
 #include <chrono>
 #include <thread>
 
@@ -62,6 +63,7 @@
 #include <memory>
 #include <sstream>
 #include <utility>
+#include <vector>
 
 #include "google/protobuf/testing/file.h"
 #include "google/protobuf/io/coded_stream.h"
@@ -79,6 +81,9 @@
 #include "google/protobuf/testing/googletest.h"
 #include <gtest/gtest.h>
 
+// Must be included last.
+#include "google/protobuf/port_def.inc"
+
 namespace google {
 namespace protobuf {
 namespace io {
@@ -737,6 +742,89 @@
 #endif  // THREAD_SANITIZER
 }
 
+TEST(DefaultReadCordTest, ReadSmallCord) {
+  std::string source = "hello world";
+  ArrayInputStream input(source.data(), source.size());
+
+  absl::Cord dest;
+  EXPECT_TRUE(input.Skip(1));
+  EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));
+
+  EXPECT_EQ(dest, "ello worl");
+}
+
+TEST(DefaultReadCordTest, ReadSmallCordAfterBackUp) {
+  std::string source = "hello world";
+  ArrayInputStream input(source.data(), source.size());
+
+  absl::Cord dest;
+  const void* buffer;
+  int size;
+  EXPECT_TRUE(input.Next(&buffer, &size));
+  input.BackUp(size - 1);
+
+  EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));
+
+  EXPECT_EQ(dest, "ello worl");
+}
+
+TEST(DefaultReadCordTest, ReadLargeCord) {
+  std::string source = "hello world";
+  for (int i = 0; i < 1024; i++) {
+    source.append("hello world");
+  }
+
+  absl::Cord dest;
+  ArrayInputStream input(source.data(), source.size());
+  EXPECT_TRUE(input.Skip(1));
+  EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));
+
+  absl::Cord expected(source);
+  expected.RemovePrefix(1);
+  expected.RemoveSuffix(1);
+
+  EXPECT_EQ(expected, dest);
+}
+
+TEST(DefaultReadCordTest, ReadLargeCordAfterBackup) {
+  std::string source = "hello world";
+  for (int i = 0; i < 1024; i++) {
+    source.append("hello world");
+  }
+
+  absl::Cord dest;
+  ArrayInputStream input(source.data(), source.size());
+
+  const void* buffer;
+  int size;
+  EXPECT_TRUE(input.Next(&buffer, &size));
+  input.BackUp(size - 1);
+
+  EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));
+
+  absl::Cord expected(source);
+  expected.RemovePrefix(1);
+  expected.RemoveSuffix(1);
+
+  EXPECT_EQ(expected, dest);
+
+  EXPECT_TRUE(input.Next(&buffer, &size));
+  EXPECT_EQ("d", std::string(reinterpret_cast<const char*>(buffer), size));
+}
+
+TEST(DefaultReadCordTest, ReadCordEof) {
+  std::string source = "hello world";
+
+  absl::Cord dest;
+  ArrayInputStream input(source.data(), source.size());
+  input.Skip(1);
+  EXPECT_FALSE(input.ReadCord(&dest, source.size()));
+
+  absl::Cord expected(source);
+  expected.RemovePrefix(1);
+  EXPECT_EQ(expected, dest);
+}
+
 
 // To test files, we create a temporary file, write, read, truncate, repeat.
 TEST_F(IoTest, FileIo) {