EDK: Add implementation of data pipe producer write threshold stuff.

Entirely analogous to afe7672f3020a8b28fdbb316b3f7f91dcdd35ad0, which
did this for read threshold stuff.

* Mostly, "consumer" -> "producer", "read" -> "write", etc.
* Also doesn't expose thunks yet. (But does expose new handle signal.)
* The only non-mechanical changes versus the aforementioned change are
  in {LocalDataPipeImpl,RemoteConsumerDataPipeImpl}::
  ProducerGetHandleSignalsState() (these are essentially the same)
  and DataPipeImplTest.WriteThreshold.

R=vardhan@google.com

Review URL: https://codereview.chromium.org/1885663002 .
diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc
index c0cdd3f..ecde52e 100644
--- a/mojo/edk/system/core.cc
+++ b/mojo/edk/system/core.cc
@@ -405,6 +405,29 @@
   return MOJO_RESULT_OK;
 }
 
+MojoResult Core::SetDataPipeProducerOptions(
+    MojoHandle data_pipe_producer_handle,
+    UserPointer<const MojoDataPipeProducerOptions> options) {
+  RefPtr<Dispatcher> dispatcher;
+  MojoResult result = GetDispatcher(data_pipe_producer_handle, &dispatcher);
+  if (result != MOJO_RESULT_OK)
+    return result;
+
+  return dispatcher->SetDataPipeProducerOptions(options);
+}
+
+MojoResult Core::GetDataPipeProducerOptions(
+    MojoHandle data_pipe_producer_handle,
+    UserPointer<MojoDataPipeProducerOptions> options,
+    uint32_t options_num_bytes) {
+  RefPtr<Dispatcher> dispatcher;
+  MojoResult result = GetDispatcher(data_pipe_producer_handle, &dispatcher);
+  if (result != MOJO_RESULT_OK)
+    return result;
+
+  return dispatcher->GetDataPipeProducerOptions(options, options_num_bytes);
+}
+
 MojoResult Core::WriteData(MojoHandle data_pipe_producer_handle,
                            UserPointer<const void> elements,
                            UserPointer<uint32_t> num_bytes,
diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h
index fe8cfbf..32a7ff2 100644
--- a/mojo/edk/system/core.h
+++ b/mojo/edk/system/core.h
@@ -130,6 +130,13 @@
       UserPointer<const MojoCreateDataPipeOptions> options,
       UserPointer<MojoHandle> data_pipe_producer_handle,
       UserPointer<MojoHandle> data_pipe_consumer_handle);
+  MojoResult SetDataPipeProducerOptions(
+      MojoHandle data_pipe_producer_handle,
+      UserPointer<const MojoDataPipeProducerOptions> options);
+  MojoResult GetDataPipeProducerOptions(
+      MojoHandle data_pipe_producer_handle,
+      UserPointer<MojoDataPipeProducerOptions> options,
+      uint32_t options_num_bytes);
   MojoResult WriteData(MojoHandle data_pipe_producer_handle,
                        UserPointer<const void> elements,
                        UserPointer<uint32_t> num_bytes,
diff --git a/mojo/edk/system/core_unittest.cc b/mojo/edk/system/core_unittest.cc
index bb8967a..9395286 100644
--- a/mojo/edk/system/core_unittest.cc
+++ b/mojo/edk/system/core_unittest.cc
@@ -997,14 +997,18 @@
   EXPECT_EQ(
       MOJO_RESULT_FAILED_PRECONDITION,
       core()->Wait(ph, MOJO_HANDLE_SIGNAL_READABLE, 0, MakeUserPointer(&hss)));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
   hss = kEmptyMojoHandleSignalsState;
   EXPECT_EQ(MOJO_RESULT_OK, core()->Wait(ph, MOJO_HANDLE_SIGNAL_WRITABLE, 0,
                                          MakeUserPointer(&hss)));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
 
   // Consumer should be never-writable, and not yet readable.
@@ -1179,6 +1183,92 @@
   EXPECT_EQ(MOJO_RESULT_OK, core()->Close(ch));
 }
 
+TEST_F(CoreTest, DataPipeSetGetProducerOptions) {
+  MojoCreateDataPipeOptions options = {
+      sizeof(MojoCreateDataPipeOptions),        // |struct_size|.
+      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
+      8,                                        // |element_num_bytes|.
+      800                                       // |capacity_num_bytes|.
+  };
+  MojoHandle ph, ch;  // p is for producer and c is for consumer.
+
+  EXPECT_EQ(MOJO_RESULT_OK,
+            core()->CreateDataPipe(MakeUserPointer(&options),
+                                   MakeUserPointer(&ph), MakeUserPointer(&ch)));
+  // Should get two distinct, valid handles.
+  EXPECT_NE(ph, MOJO_HANDLE_INVALID);
+  EXPECT_NE(ch, MOJO_HANDLE_INVALID);
+  EXPECT_NE(ph, ch);
+
+  // Get it.
+  MojoDataPipeProducerOptions popts = {};
+  const uint32_t kPoptsSize = static_cast<uint32_t>(sizeof(popts));
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+                                ph, MakeUserPointer(&popts), kPoptsSize));
+  EXPECT_EQ(kPoptsSize, popts.struct_size);
+  EXPECT_EQ(0u, popts.write_threshold_num_bytes);
+
+  // Invalid write threshold.
+  popts.struct_size = kPoptsSize;
+  popts.write_threshold_num_bytes = 4;
+  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            core()->SetDataPipeProducerOptions(ph, MakeUserPointer(&popts)));
+  // The options shouldn't change.
+  popts = MojoDataPipeProducerOptions();
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+                                ph, MakeUserPointer(&popts), kPoptsSize));
+  EXPECT_EQ(kPoptsSize, popts.struct_size);
+  EXPECT_EQ(0u, popts.write_threshold_num_bytes);
+
+  // Valid write threshold.
+  popts.struct_size = kPoptsSize;
+  popts.write_threshold_num_bytes = 8;
+  EXPECT_EQ(MOJO_RESULT_OK,
+            core()->SetDataPipeProducerOptions(ph, MakeUserPointer(&popts)));
+  popts = MojoDataPipeProducerOptions();
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+                                ph, MakeUserPointer(&popts), kPoptsSize));
+  EXPECT_EQ(kPoptsSize, popts.struct_size);
+  EXPECT_EQ(8u, popts.write_threshold_num_bytes);
+
+  // Invalid write threshold.
+  popts.struct_size = kPoptsSize;
+  popts.write_threshold_num_bytes = 9;
+  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            core()->SetDataPipeProducerOptions(ph, MakeUserPointer(&popts)));
+  // The options shouldn't change.
+  popts = MojoDataPipeProducerOptions();
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+                                ph, MakeUserPointer(&popts), kPoptsSize));
+  EXPECT_EQ(kPoptsSize, popts.struct_size);
+  EXPECT_EQ(8u, popts.write_threshold_num_bytes);
+
+  // Valid write threshold.
+  popts.struct_size = kPoptsSize;
+  popts.write_threshold_num_bytes = 16;
+  EXPECT_EQ(MOJO_RESULT_OK,
+            core()->SetDataPipeProducerOptions(ph, MakeUserPointer(&popts)));
+  popts = MojoDataPipeProducerOptions();
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+                                ph, MakeUserPointer(&popts), kPoptsSize));
+  EXPECT_EQ(kPoptsSize, popts.struct_size);
+  EXPECT_EQ(16u, popts.write_threshold_num_bytes);
+
+  // Can also set to default by passing null.
+  EXPECT_EQ(MOJO_RESULT_OK,
+            core()->SetDataPipeProducerOptions(ph, NullUserPointer()));
+  popts = MojoDataPipeProducerOptions();
+  EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+                                ph, MakeUserPointer(&popts), kPoptsSize));
+  EXPECT_EQ(kPoptsSize, popts.struct_size);
+  // Note: Should be reported as 0 ("default"), even if it means the element
+  // struct_size.
+  EXPECT_EQ(0u, popts.write_threshold_num_bytes);
+
+  EXPECT_EQ(MOJO_RESULT_OK, core()->Close(ph));
+  EXPECT_EQ(MOJO_RESULT_OK, core()->Close(ch));
+}
+
 TEST_F(CoreTest, DataPipeSetGetConsumerOptions) {
   MojoCreateDataPipeOptions options = {
       sizeof(MojoCreateDataPipeOptions),        // |struct_size|.
@@ -1196,7 +1286,7 @@
   EXPECT_NE(ch, MOJO_HANDLE_INVALID);
   EXPECT_NE(ph, ch);
 
-  // Read it.
+  // Get it.
   MojoDataPipeConsumerOptions copts = {};
   const uint32_t kCoptsSize = static_cast<uint32_t>(sizeof(copts));
   EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeConsumerOptions(
diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc
index 9a90c8a..c42b6fd 100644
--- a/mojo/edk/system/data_pipe.cc
+++ b/mojo/edk/system/data_pipe.cc
@@ -281,6 +281,29 @@
   ProducerCloseNoLock();
 }
 
+MojoResult DataPipe::ProducerSetOptions(uint32_t write_threshold_num_bytes) {
+  MutexLocker locker(&mutex_);
+  DCHECK(has_local_producer_no_lock());
+
+  if (write_threshold_num_bytes % element_num_bytes() != 0)
+    return MOJO_RESULT_INVALID_ARGUMENT;
+
+  HandleSignalsState old_producer_state =
+      impl_->ProducerGetHandleSignalsState();
+  producer_write_threshold_num_bytes_ = write_threshold_num_bytes;
+  HandleSignalsState new_producer_state =
+      impl_->ProducerGetHandleSignalsState();
+  if (!new_producer_state.equals(old_producer_state))
+    AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
+  return MOJO_RESULT_OK;
+}
+
+void DataPipe::ProducerGetOptions(uint32_t* write_threshold_num_bytes) {
+  MutexLocker locker(&mutex_);
+  DCHECK(has_local_producer_no_lock());
+  *write_threshold_num_bytes = producer_write_threshold_num_bytes_;
+}
+
 MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
                                        UserPointer<uint32_t> num_bytes,
                                        bool all_or_none) {
@@ -686,6 +709,7 @@
       capacity_num_bytes_(validated_options.capacity_num_bytes),
       producer_open_(true),
       consumer_open_(true),
+      producer_write_threshold_num_bytes_(0),
       consumer_read_threshold_num_bytes_(0),
       producer_awakable_list_(has_local_producer ? new AwakableList()
                                                  : nullptr),
diff --git a/mojo/edk/system/data_pipe.h b/mojo/edk/system/data_pipe.h
index f559511..8ff9464 100644
--- a/mojo/edk/system/data_pipe.h
+++ b/mojo/edk/system/data_pipe.h
@@ -105,6 +105,8 @@
   // corresponding names.
   void ProducerCancelAllAwakables();
   void ProducerClose();
+  MojoResult ProducerSetOptions(uint32_t write_threshold_num_bytes);
+  void ProducerGetOptions(uint32_t* write_threshold_num_bytes);
   MojoResult ProducerWriteData(UserPointer<const void> elements,
                                UserPointer<uint32_t> num_bytes,
                                bool all_or_none);
@@ -197,6 +199,14 @@
     mutex_.AssertHeld();
     return consumer_open_;
   }
+  // Note that this returns the "real" write threshold (never zero).
+  size_t producer_write_threshold_num_bytes_no_lock() const
+      MOJO_SHARED_LOCKS_REQUIRED(mutex_) {
+    mutex_.AssertHeld();
+    return (producer_write_threshold_num_bytes_ > 0u)
+               ? producer_write_threshold_num_bytes_
+               : element_num_bytes_;
+  }
   // Note that this returns the "real" read threshold (never zero).
   size_t consumer_read_threshold_num_bytes_no_lock() const
       MOJO_SHARED_LOCKS_REQUIRED(mutex_) {
@@ -282,6 +292,8 @@
   bool producer_open_ MOJO_GUARDED_BY(mutex_);
   bool consumer_open_ MOJO_GUARDED_BY(mutex_);
   // This may be zero (in which case it "means" |element_num_bytes_|).
+  uint32_t producer_write_threshold_num_bytes_ MOJO_GUARDED_BY(mutex_);
+  // This may be zero (in which case it "means" |element_num_bytes_|).
   uint32_t consumer_read_threshold_num_bytes_ MOJO_GUARDED_BY(mutex_);
   // Non-null only if the producer or consumer, respectively, is local.
   std::unique_ptr<AwakableList> producer_awakable_list_ MOJO_GUARDED_BY(mutex_);
diff --git a/mojo/edk/system/data_pipe_impl.h b/mojo/edk/system/data_pipe_impl.h
index e92d66c..bd3cc31 100644
--- a/mojo/edk/system/data_pipe_impl.h
+++ b/mojo/edk/system/data_pipe_impl.h
@@ -144,6 +144,10 @@
   bool consumer_open() const MOJO_NO_THREAD_SAFETY_ANALYSIS {
     return owner_->consumer_open_no_lock();
   }
+  size_t producer_write_threshold_num_bytes() const
+      MOJO_NO_THREAD_SAFETY_ANALYSIS {
+    return owner_->producer_write_threshold_num_bytes_no_lock();
+  }
   size_t consumer_read_threshold_num_bytes() const
       MOJO_NO_THREAD_SAFETY_ANALYSIS {
     return owner_->consumer_read_threshold_num_bytes_no_lock();
diff --git a/mojo/edk/system/data_pipe_impl_unittest.cc b/mojo/edk/system/data_pipe_impl_unittest.cc
index a783ad9..858044a 100644
--- a/mojo/edk/system/data_pipe_impl_unittest.cc
+++ b/mojo/edk/system/data_pipe_impl_unittest.cc
@@ -120,6 +120,12 @@
   }
 
   void ProducerClose() { helper_->ProducerClose(); }
+  MojoResult ProducerSetOptions(uint32_t write_threshold_num_bytes) {
+    return dpp()->ProducerSetOptions(write_threshold_num_bytes);
+  }
+  void ProducerGetOptions(uint32_t* write_threshold_num_bytes) {
+    dpp()->ProducerGetOptions(write_threshold_num_bytes);
+  }
   MojoResult ProducerWriteData(UserPointer<const void> elements,
                                UserPointer<uint32_t> num_bytes,
                                bool all_or_none) {
@@ -777,8 +783,10 @@
   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
             this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_READABLE, 12,
                                       &hss));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
 
   // Already writable.
@@ -812,7 +820,8 @@
   hss = HandleSignalsState();
   this->ProducerRemoveAwakable(&pwaiter, &hss);
   EXPECT_EQ(0u, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
 
   // Wait for data to become available to the consumer.
@@ -848,7 +857,8 @@
   hss = HandleSignalsState();
   this->ProducerRemoveAwakable(&pwaiter, &hss);
   EXPECT_EQ(0u, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
 
   // Do it again.
@@ -874,8 +884,10 @@
   EXPECT_EQ(78u, context);
   hss = HandleSignalsState();
   this->ProducerRemoveAwakable(&pwaiter, &hss);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
 
   // Try writing, using a two-phase write.
@@ -918,8 +930,10 @@
   EXPECT_EQ(90u, context);
   hss = HandleSignalsState();
   this->ProducerRemoveAwakable(&pwaiter, &hss);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
 
   // Write one element.
@@ -1341,8 +1355,10 @@
   EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
             this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0,
                                       &hss));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
 
   void* write_ptr = nullptr;
@@ -1362,7 +1378,8 @@
   hss = HandleSignalsState();
   this->ProducerRemoveAwakable(&pwaiter, &hss);
   EXPECT_EQ(0u, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
 
   // It shouldn't be readable yet either (we'll wait later).
@@ -1381,8 +1398,10 @@
   EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
             this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3,
                                       &hss));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
 
   // It should become readable.
@@ -1435,8 +1454,10 @@
   EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
             this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 6,
                                       &hss));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             hss.satisfiable_signals);
 
   // But not readable.
@@ -2549,6 +2570,165 @@
   this->ConsumerClose();
 }
 
+TYPED_TEST(DataPipeImplTest, WriteThreshold) {
+  const MojoCreateDataPipeOptions options = {
+      kSizeOfOptions,                           // |struct_size|.
+      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
+      1u,                                       // |element_num_bytes|.
+      10u                                       // |capacity_num_bytes|.
+  };
+  this->Create(options);
+  this->DoTransfer();
+
+  // The default write threshold should be 0 (which means "default", i.e., one
+  // element).
+  uint32_t write_threshold_num_bytes = 123;
+  this->ProducerGetOptions(&write_threshold_num_bytes);
+  EXPECT_EQ(0u, write_threshold_num_bytes);
+
+  Waiter waiter;
+  HandleSignalsState hss;
+
+  // Try to wait to the write threshold signal; it should already have it.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, &hss));
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfiable_signals);
+
+  // Before writing, add a waiter on the consumer (since we'll need to know when
+  // the written bytes have propagated).
+  Waiter read_waiter;
+  read_waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ConsumerAddAwakable(&read_waiter, MOJO_HANDLE_SIGNAL_READABLE,
+                                      0, nullptr));
+
+  // Write 5 bytes.
+  static const char kTestData[5] = {'A', 'B', 'C', 'D', 'E'};
+  uint32_t num_bytes = 5;
+  EXPECT_EQ(MOJO_RESULT_OK,
+            this->ProducerWriteData(UserPointer<const void>(kTestData),
+                                    MakeUserPointer(&num_bytes), false));
+  EXPECT_EQ(5u, num_bytes);
+
+  // It should still have the write threshold signal.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+
+  // Set the write threshold to 5.
+  this->ProducerSetOptions(5);
+  write_threshold_num_bytes = 123u;
+  this->ProducerGetOptions(&write_threshold_num_bytes);
+  EXPECT_EQ(5u, write_threshold_num_bytes);
+
+  // Should still have the write threshold signal.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+
+  // Set the write threshold to 6.
+  this->ProducerSetOptions(6);
+
+  // Should no longer have the write threshold signal.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
+  hss = HandleSignalsState();
+  this->ProducerRemoveAwakable(&waiter, &hss);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfiable_signals);
+
+  // Add a waiter.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+
+  // Wait for the consumer to be readable.
+  EXPECT_EQ(MOJO_RESULT_OK, read_waiter.Wait(test::TinyTimeout(), nullptr));
+  this->ConsumerRemoveAwakable(&read_waiter, nullptr);
+
+  // Read a byte.
+  char read_byte = 'a';
+  num_bytes = sizeof(read_byte);
+  EXPECT_EQ(MOJO_RESULT_OK,
+            this->ConsumerReadData(UserPointer<void>(&read_byte),
+                                   MakeUserPointer(&num_bytes), true, false));
+  EXPECT_EQ(1u, num_bytes);
+  EXPECT_EQ(kTestData[0], read_byte);
+
+  // Wait; should get the write threshold signal.
+  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
+  hss = HandleSignalsState();
+  this->ProducerRemoveAwakable(&waiter, &hss);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfiable_signals);
+
+  // Write 6 more bytes.
+  static const char kMoreTestData[6] = {'1', '2', '3', '4', '5', '6'};
+  num_bytes = 6;
+  EXPECT_EQ(MOJO_RESULT_OK,
+            this->ProducerWriteData(UserPointer<const void>(kMoreTestData),
+                                    MakeUserPointer(&num_bytes), false));
+  EXPECT_EQ(6u, num_bytes);
+
+  // Should no longer have the write threshold signal.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
+  this->ProducerRemoveAwakable(&waiter, nullptr);
+
+  // Set the write threshold to 0 (which means the element size, 1).
+  this->ProducerSetOptions(0);
+  write_threshold_num_bytes = 123u;
+  this->ProducerGetOptions(&write_threshold_num_bytes);
+  EXPECT_EQ(0u, write_threshold_num_bytes);
+
+  // Should still not have the write threshold signal.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
+  this->ProducerRemoveAwakable(&waiter, nullptr);
+
+  // Add a waiter.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+
+  // Close the consumer.
+  this->ConsumerClose();
+
+  // Wait; the condition should now never be satisfiable.
+  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+            waiter.Wait(test::TinyTimeout(), nullptr));
+  hss = HandleSignalsState();
+  this->ProducerRemoveAwakable(&waiter, &hss);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
+
+  this->ProducerClose();
+}
+
 TYPED_TEST(DataPipeImplTest, ReadThreshold) {
   const MojoCreateDataPipeOptions options = {
       kSizeOfOptions,                           // |struct_size|.
@@ -2561,7 +2741,7 @@
 
   // The default read threshold should be 0 (which means "default", i.e., one
   // element).
-  uint32_t read_threshold_num_bytes = 123u;
+  uint32_t read_threshold_num_bytes = 123;
   this->ConsumerGetOptions(&read_threshold_num_bytes);
   EXPECT_EQ(0u, read_threshold_num_bytes);
 
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
index 726f1bb..febe217 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
@@ -9,6 +9,7 @@
 #include "base/logging.h"
 #include "mojo/edk/system/data_pipe.h"
 #include "mojo/edk/system/memory.h"
+#include "mojo/edk/system/options_validation.h"
 
 using mojo::platform::ScopedPlatformHandle;
 using mojo::util::MutexLocker;
@@ -74,6 +75,51 @@
   return dispatcher;
 }
 
+MojoResult DataPipeProducerDispatcher::SetDataPipeProducerOptionsImplNoLock(
+    UserPointer<const MojoDataPipeProducerOptions> options) {
+  mutex().AssertHeld();
+
+  // The default of 0 means 1 element (however big that is).
+  uint32_t write_threshold_num_bytes = 0;
+  if (!options.IsNull()) {
+    UserOptionsReader<MojoDataPipeProducerOptions> reader(options);
+    if (!reader.is_valid())
+      return MOJO_RESULT_INVALID_ARGUMENT;
+
+    if (!OPTIONS_STRUCT_HAS_MEMBER(MojoDataPipeProducerOptions,
+                                   write_threshold_num_bytes, reader))
+      return MOJO_RESULT_OK;
+
+    write_threshold_num_bytes = reader.options().write_threshold_num_bytes;
+  }
+
+  return data_pipe_->ProducerSetOptions(write_threshold_num_bytes);
+}
+
+MojoResult DataPipeProducerDispatcher::GetDataPipeProducerOptionsImplNoLock(
+    UserPointer<MojoDataPipeProducerOptions> options,
+    uint32_t options_num_bytes) {
+  mutex().AssertHeld();
+
+  // Note: If/when |MojoDataPipeProducerOptions| is extended beyond its initial
+  // definition, more work will be necessary. (See the definition of
+  // |MojoGetDataPipeProducerOptions()| in mojo/public/c/system/data_pipe.h.)
+  static_assert(sizeof(MojoDataPipeProducerOptions) == 8u,
+                "MojoDataPipeProducerOptions has been extended!");
+
+  if (options_num_bytes < sizeof(MojoDataPipeProducerOptions))
+    return MOJO_RESULT_INVALID_ARGUMENT;
+
+  uint32_t write_threshold_num_bytes = 0;
+  data_pipe_->ProducerGetOptions(&write_threshold_num_bytes);
+  MojoDataPipeProducerOptions model_options = {
+      sizeof(MojoDataPipeProducerOptions),  // |struct_size|.
+      write_threshold_num_bytes,            // |write_threshold_num_bytes|.
+  };
+  options.Put(model_options);
+  return MOJO_RESULT_OK;
+}
+
 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
     UserPointer<const void> elements,
     UserPointer<uint32_t> num_bytes,
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.h b/mojo/edk/system/data_pipe_producer_dispatcher.h
index daab3a9..5f3e197 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.h
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.h
@@ -47,6 +47,11 @@
   void CloseImplNoLock() override;
   util::RefPtr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock()
       override;
+  MojoResult SetDataPipeProducerOptionsImplNoLock(
+      UserPointer<const MojoDataPipeProducerOptions> options) override;
+  MojoResult GetDataPipeProducerOptionsImplNoLock(
+      UserPointer<MojoDataPipeProducerOptions> options,
+      uint32_t options_num_bytes) override;
   MojoResult WriteDataImplNoLock(UserPointer<const void> elements,
                                  UserPointer<uint32_t> num_bytes,
                                  MojoWriteDataFlags flags) override;
diff --git a/mojo/edk/system/dispatcher.cc b/mojo/edk/system/dispatcher.cc
index 1c98a99..386fa0e 100644
--- a/mojo/edk/system/dispatcher.cc
+++ b/mojo/edk/system/dispatcher.cc
@@ -140,6 +140,25 @@
                                flags);
 }
 
+MojoResult Dispatcher::SetDataPipeProducerOptions(
+    UserPointer<const MojoDataPipeProducerOptions> options) {
+  MutexLocker locker(&mutex_);
+  if (is_closed_)
+    return MOJO_RESULT_INVALID_ARGUMENT;
+
+  return SetDataPipeProducerOptionsImplNoLock(options);
+}
+
+MojoResult Dispatcher::GetDataPipeProducerOptions(
+    UserPointer<MojoDataPipeProducerOptions> options,
+    uint32_t options_num_bytes) {
+  MutexLocker locker(&mutex_);
+  if (is_closed_)
+    return MOJO_RESULT_INVALID_ARGUMENT;
+
+  return GetDataPipeProducerOptionsImplNoLock(options, options_num_bytes);
+}
+
 MojoResult Dispatcher::WriteData(UserPointer<const void> elements,
                                  UserPointer<uint32_t> num_bytes,
                                  MojoWriteDataFlags flags) {
@@ -326,6 +345,21 @@
   return MOJO_RESULT_INVALID_ARGUMENT;
 }
 
+MojoResult Dispatcher::SetDataPipeProducerOptionsImplNoLock(
+    UserPointer<const MojoDataPipeProducerOptions> /*options*/) {
+  mutex_.AssertHeld();
+  DCHECK(!is_closed_);
+  return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
+MojoResult Dispatcher::GetDataPipeProducerOptionsImplNoLock(
+    UserPointer<MojoDataPipeProducerOptions> /*options*/,
+    uint32_t /*options_num_bytes*/) {
+  mutex_.AssertHeld();
+  DCHECK(!is_closed_);
+  return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
 MojoResult Dispatcher::WriteDataImplNoLock(UserPointer<const void> /*elements*/,
                                            UserPointer<uint32_t> /*num_bytes*/,
                                            MojoWriteDataFlags /*flags*/) {
diff --git a/mojo/edk/system/dispatcher.h b/mojo/edk/system/dispatcher.h
index fad468f..2206848 100644
--- a/mojo/edk/system/dispatcher.h
+++ b/mojo/edk/system/dispatcher.h
@@ -98,6 +98,11 @@
                          uint32_t* num_dispatchers,
                          MojoReadMessageFlags flags);
 
+  MojoResult SetDataPipeProducerOptions(
+      UserPointer<const MojoDataPipeProducerOptions> options);
+  MojoResult GetDataPipeProducerOptions(
+      UserPointer<MojoDataPipeProducerOptions> options,
+      uint32_t options_num_bytes);
   MojoResult WriteData(UserPointer<const void> elements,
                        UserPointer<uint32_t> elements_num_bytes,
                        MojoWriteDataFlags flags);
@@ -251,6 +256,12 @@
                                            uint32_t* num_dispatchers,
                                            MojoReadMessageFlags flags)
       MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+  virtual MojoResult SetDataPipeProducerOptionsImplNoLock(
+      UserPointer<const MojoDataPipeProducerOptions> options)
+      MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+  virtual MojoResult GetDataPipeProducerOptionsImplNoLock(
+      UserPointer<MojoDataPipeProducerOptions> options,
+      uint32_t options_num_bytes) MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
   virtual MojoResult WriteDataImplNoLock(UserPointer<const void> elements,
                                          UserPointer<uint32_t> num_bytes,
                                          MojoWriteDataFlags flags)
diff --git a/mojo/edk/system/dispatcher_unittest.cc b/mojo/edk/system/dispatcher_unittest.cc
index eb956d5..aaf775d 100644
--- a/mojo/edk/system/dispatcher_unittest.cc
+++ b/mojo/edk/system/dispatcher_unittest.cc
@@ -61,6 +61,10 @@
             d->ReadMessage(NullUserPointer(), NullUserPointer(), nullptr,
                            nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            d->SetDataPipeProducerOptions(NullUserPointer()));
+  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            d->GetDataPipeProducerOptions(NullUserPointer(), 0));
+  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
             d->WriteData(NullUserPointer(), NullUserPointer(),
                          MOJO_WRITE_DATA_FLAG_NONE));
   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
@@ -114,6 +118,10 @@
             d->ReadMessage(NullUserPointer(), NullUserPointer(), nullptr,
                            nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            d->SetDataPipeProducerOptions(NullUserPointer()));
+  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+            d->GetDataPipeProducerOptions(NullUserPointer(), 0));
+  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
             d->WriteData(NullUserPointer(), NullUserPointer(),
                          MOJO_WRITE_DATA_FLAG_NONE));
   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
@@ -156,6 +164,8 @@
   CLOSE = 0,
   WRITE_MESSAGE,
   READ_MESSAGE,
+  SET_DATA_PIPE_PRODUCER_OPTIONS,
+  GET_DATA_PIPE_PRODUCER_OPTIONS,
   WRITE_DATA,
   BEGIN_WRITE_DATA,
   END_WRITE_DATA,
@@ -200,6 +210,14 @@
           dispatcher->ReadMessage(NullUserPointer(), NullUserPointer(), nullptr,
                                   nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
       break;
+    case DispatcherOp::SET_DATA_PIPE_PRODUCER_OPTIONS:
+      EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+                dispatcher->SetDataPipeProducerOptions(NullUserPointer()));
+      break;
+    case DispatcherOp::GET_DATA_PIPE_PRODUCER_OPTIONS:
+      EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+                dispatcher->GetDataPipeProducerOptions(NullUserPointer(), 0));
+      break;
     case DispatcherOp::WRITE_DATA:
       EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
                 dispatcher->WriteData(NullUserPointer(), NullUserPointer(),
diff --git a/mojo/edk/system/local_data_pipe_impl.cc b/mojo/edk/system/local_data_pipe_impl.cc
index 7c05c22..dcb527c 100644
--- a/mojo/edk/system/local_data_pipe_impl.cc
+++ b/mojo/edk/system/local_data_pipe_impl.cc
@@ -144,10 +144,18 @@
 HandleSignalsState LocalDataPipeImpl::ProducerGetHandleSignalsState() const {
   HandleSignalsState rv;
   if (consumer_open()) {
-    if (current_num_bytes_ < capacity_num_bytes() &&
-        !producer_in_two_phase_write())
-      rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
-    rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+    if (!producer_in_two_phase_write()) {
+      // |producer_write_threshold_num_bytes()| is always at least 1.
+      if (capacity_num_bytes() - current_num_bytes_ >=
+          producer_write_threshold_num_bytes()) {
+        rv.satisfied_signals |=
+            MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD;
+      } else if (current_num_bytes_ < capacity_num_bytes()) {
+        rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+      }
+    }
+    rv.satisfiable_signals |=
+        MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD;
   } else {
     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
   }
diff --git a/mojo/edk/system/remote_consumer_data_pipe_impl.cc b/mojo/edk/system/remote_consumer_data_pipe_impl.cc
index 2e03bfc..bd99162 100644
--- a/mojo/edk/system/remote_consumer_data_pipe_impl.cc
+++ b/mojo/edk/system/remote_consumer_data_pipe_impl.cc
@@ -257,10 +257,18 @@
     const {
   HandleSignalsState rv;
   if (consumer_open()) {
-    if (consumer_num_bytes_ < capacity_num_bytes() &&
-        !producer_in_two_phase_write())
-      rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
-    rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+    if (!producer_in_two_phase_write()) {
+      // |producer_write_threshold_num_bytes()| is always at least 1.
+      if (capacity_num_bytes() - consumer_num_bytes_ >=
+          producer_write_threshold_num_bytes()) {
+        rv.satisfied_signals |=
+            MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD;
+      } else if (consumer_num_bytes_ < capacity_num_bytes()) {
+        rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+      }
+    }
+    rv.satisfiable_signals |=
+        MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD;
   } else {
     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
   }
diff --git a/mojo/public/c/system/tests/core_unittest.cc b/mojo/public/c/system/tests/core_unittest.cc
index 0ad52ad..071a902 100644
--- a/mojo/public/c/system/tests/core_unittest.cc
+++ b/mojo/public/c/system/tests/core_unittest.cc
@@ -230,8 +230,10 @@
   // The producer |hp| should be writable.
   EXPECT_EQ(MOJO_RESULT_OK,
             MojoWait(hp, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &state));
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, state.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            state.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             state.satisfiable_signals);
 
   // Try to read from |hc|.
@@ -315,12 +317,15 @@
   // the producer never-writable?
 }
 
+// TODO(vtl): Once thunks are in: TEST(CoreTest, DataPipeWriteThreshold) { ... }
+
 TEST(CoreTest, DataPipeReadThreshold) {
   MojoHandle hp = MOJO_HANDLE_INVALID;
   MojoHandle hc = MOJO_HANDLE_INVALID;
   EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &hp, &hc));
   EXPECT_NE(hp, MOJO_HANDLE_INVALID);
   EXPECT_NE(hc, MOJO_HANDLE_INVALID);
+  EXPECT_NE(hc, hp);
 
   MojoDataPipeConsumerOptions copts;
   static const uint32_t kCoptsSize = static_cast<uint32_t>(sizeof(copts));
diff --git a/mojo/public/platform/native/system_impl_private_unittest.cc b/mojo/public/platform/native/system_impl_private_unittest.cc
index 900c932..f54b5e3 100644
--- a/mojo/public/platform/native/system_impl_private_unittest.cc
+++ b/mojo/public/platform/native/system_impl_private_unittest.cc
@@ -168,8 +168,10 @@
       MOJO_RESULT_OK,
       MojoSystemImplWait(sys0, hp, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &state));
 
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, state.satisfied_signals);
-  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            state.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
             state.satisfiable_signals);
 
   // Try to read from |hc|.
@@ -267,6 +269,9 @@
   // 2 SystemImpls are leaked...
 }
 
+// TODO(vtl): Once thunks are in:
+//   TEST(SystemImplTest, DataPipeWriteThreshold) { ... }
+
 TEST(SystemImplTest, DataPipeReadThreshold) {
   MojoSystemImpl sys0 = MojoSystemImplCreateImpl();
   MojoSystemImpl sys1 = MojoSystemImplCreateImpl();