EDK: Implement data pipe consumer read threshold option and handle signal.

* Add methods to Core.
* ... to Dispatcher.
* ... to DataPipeConsumerDispatcher.
* ... to DataPipe.
* ... to DataPipeImpl.
* ... to LocalDataPipeImpl.
* ... to RemoteProducerDataPipeImpl.
* Add tests.
* Don't actually expose the new functions to Mojo programs yet (i.e., no
  thunks yet).
* But the new handle signal is exposed!
* Update existing tests (due to new handle signal).

R=vardhan@google.com
BUG=#442

Review URL: https://codereview.chromium.org/1856113002 .
diff --git a/mojo/edk/system/core_unittest.cc b/mojo/edk/system/core_unittest.cc
index ead68e6..0fdd5ac 100644
--- a/mojo/edk/system/core_unittest.cc
+++ b/mojo/edk/system/core_unittest.cc
@@ -22,9 +22,6 @@
 
 const MojoHandleSignalsState kEmptyMojoHandleSignalsState = {0u, 0u};
 const MojoHandleSignalsState kFullMojoHandleSignalsState = {~0u, ~0u};
-const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE |
-                                      MOJO_HANDLE_SIGNAL_WRITABLE |
-                                      MOJO_HANDLE_SIGNAL_PEER_CLOSED;
 
 using CoreTest = test::CoreTestBase;
 
@@ -659,9 +656,13 @@
                        MakeUserPointer(&result_index), MakeUserPointer(hss)));
   EXPECT_EQ(static_cast<uint32_t>(-1), result_index);
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss[0].satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss[0].satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss[0].satisfiable_signals);
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss[1].satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss[1].satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss[1].satisfiable_signals);
 
   // Try to read anyway.
   char buffer[1] = {'a'};
@@ -680,12 +681,16 @@
   EXPECT_EQ(MOJO_RESULT_OK, core()->Wait(h[0], MOJO_HANDLE_SIGNAL_WRITABLE,
                                          1000000000, MakeUserPointer(&hss[0])));
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss[0].satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss[0].satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss[0].satisfiable_signals);
   hss[0] = kEmptyMojoHandleSignalsState;
   EXPECT_EQ(MOJO_RESULT_OK, core()->Wait(h[1], MOJO_HANDLE_SIGNAL_WRITABLE,
                                          1000000000, MakeUserPointer(&hss[0])));
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss[0].satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss[0].satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss[0].satisfiable_signals);
 
   // Also check that |h[1]| is writable using |WaitMany()|.
   signals[0] = MOJO_HANDLE_SIGNAL_READABLE;
@@ -700,9 +705,13 @@
                        MakeUserPointer(hss)));
   EXPECT_EQ(1u, result_index);
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss[0].satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss[0].satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss[0].satisfiable_signals);
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss[1].satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss[1].satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss[1].satisfiable_signals);
 
   // Write to |h[1]|.
   buffer[0] = 'b';
@@ -725,9 +734,13 @@
   EXPECT_EQ(0u, result_index);
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
             hss[0].satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss[0].satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss[0].satisfiable_signals);
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss[1].satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss[1].satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss[1].satisfiable_signals);
 
   // Read from |h[0]|.
   // First, get only the size.
@@ -755,7 +768,9 @@
             core()->Wait(h[0], MOJO_HANDLE_SIGNAL_READABLE, 0,
                          MakeUserPointer(&hss[0])));
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss[0].satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss[0].satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss[0].satisfiable_signals);
 
   // Write to |h[0]|.
   buffer[0] = 'd';
@@ -841,7 +856,9 @@
                          MakeUserPointer(&hss)));
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
             hss.satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss.satisfiable_signals);
   num_bytes = kBufferSize;
   num_handles = MOJO_ARRAYSIZE(handles);
   EXPECT_EQ(MOJO_RESULT_OK,
@@ -881,7 +898,9 @@
                          MakeUserPointer(&hss)));
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
             hss.satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss.satisfiable_signals);
   num_bytes = kBufferSize;
   num_handles = MOJO_ARRAYSIZE(handles);
   EXPECT_EQ(MOJO_RESULT_OK,
@@ -904,7 +923,9 @@
                          MakeUserPointer(&hss)));
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
             hss.satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss.satisfiable_signals);
   num_bytes = kBufferSize;
   num_handles = MOJO_ARRAYSIZE(handles);
   EXPECT_EQ(MOJO_RESULT_OK,
@@ -939,7 +960,9 @@
                          MakeUserPointer(&hss)));
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
             hss.satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss.satisfiable_signals);
   num_bytes = kBufferSize;
   num_handles = MOJO_ARRAYSIZE(handles);
   EXPECT_EQ(MOJO_RESULT_OK,
@@ -990,14 +1013,16 @@
       MOJO_RESULT_FAILED_PRECONDITION,
       core()->Wait(ch, MOJO_HANDLE_SIGNAL_WRITABLE, 0, MakeUserPointer(&hss)));
   EXPECT_EQ(0u, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
             hss.satisfiable_signals);
   hss = kFullMojoHandleSignalsState;
   EXPECT_EQ(
       MOJO_RESULT_DEADLINE_EXCEEDED,
       core()->Wait(ch, MOJO_HANDLE_SIGNAL_READABLE, 0, MakeUserPointer(&hss)));
   EXPECT_EQ(0u, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
             hss.satisfiable_signals);
 
   // Write.
@@ -1013,8 +1038,10 @@
   hss = kEmptyMojoHandleSignalsState;
   EXPECT_EQ(MOJO_RESULT_OK, core()->Wait(ch, MOJO_HANDLE_SIGNAL_READABLE, 0,
                                          MakeUserPointer(&hss)));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
             hss.satisfiable_signals);
 
   // Peek one character.
@@ -1132,7 +1159,8 @@
       MOJO_RESULT_DEADLINE_EXCEEDED,
       core()->Wait(ch, MOJO_HANDLE_SIGNAL_READABLE, 0, MakeUserPointer(&hss)));
   EXPECT_EQ(0u, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
             hss.satisfiable_signals);
 
   // TODO(vtl): More.
@@ -1151,6 +1179,94 @@
   EXPECT_EQ(MOJO_RESULT_OK, core()->Close(ch));
 }
 
+TEST_F(CoreTest, DataPipeSetGetConsumerOptions) {
+  MojoCreateDataPipeOptions options = {
+      sizeof(MojoCreateDataPipeOptions),        // |struct_size|.
+      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
+      8,                                        // |element_num_bytes|.
+      800                                       // |capacity_num_bytes|.
+  };
+  MojoHandle ph, ch;  // p is for producer and c is for consumer.
+
+  EXPECT_EQ(MOJO_RESULT_OK,
+            core()->CreateDataPipe(MakeUserPointer(&options),
+                                   MakeUserPointer(&ph), MakeUserPointer(&ch)));
+  // Should get two distinct, valid handles.
+  EXPECT_NE(ph, MOJO_HANDLE_INVALID);
+  EXPECT_NE(ch, MOJO_HANDLE_INVALID);
+  EXPECT_NE(ph, ch);
+
+  // Read the initial options; the threshold should be 0 ("default").
+  MojoDataPipeConsumerOptions copts = {};
+  const uint32_t kCoptsSize = static_cast<uint32_t>(sizeof(copts));
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeConsumerOptions(
+                                ch, MakeUserPointer(&copts), kCoptsSize));
+  EXPECT_EQ(kCoptsSize, copts.struct_size);
+  EXPECT_EQ(0u, copts.read_threshold_num_bytes);
+
+  // Invalid read threshold: 4 (presumably: not a multiple of element size).
+  copts.struct_size = kCoptsSize;
+  copts.read_threshold_num_bytes = 4;
+  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            core()->SetDataPipeConsumerOptions(ch, MakeUserPointer(&copts)));
+  // The options shouldn't change.
+  copts = MojoDataPipeConsumerOptions();
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeConsumerOptions(
+                                ch, MakeUserPointer(&copts), kCoptsSize));
+  EXPECT_EQ(kCoptsSize, copts.struct_size);
+  EXPECT_EQ(0u, copts.read_threshold_num_bytes);
+
+  // Valid read threshold: 8 (exactly one element).
+  copts.struct_size = kCoptsSize;
+  copts.read_threshold_num_bytes = 8;
+  EXPECT_EQ(MOJO_RESULT_OK,
+            core()->SetDataPipeConsumerOptions(ch, MakeUserPointer(&copts)));
+  copts = MojoDataPipeConsumerOptions();
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeConsumerOptions(
+                                ch, MakeUserPointer(&copts), kCoptsSize));
+  EXPECT_EQ(kCoptsSize, copts.struct_size);
+  EXPECT_EQ(8u, copts.read_threshold_num_bytes);
+
+  // Invalid read threshold: 9 (presumably: not a multiple of element size).
+  copts.struct_size = kCoptsSize;
+  copts.read_threshold_num_bytes = 9;
+  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            core()->SetDataPipeConsumerOptions(ch, MakeUserPointer(&copts)));
+  // The options shouldn't change.
+  copts = MojoDataPipeConsumerOptions();
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeConsumerOptions(
+                                ch, MakeUserPointer(&copts), kCoptsSize));
+  EXPECT_EQ(kCoptsSize, copts.struct_size);
+  EXPECT_EQ(8u, copts.read_threshold_num_bytes);
+
+  // Valid read threshold: 16 (two elements).
+  copts.struct_size = kCoptsSize;
+  copts.read_threshold_num_bytes = 16;
+  EXPECT_EQ(MOJO_RESULT_OK,
+            core()->SetDataPipeConsumerOptions(ch, MakeUserPointer(&copts)));
+  copts = MojoDataPipeConsumerOptions();
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeConsumerOptions(
+                                ch, MakeUserPointer(&copts), kCoptsSize));
+  EXPECT_EQ(kCoptsSize, copts.struct_size);
+  EXPECT_EQ(16u, copts.read_threshold_num_bytes);
+
+  // Default read threshold (0).
+  copts.struct_size = kCoptsSize;
+  copts.read_threshold_num_bytes = 0;
+  EXPECT_EQ(MOJO_RESULT_OK,
+            core()->SetDataPipeConsumerOptions(ch, MakeUserPointer(&copts)));
+  copts = MojoDataPipeConsumerOptions();
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeConsumerOptions(
+                                ch, MakeUserPointer(&copts), kCoptsSize));
+  EXPECT_EQ(kCoptsSize, copts.struct_size);
+  // Note: Should be reported as 0 ("default"), even if it means the element
+  // size.
+  EXPECT_EQ(0u, copts.read_threshold_num_bytes);
+
+  EXPECT_EQ(MOJO_RESULT_OK, core()->Close(ph));
+  EXPECT_EQ(MOJO_RESULT_OK, core()->Close(ch));
+}
+
 // Tests passing data pipe producer and consumer handles.
 TEST_F(CoreTest, MessagePipeBasicLocalHandlePassing2) {
   const char kHello[] = "hello";
@@ -1186,7 +1302,9 @@
                          MakeUserPointer(&hss)));
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
             hss.satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss.satisfiable_signals);
   num_bytes = kBufferSize;
   num_handles = MOJO_ARRAYSIZE(handles);
   EXPECT_EQ(MOJO_RESULT_OK,
@@ -1220,8 +1338,10 @@
   EXPECT_EQ(MOJO_RESULT_OK,
             core()->Wait(ch_received, MOJO_HANDLE_SIGNAL_READABLE, 1000000000,
                          MakeUserPointer(&hss)));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
             hss.satisfiable_signals);
   num_bytes = kBufferSize;
   EXPECT_EQ(MOJO_RESULT_OK,
@@ -1242,7 +1362,9 @@
                          MakeUserPointer(&hss)));
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
             hss.satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss.satisfiable_signals);
   num_bytes = kBufferSize;
   num_handles = MOJO_ARRAYSIZE(handles);
   EXPECT_EQ(MOJO_RESULT_OK,
@@ -1276,8 +1398,10 @@
   EXPECT_EQ(MOJO_RESULT_OK,
             core()->Wait(ch_received, MOJO_HANDLE_SIGNAL_READABLE, 1000000000,
                          MakeUserPointer(&hss)));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
             hss.satisfiable_signals);
   num_bytes = kBufferSize;
   EXPECT_EQ(MOJO_RESULT_OK,
@@ -1335,8 +1459,10 @@
   hss = kEmptyMojoHandleSignalsState;
   EXPECT_EQ(MOJO_RESULT_OK, core()->Wait(ch, MOJO_HANDLE_SIGNAL_READABLE,
                                          1000000000, MakeUserPointer(&hss)));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
             hss.satisfiable_signals);
 
   // Make sure that |ch| can't be sent if it's in a two-phase read.
@@ -1363,7 +1489,9 @@
                          MakeUserPointer(&hss)));
   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
             hss.satisfied_signals);
-  EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
+                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+            hss.satisfiable_signals);
   num_bytes = kBufferSize;
   num_handles = MOJO_ARRAYSIZE(handles);
   EXPECT_EQ(MOJO_RESULT_OK,