Implement a MOJO_READ_DATA_FLAG_PEEK flag for data pipes.

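MOJO_READ_DATA_FLAG_PEEK allows the user to read data from a pipe
without removing it; the same data remains available to subsequent
reads. The flag is valid only with MojoReadData(): it is mutually
exclusive with MOJO_READ_DATA_FLAG_DISCARD and
MOJO_READ_DATA_FLAG_QUERY, and two-phase reads (MojoBeginReadData())
reject it with MOJO_RESULT_INVALID_ARGUMENT.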

R=viettrungluu@chromium.org

Review URL: https://codereview.chromium.org/703573002
diff --git a/mojo/edk/system/core_unittest.cc b/mojo/edk/system/core_unittest.cc
index 9adde19..e6607df 100644
--- a/mojo/edk/system/core_unittest.cc
+++ b/mojo/edk/system/core_unittest.cc
@@ -1084,6 +1084,19 @@
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
 
+  // Peek one character.
+  elements[0] = -1;
+  elements[1] = -1;
+  num_bytes = 1u;
+  EXPECT_EQ(MOJO_RESULT_OK,
+            core()->ReadData(
+                ch,
+                UserPointer<void>(elements),
+                MakeUserPointer(&num_bytes),
+                MOJO_READ_DATA_FLAG_NONE | MOJO_READ_DATA_FLAG_PEEK));
+  EXPECT_EQ('A', elements[0]);
+  EXPECT_EQ(-1, elements[1]);
+
   // Read one character.
   elements[0] = -1;
   elements[1] = -1;
@@ -1131,6 +1144,16 @@
                              MOJO_READ_DATA_FLAG_QUERY));
   EXPECT_EQ(4u, num_bytes);
 
+  // Try to query with peek. Should fail.
+  num_bytes = 0;
+  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            core()->ReadData(
+                ch,
+                NullUserPointer(),
+                MakeUserPointer(&num_bytes),
+                MOJO_READ_DATA_FLAG_QUERY | MOJO_READ_DATA_FLAG_PEEK));
+  EXPECT_EQ(0u, num_bytes);
+
   // Try to discard ten characters, in all-or-none mode. Should fail.
   num_bytes = 10;
   EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
@@ -1140,6 +1163,15 @@
                 MakeUserPointer(&num_bytes),
                 MOJO_READ_DATA_FLAG_DISCARD | MOJO_READ_DATA_FLAG_ALL_OR_NONE));
 
+  // Try to discard two characters, in peek mode. Should fail.
+  num_bytes = 2;
+  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            core()->ReadData(
+                ch,
+                NullUserPointer(),
+                MakeUserPointer(&num_bytes),
+                MOJO_READ_DATA_FLAG_DISCARD | MOJO_READ_DATA_FLAG_PEEK));
+
   // Discard two characters.
   num_bytes = 2;
   EXPECT_EQ(MOJO_RESULT_OK,
@@ -1149,9 +1181,17 @@
                 MakeUserPointer(&num_bytes),
                 MOJO_READ_DATA_FLAG_DISCARD | MOJO_READ_DATA_FLAG_ALL_OR_NONE));
 
-  // Read the remaining two characters, in two-phase mode (all-or-none).
+  // Try a two-phase read of the remaining two bytes with peek. Should fail.
   const void* read_ptr = nullptr;
   num_bytes = 2;
+  ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            core()->BeginReadData(ch,
+                                  MakeUserPointer(&read_ptr),
+                                  MakeUserPointer(&num_bytes),
+                                  MOJO_READ_DATA_FLAG_PEEK));
+
+  // Read the remaining two characters, in two-phase mode (all-or-none).
+  num_bytes = 2;
   ASSERT_EQ(MOJO_RESULT_OK,
             core()->BeginReadData(ch,
                                   MakeUserPointer(&read_ptr),
diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc
index e9c4903..e9525bc 100644
--- a/mojo/edk/system/data_pipe.cc
+++ b/mojo/edk/system/data_pipe.cc
@@ -260,7 +260,8 @@
 
 MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements,
                                       UserPointer<uint32_t> num_bytes,
-                                      bool all_or_none) {
+                                      bool all_or_none,
+                                      bool peek) {
   base::AutoLock locker(lock_);
   DCHECK(has_local_consumer_no_lock());
 
@@ -279,7 +280,7 @@
   HandleSignalsState old_producer_state =
       ProducerGetHandleSignalsStateImplNoLock();
   MojoResult rv = ConsumerReadDataImplNoLock(
-      elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read);
+      elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek);
   HandleSignalsState new_producer_state =
       ProducerGetHandleSignalsStateImplNoLock();
   if (!new_producer_state.equals(old_producer_state))
diff --git a/mojo/edk/system/data_pipe.h b/mojo/edk/system/data_pipe.h
index 64b18e2..d4afdda 100644
--- a/mojo/edk/system/data_pipe.h
+++ b/mojo/edk/system/data_pipe.h
@@ -73,7 +73,8 @@
   // a multiple of |element_num_bytes_|.
   MojoResult ConsumerReadData(UserPointer<void> elements,
                               UserPointer<uint32_t> num_bytes,
-                              bool all_or_none);
+                              bool all_or_none,
+                              bool peek);
   MojoResult ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
                                  bool all_or_none);
   MojoResult ConsumerQueryData(UserPointer<uint32_t> num_bytes);
@@ -120,7 +121,8 @@
       UserPointer<void> elements,
       UserPointer<uint32_t> num_bytes,
       uint32_t max_num_bytes_to_read,
-      uint32_t min_num_bytes_to_read) = 0;
+      uint32_t min_num_bytes_to_read,
+      bool peek) = 0;
   virtual MojoResult ConsumerDiscardDataImplNoLock(
       UserPointer<uint32_t> num_bytes,
       uint32_t max_num_bytes_to_discard,
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
index 2b3f514..f2d4496 100644
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
@@ -58,7 +58,8 @@
 
   if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
     // These flags are mutally exclusive.
-    if ((flags & MOJO_READ_DATA_FLAG_QUERY))
+    if ((flags & MOJO_READ_DATA_FLAG_QUERY) ||
+        (flags & MOJO_READ_DATA_FLAG_PEEK))
       return MOJO_RESULT_INVALID_ARGUMENT;
     DVLOG_IF(2, !elements.IsNull())
         << "Discard mode: ignoring non-null |elements|";
@@ -67,6 +68,8 @@
   }
 
   if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
+    if ((flags & MOJO_READ_DATA_FLAG_PEEK))
+      return MOJO_RESULT_INVALID_ARGUMENT;
     DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD));  // Handled above.
     DVLOG_IF(2, !elements.IsNull())
         << "Query mode: ignoring non-null |elements|";
@@ -74,7 +77,10 @@
   }
 
   return data_pipe_->ConsumerReadData(
-      elements, num_bytes, (flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE));
+      elements,
+      num_bytes,
+      (flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE),
+      (flags & MOJO_READ_DATA_FLAG_PEEK));
 }
 
 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
@@ -85,7 +91,8 @@
 
   // These flags may not be used in two-phase mode.
   if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
-      (flags & MOJO_READ_DATA_FLAG_QUERY))
+      (flags & MOJO_READ_DATA_FLAG_QUERY) ||
+      (flags & MOJO_READ_DATA_FLAG_PEEK))
     return MOJO_RESULT_INVALID_ARGUMENT;
 
   return data_pipe_->ConsumerBeginReadData(
diff --git a/mojo/edk/system/local_data_pipe.cc b/mojo/edk/system/local_data_pipe.cc
index 8ee696c..fdfaf28 100644
--- a/mojo/edk/system/local_data_pipe.cc
+++ b/mojo/edk/system/local_data_pipe.cc
@@ -177,7 +177,8 @@
     UserPointer<void> elements,
     UserPointer<uint32_t> num_bytes,
     uint32_t max_num_bytes_to_read,
-    uint32_t min_num_bytes_to_read) {
+    uint32_t min_num_bytes_to_read,
+    bool peek) {
   DCHECK_EQ(max_num_bytes_to_read % element_num_bytes(), 0u);
   DCHECK_EQ(min_num_bytes_to_read % element_num_bytes(), 0u);
   DCHECK_GT(max_num_bytes_to_read, 0u);
@@ -207,7 +208,8 @@
         .PutArray(buffer_.get(), num_bytes_to_read - num_bytes_to_read_first);
   }
 
-  MarkDataAsConsumedNoLock(num_bytes_to_read);
+  if (!peek)
+    MarkDataAsConsumedNoLock(num_bytes_to_read);
   num_bytes.Put(static_cast<uint32_t>(num_bytes_to_read));
   return MOJO_RESULT_OK;
 }
diff --git a/mojo/edk/system/local_data_pipe.h b/mojo/edk/system/local_data_pipe.h
index 8a46c52..c98cb7e 100644
--- a/mojo/edk/system/local_data_pipe.h
+++ b/mojo/edk/system/local_data_pipe.h
@@ -48,7 +48,8 @@
       UserPointer<void> elements,
       UserPointer<uint32_t> num_bytes,
       uint32_t max_num_bytes_to_read,
-      uint32_t min_num_bytes_to_read) override;
+      uint32_t min_num_bytes_to_read,
+      bool peek) override;
   MojoResult ConsumerDiscardDataImplNoLock(
       UserPointer<uint32_t> num_bytes,
       uint32_t max_num_bytes_to_discard,
diff --git a/mojo/edk/system/local_data_pipe_unittest.cc b/mojo/edk/system/local_data_pipe_unittest.cc
index 979b4d2..e8bc716 100644
--- a/mojo/edk/system/local_data_pipe_unittest.cc
+++ b/mojo/edk/system/local_data_pipe_unittest.cc
@@ -118,8 +118,10 @@
   num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
   EXPECT_EQ(
       MOJO_RESULT_SHOULD_WAIT,
-      dp->ConsumerReadData(
-          UserPointer<void>(elements), MakeUserPointer(&num_bytes), false));
+      dp->ConsumerReadData(UserPointer<void>(elements),
+                           MakeUserPointer(&num_bytes),
+                           false,
+                           false));
 
   // Query; nothing there yet.
   num_bytes = 0;
@@ -135,8 +137,10 @@
   num_bytes = sizeof(elements[0]) + 1;
   EXPECT_EQ(
       MOJO_RESULT_INVALID_ARGUMENT,
-      dp->ConsumerReadData(
-          UserPointer<void>(elements), MakeUserPointer(&num_bytes), false));
+      dp->ConsumerReadData(UserPointer<void>(elements),
+                           MakeUserPointer(&num_bytes),
+                           false,
+                           false));
 
   // Write two elements.
   elements[0] = 123;
@@ -160,8 +164,10 @@
   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
   EXPECT_EQ(
       MOJO_RESULT_OK,
-      dp->ConsumerReadData(
-          UserPointer<void>(elements), MakeUserPointer(&num_bytes), false));
+      dp->ConsumerReadData(UserPointer<void>(elements),
+                           MakeUserPointer(&num_bytes),
+                           false,
+                           false));
   EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
   EXPECT_EQ(123, elements[0]);
   EXPECT_EQ(-1, elements[1]);
@@ -171,14 +177,35 @@
   EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
   EXPECT_EQ(1 * sizeof(elements[0]), num_bytes);
 
+  // Peek one element.
+  elements[0] = -1;
+  elements[1] = -1;
+  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
+  EXPECT_EQ(
+      MOJO_RESULT_OK,
+      dp->ConsumerReadData(UserPointer<void>(elements),
+                           MakeUserPointer(&num_bytes),
+                           false,
+                           true));
+  EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
+  EXPECT_EQ(456, elements[0]);
+  EXPECT_EQ(-1, elements[1]);
+
+  // Query. Still has 1 element remaining.
+  num_bytes = 0;
+  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+  EXPECT_EQ(1 * sizeof(elements[0]), num_bytes);
+
   // Try to read two elements, with "all or none".
   elements[0] = -1;
   elements[1] = -1;
   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
   EXPECT_EQ(
       MOJO_RESULT_OUT_OF_RANGE,
-      dp->ConsumerReadData(
-          UserPointer<void>(elements), MakeUserPointer(&num_bytes), true));
+      dp->ConsumerReadData(UserPointer<void>(elements),
+                           MakeUserPointer(&num_bytes),
+                           true,
+                           false));
   EXPECT_EQ(-1, elements[0]);
   EXPECT_EQ(-1, elements[1]);
 
@@ -188,8 +215,10 @@
   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
   EXPECT_EQ(
       MOJO_RESULT_OK,
-      dp->ConsumerReadData(
-          UserPointer<void>(elements), MakeUserPointer(&num_bytes), false));
+      dp->ConsumerReadData(UserPointer<void>(elements),
+                           MakeUserPointer(&num_bytes),
+                           false,
+                           false));
   EXPECT_EQ(456, elements[0]);
   EXPECT_EQ(-1, elements[1]);
 
@@ -262,6 +291,32 @@
   EXPECT_EQ(0u, hss.satisfied_signals);
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
 
+  // Peek one element.
+  elements[0] = -1;
+  elements[1] = -1;
+  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
+  EXPECT_EQ(
+      MOJO_RESULT_OK,
+      dp->ConsumerReadData(UserPointer<void>(elements),
+                           MakeUserPointer(&num_bytes),
+                           true,
+                           true));
+  EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
+  EXPECT_EQ(123, elements[0]);
+  EXPECT_EQ(-1, elements[1]);
+
+  // Add a waiter.
+  waiter.Init();
+  ASSERT_EQ(
+      MOJO_RESULT_OK,
+      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56, nullptr));
+  // And it still shouldn't be writable yet.
+  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
+  hss = HandleSignalsState();
+  dp->ProducerRemoveWaiter(&waiter, &hss);
+  EXPECT_EQ(0u, hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
+
   // Do it again.
   waiter.Init();
   ASSERT_EQ(
@@ -274,8 +329,10 @@
   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
   EXPECT_EQ(
       MOJO_RESULT_OK,
-      dp->ConsumerReadData(
-          UserPointer<void>(elements), MakeUserPointer(&num_bytes), true));
+      dp->ConsumerReadData(UserPointer<void>(elements),
+                           MakeUserPointer(&num_bytes),
+                           true,
+                           false));
   EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
   EXPECT_EQ(123, elements[0]);
   EXPECT_EQ(-1, elements[1]);
@@ -432,14 +489,39 @@
     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
 
+    // Peek one element.
+    elements[0] = -1;
+    elements[1] = -1;
+    num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
+    EXPECT_EQ(
+        MOJO_RESULT_OK,
+        dp->ConsumerReadData(UserPointer<void>(elements),
+                             MakeUserPointer(&num_bytes),
+                             true,
+                             true));
+    EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
+    EXPECT_EQ(456, elements[0]);
+    EXPECT_EQ(-1, elements[1]);
+
+    // Should still be readable.
+    waiter.Init();
+    hss = HandleSignalsState();
+    EXPECT_EQ(
+        MOJO_RESULT_ALREADY_EXISTS,
+        dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, &hss));
+    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
+    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
+
     // Read one element.
     elements[0] = -1;
     elements[1] = -1;
     num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
     EXPECT_EQ(
         MOJO_RESULT_OK,
-        dp->ConsumerReadData(
-            UserPointer<void>(elements), MakeUserPointer(&num_bytes), true));
+        dp->ConsumerReadData(UserPointer<void>(elements),
+                             MakeUserPointer(&num_bytes),
+                             true,
+                             false));
     EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
     EXPECT_EQ(456, elements[0]);
     EXPECT_EQ(-1, elements[1]);
@@ -485,8 +567,10 @@
     num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
     EXPECT_EQ(
         MOJO_RESULT_OK,
-        dp->ConsumerReadData(
-            UserPointer<void>(elements), MakeUserPointer(&num_bytes), true));
+        dp->ConsumerReadData(UserPointer<void>(elements),
+                             MakeUserPointer(&num_bytes),
+                             true,
+                             false));
     EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
     EXPECT_EQ(789, elements[0]);
     EXPECT_EQ(-1, elements[1]);
@@ -843,8 +927,10 @@
   element = 0;
   EXPECT_EQ(
       MOJO_RESULT_OK,
-      dp->ConsumerReadData(
-          UserPointer<void>(&element), MakeUserPointer(&num_bytes), false));
+      dp->ConsumerReadData(UserPointer<void>(&element),
+                           MakeUserPointer(&num_bytes),
+                           false,
+                           false));
   EXPECT_EQ(static_cast<uint32_t>(sizeof(int32_t)), num_bytes);
   EXPECT_EQ(456, element);
 
@@ -908,8 +994,10 @@
   num_bytes = 5u * sizeof(int32_t);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OK,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 false,
+                                 false));
   EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
   int32_t expected_buffer[100];
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
@@ -933,8 +1021,10 @@
   num_bytes = 5u * sizeof(int32_t);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OK,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 false,
+                                 false));
   EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   expected_buffer[0] = 8;
@@ -972,8 +1062,10 @@
   num_bytes = sizeof(buffer);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OK,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 false,
+                                 false));
   EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   expected_buffer[0] = 104;
@@ -1062,8 +1154,10 @@
   num_bytes = sizeof(buffer);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OK,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 false,
+                                 false));
   EXPECT_EQ(8u * sizeof(int32_t), num_bytes);
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   expected_buffer[0] = 500;
@@ -1134,8 +1228,10 @@
   num_bytes = 11u * sizeof(int32_t);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 true,
+                                 false));
   int32_t expected_buffer[100];
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
@@ -1172,8 +1268,10 @@
   num_bytes = 5u * sizeof(int32_t);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OK,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 true,
+                                 false));
   EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   Seq(100, 5, expected_buffer);
@@ -1183,8 +1281,10 @@
   num_bytes = 6u * sizeof(int32_t);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 true,
+                                 false));
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
 
@@ -1211,8 +1311,10 @@
   num_bytes = 4u * sizeof(int32_t);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 true,
+                                 false));
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
 
@@ -1225,8 +1327,10 @@
   num_bytes = 2u * sizeof(int32_t);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OK,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 true,
+                                 false));
   EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   Seq(400, 2, expected_buffer);
@@ -1291,8 +1395,10 @@
   num_bytes = 1u * sizeof(int32_t);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OK,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 true,
+                                 false));
   EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
   int32_t expected_buffer[100];
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
@@ -1303,8 +1409,10 @@
   num_bytes = 10u * sizeof(int32_t);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 true,
+                                 false));
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
 
@@ -1336,8 +1444,10 @@
   num_bytes = 10u * sizeof(int32_t);
   memset(buffer, 0xab, sizeof(buffer));
   EXPECT_EQ(MOJO_RESULT_OK,
-            dp->ConsumerReadData(
-                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
+            dp->ConsumerReadData(UserPointer<void>(buffer),
+                                 MakeUserPointer(&num_bytes),
+                                 true,
+                                 false));
   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
   Seq(300, 10, expected_buffer);
@@ -1525,8 +1635,10 @@
   num_bytes = 10u;
   EXPECT_EQ(
       MOJO_RESULT_OK,
-      dp->ConsumerReadData(
-          UserPointer<void>(read_buffer), MakeUserPointer(&num_bytes), false));
+      dp->ConsumerReadData(UserPointer<void>(read_buffer),
+                           MakeUserPointer(&num_bytes),
+                           false,
+                           false));
   EXPECT_EQ(10u, num_bytes);
   EXPECT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
 
@@ -1572,8 +1684,10 @@
   memset(read_buffer, 0, num_bytes);
   EXPECT_EQ(
       MOJO_RESULT_OK,
-      dp->ConsumerReadData(
-          UserPointer<void>(read_buffer), MakeUserPointer(&num_bytes), false));
+      dp->ConsumerReadData(UserPointer<void>(read_buffer),
+                           MakeUserPointer(&num_bytes),
+                           false,
+                           false));
   EXPECT_EQ(100u, num_bytes);
   EXPECT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
 
@@ -1753,13 +1867,27 @@
     // Close the producer.
     dp->ProducerClose();
 
-    // Read that data.
+    // Peek that data.
     char buffer[1000];
     num_bytes = static_cast<uint32_t>(sizeof(buffer));
     EXPECT_EQ(
         MOJO_RESULT_OK,
-        dp->ConsumerReadData(
-            UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
+        dp->ConsumerReadData(UserPointer<void>(buffer),
+                             MakeUserPointer(&num_bytes),
+                             false,
+                             true));
+    EXPECT_EQ(kTestDataSize, num_bytes);
+    EXPECT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
+
+    // Read that data.
+    memset(buffer, 0, 1000);
+    num_bytes = static_cast<uint32_t>(sizeof(buffer));
+    EXPECT_EQ(
+        MOJO_RESULT_OK,
+        dp->ConsumerReadData(UserPointer<void>(buffer),
+                             MakeUserPointer(&num_bytes),
+                             false,
+                             false));
     EXPECT_EQ(kTestDataSize, num_bytes);
     EXPECT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
 
@@ -1767,8 +1895,10 @@
     num_bytes = static_cast<uint32_t>(sizeof(buffer));
     EXPECT_EQ(
         MOJO_RESULT_FAILED_PRECONDITION,
-        dp->ConsumerReadData(
-            UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
+        dp->ConsumerReadData(UserPointer<void>(buffer),
+                             MakeUserPointer(&num_bytes),
+                             false,
+                             false));
 
     // A two-phase read should also fail.
     const void* read_buffer_ptr = nullptr;
diff --git a/mojo/public/c/system/data_pipe.h b/mojo/public/c/system/data_pipe.h
index a7498e4..089ead3 100644
--- a/mojo/public/c/system/data_pipe.h
+++ b/mojo/public/c/system/data_pipe.h
@@ -83,6 +83,9 @@
 //       read. For use with |MojoReadData()| only. Mutually exclusive with
 //       |MOJO_READ_DATA_FLAG_DISCARD| and |MOJO_READ_DATA_FLAG_ALL_OR_NONE| is
 //       ignored if this flag is set.
+//   |MOJO_READ_DATA_FLAG_PEEK| - Read elements without removing them. For use
+//       with |MojoReadData()| only. Mutually exclusive with
+//       |MOJO_READ_DATA_FLAG_DISCARD| and |MOJO_READ_DATA_FLAG_QUERY|.
 
 typedef uint32_t MojoReadDataFlags;
 
@@ -91,11 +94,13 @@
 const MojoReadDataFlags MOJO_READ_DATA_FLAG_ALL_OR_NONE = 1 << 0;
 const MojoReadDataFlags MOJO_READ_DATA_FLAG_DISCARD = 1 << 1;
 const MojoReadDataFlags MOJO_READ_DATA_FLAG_QUERY = 1 << 2;
+const MojoReadDataFlags MOJO_READ_DATA_FLAG_PEEK = 1 << 3;
 #else
 #define MOJO_READ_DATA_FLAG_NONE ((MojoReadDataFlags)0)
 #define MOJO_READ_DATA_FLAG_ALL_OR_NONE ((MojoReadDataFlags)1 << 0)
 #define MOJO_READ_DATA_FLAG_DISCARD ((MojoReadDataFlags)1 << 1)
 #define MOJO_READ_DATA_FLAG_QUERY ((MojoReadDataFlags)1 << 2)
+#define MOJO_READ_DATA_FLAG_PEEK ((MojoReadDataFlags)1 << 3)
 #endif
 
 #ifdef __cplusplus
@@ -254,7 +259,9 @@
 // must be a multiple of the data pipe's element size) bytes of data to
 // |elements| and set |*num_bytes| to the amount actually read. If flags has
 // |MOJO_READ_DATA_FLAG_ALL_OR_NONE| set, it will either read exactly
-// |*num_bytes| bytes of data or none.
+// |*num_bytes| bytes of data or none. Additionally, if flags has
+// |MOJO_READ_DATA_FLAG_PEEK| set, the data read will remain in the pipe and be
+// available to future reads.
 //
 // If flags has |MOJO_READ_DATA_FLAG_DISCARD| set, it discards up to
 // |*num_bytes| (which again be a multiple of the element size) bytes of data,
@@ -300,7 +307,8 @@
 // |*buffer_num_bytes| will be at least as large as its input value, which must
 // also be a multiple of the element size (if |MOJO_READ_DATA_FLAG_ALL_OR_NONE|
 // is not set, the input value of |*buffer_num_bytes| is ignored). |flags| must
-// not have |MOJO_READ_DATA_FLAG_DISCARD| or |MOJO_READ_DATA_FLAG_QUERY| set.
+// not have |MOJO_READ_DATA_FLAG_DISCARD|, |MOJO_READ_DATA_FLAG_QUERY|, or
+// |MOJO_READ_DATA_FLAG_PEEK| set.
 //
 // During a two-phase read, |data_pipe_consumer_handle| is *not* readable.
 // E.g., if another thread tries to read from it, it will get