EDK: Add implementation of data pipe producer options (write threshold).
Entirely analogous to afe7672f3020a8b28fdbb316b3f7f91dcdd35ad0, which
did this for the consumer options (read threshold).
* Mostly, "consumer" -> "producer", "read" -> "write", etc.
* Also doesn't expose the thunks yet. (But it does expose the new handle signal.)
* The only non-mechanical changes versus the aforementioned change are
in {LocalDataPipeImpl,RemoteConsumerDataPipeImpl}::
ProducerGetHandleSignalsState() (these are essentially the same)
and DataPipeImplTest.WriteThreshold.
R=vardhan@google.com
Review URL: https://codereview.chromium.org/1885663002 .
diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc
index c0cdd3f..ecde52e 100644
--- a/mojo/edk/system/core.cc
+++ b/mojo/edk/system/core.cc
@@ -405,6 +405,29 @@
return MOJO_RESULT_OK;
}
+MojoResult Core::SetDataPipeProducerOptions(
+    MojoHandle data_pipe_producer_handle,
+    UserPointer<const MojoDataPipeProducerOptions> options) {
+  // Resolve the handle first; a failed lookup's result is returned as-is.
+  RefPtr<Dispatcher> producer;
+  const MojoResult lookup = GetDispatcher(data_pipe_producer_handle, &producer);
+  if (lookup == MOJO_RESULT_OK)
+    return producer->SetDataPipeProducerOptions(options);
+  return lookup;
+}
+
+MojoResult Core::GetDataPipeProducerOptions(
+    MojoHandle data_pipe_producer_handle,
+    UserPointer<MojoDataPipeProducerOptions> options,
+    uint32_t options_num_bytes) {
+  // Resolve the handle first; a failed lookup's result is returned as-is.
+  RefPtr<Dispatcher> producer;
+  const MojoResult lookup = GetDispatcher(data_pipe_producer_handle, &producer);
+  if (lookup != MOJO_RESULT_OK)
+    return lookup;
+  return producer->GetDataPipeProducerOptions(options, options_num_bytes);
+}
+
MojoResult Core::WriteData(MojoHandle data_pipe_producer_handle,
UserPointer<const void> elements,
UserPointer<uint32_t> num_bytes,
diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h
index fe8cfbf..32a7ff2 100644
--- a/mojo/edk/system/core.h
+++ b/mojo/edk/system/core.h
@@ -130,6 +130,13 @@
UserPointer<const MojoCreateDataPipeOptions> options,
UserPointer<MojoHandle> data_pipe_producer_handle,
UserPointer<MojoHandle> data_pipe_consumer_handle);
+ MojoResult SetDataPipeProducerOptions(
+ MojoHandle data_pipe_producer_handle,
+ UserPointer<const MojoDataPipeProducerOptions> options);
+ MojoResult GetDataPipeProducerOptions(
+ MojoHandle data_pipe_producer_handle,
+ UserPointer<MojoDataPipeProducerOptions> options,
+ uint32_t options_num_bytes);
MojoResult WriteData(MojoHandle data_pipe_producer_handle,
UserPointer<const void> elements,
UserPointer<uint32_t> num_bytes,
diff --git a/mojo/edk/system/core_unittest.cc b/mojo/edk/system/core_unittest.cc
index bb8967a..9395286 100644
--- a/mojo/edk/system/core_unittest.cc
+++ b/mojo/edk/system/core_unittest.cc
@@ -997,14 +997,18 @@
EXPECT_EQ(
MOJO_RESULT_FAILED_PRECONDITION,
core()->Wait(ph, MOJO_HANDLE_SIGNAL_READABLE, 0, MakeUserPointer(&hss)));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+ hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
hss = kEmptyMojoHandleSignalsState;
EXPECT_EQ(MOJO_RESULT_OK, core()->Wait(ph, MOJO_HANDLE_SIGNAL_WRITABLE, 0,
MakeUserPointer(&hss)));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+ hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
// Consumer should be never-writable, and not yet readable.
@@ -1179,6 +1183,92 @@
EXPECT_EQ(MOJO_RESULT_OK, core()->Close(ch));
}
+TEST_F(CoreTest, DataPipeSetGetProducerOptions) {
+ MojoCreateDataPipeOptions options = {
+ sizeof(MojoCreateDataPipeOptions), // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 8, // |element_num_bytes|.
+ 800 // |capacity_num_bytes|.
+ };
+ MojoHandle ph, ch; // p is for producer and c is for consumer.
+
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->CreateDataPipe(MakeUserPointer(&options),
+ MakeUserPointer(&ph), MakeUserPointer(&ch)));
+ // Should get two distinct, valid handles.
+ EXPECT_NE(ph, MOJO_HANDLE_INVALID);
+ EXPECT_NE(ch, MOJO_HANDLE_INVALID);
+ EXPECT_NE(ph, ch);
+
+ // Get it.
+ MojoDataPipeProducerOptions popts = {};
+ const uint32_t kPoptsSize = static_cast<uint32_t>(sizeof(popts));
+ EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+ ph, MakeUserPointer(&popts), kPoptsSize));
+ EXPECT_EQ(kPoptsSize, popts.struct_size);
+ EXPECT_EQ(0u, popts.write_threshold_num_bytes);
+
+ // Invalid write threshold.
+ popts.struct_size = kPoptsSize;
+ popts.write_threshold_num_bytes = 4;
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->SetDataPipeProducerOptions(ph, MakeUserPointer(&popts)));
+ // The options shouldn't change.
+ popts = MojoDataPipeProducerOptions();
+ EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+ ph, MakeUserPointer(&popts), kPoptsSize));
+ EXPECT_EQ(kPoptsSize, popts.struct_size);
+ EXPECT_EQ(0u, popts.write_threshold_num_bytes);
+
+ // Valid write threshold.
+ popts.struct_size = kPoptsSize;
+ popts.write_threshold_num_bytes = 8;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->SetDataPipeProducerOptions(ph, MakeUserPointer(&popts)));
+ popts = MojoDataPipeProducerOptions();
+ EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+ ph, MakeUserPointer(&popts), kPoptsSize));
+ EXPECT_EQ(kPoptsSize, popts.struct_size);
+ EXPECT_EQ(8u, popts.write_threshold_num_bytes);
+
+ // Invalid write threshold.
+ popts.struct_size = kPoptsSize;
+ popts.write_threshold_num_bytes = 9;
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->SetDataPipeProducerOptions(ph, MakeUserPointer(&popts)));
+ // The options shouldn't change.
+ popts = MojoDataPipeProducerOptions();
+ EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+ ph, MakeUserPointer(&popts), kPoptsSize));
+ EXPECT_EQ(kPoptsSize, popts.struct_size);
+ EXPECT_EQ(8u, popts.write_threshold_num_bytes);
+
+ // Valid write threshold.
+ popts.struct_size = kPoptsSize;
+ popts.write_threshold_num_bytes = 16;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->SetDataPipeProducerOptions(ph, MakeUserPointer(&popts)));
+ popts = MojoDataPipeProducerOptions();
+ EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+ ph, MakeUserPointer(&popts), kPoptsSize));
+ EXPECT_EQ(kPoptsSize, popts.struct_size);
+ EXPECT_EQ(16u, popts.write_threshold_num_bytes);
+
+ // Can also set to default by passing null.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->SetDataPipeProducerOptions(ph, NullUserPointer()));
+ popts = MojoDataPipeProducerOptions();
+ EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeProducerOptions(
+ ph, MakeUserPointer(&popts), kPoptsSize));
+ EXPECT_EQ(kPoptsSize, popts.struct_size);
+ // Note: Should be reported as 0 ("default"), even though it effectively means
+ // the element size.
+ EXPECT_EQ(0u, popts.write_threshold_num_bytes);
+
+ EXPECT_EQ(MOJO_RESULT_OK, core()->Close(ph));
+ EXPECT_EQ(MOJO_RESULT_OK, core()->Close(ch));
+}
+
TEST_F(CoreTest, DataPipeSetGetConsumerOptions) {
MojoCreateDataPipeOptions options = {
sizeof(MojoCreateDataPipeOptions), // |struct_size|.
@@ -1196,7 +1286,7 @@
EXPECT_NE(ch, MOJO_HANDLE_INVALID);
EXPECT_NE(ph, ch);
- // Read it.
+ // Get it.
MojoDataPipeConsumerOptions copts = {};
const uint32_t kCoptsSize = static_cast<uint32_t>(sizeof(copts));
EXPECT_EQ(MOJO_RESULT_OK, core()->GetDataPipeConsumerOptions(
diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc
index 9a90c8a..c42b6fd 100644
--- a/mojo/edk/system/data_pipe.cc
+++ b/mojo/edk/system/data_pipe.cc
@@ -281,6 +281,29 @@
ProducerCloseNoLock();
}
+MojoResult DataPipe::ProducerSetOptions(uint32_t write_threshold_num_bytes) {
+  MutexLocker locker(&mutex_);
+  DCHECK(has_local_producer_no_lock());
+
+  // Only whole multiples of the element size are accepted (0 always passes).
+  if ((write_threshold_num_bytes % element_num_bytes()) != 0)
+    return MOJO_RESULT_INVALID_ARGUMENT;
+
+  // Snapshot the signals on both sides of the update; wake only on a change.
+  HandleSignalsState before = impl_->ProducerGetHandleSignalsState();
+  producer_write_threshold_num_bytes_ = write_threshold_num_bytes;
+  HandleSignalsState after = impl_->ProducerGetHandleSignalsState();
+  if (!after.equals(before))
+    AwakeProducerAwakablesForStateChangeNoLock(after);
+  return MOJO_RESULT_OK;
+}
+
+void DataPipe::ProducerGetOptions(uint32_t* write_threshold_num_bytes) {
+ MutexLocker locker(&mutex_);
+ DCHECK(has_local_producer_no_lock());
+ *write_threshold_num_bytes = producer_write_threshold_num_bytes_; // Stored value as-is; 0 is not mapped to the element size here.
+}
+
MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
UserPointer<uint32_t> num_bytes,
bool all_or_none) {
@@ -686,6 +709,7 @@
capacity_num_bytes_(validated_options.capacity_num_bytes),
producer_open_(true),
consumer_open_(true),
+ producer_write_threshold_num_bytes_(0),
consumer_read_threshold_num_bytes_(0),
producer_awakable_list_(has_local_producer ? new AwakableList()
: nullptr),
diff --git a/mojo/edk/system/data_pipe.h b/mojo/edk/system/data_pipe.h
index f559511..8ff9464 100644
--- a/mojo/edk/system/data_pipe.h
+++ b/mojo/edk/system/data_pipe.h
@@ -105,6 +105,8 @@
// corresponding names.
void ProducerCancelAllAwakables();
void ProducerClose();
+ MojoResult ProducerSetOptions(uint32_t write_threshold_num_bytes);
+ void ProducerGetOptions(uint32_t* write_threshold_num_bytes);
MojoResult ProducerWriteData(UserPointer<const void> elements,
UserPointer<uint32_t> num_bytes,
bool all_or_none);
@@ -197,6 +199,14 @@
mutex_.AssertHeld();
return consumer_open_;
}
+  // Effective ("real") write threshold, never zero: a stored 0 means one element.
+  size_t producer_write_threshold_num_bytes_no_lock() const
+      MOJO_SHARED_LOCKS_REQUIRED(mutex_) {
+    mutex_.AssertHeld();
+    if (producer_write_threshold_num_bytes_ == 0u)
+      return element_num_bytes_;  // Zero selects the default.
+    return producer_write_threshold_num_bytes_;
+  }
// Note that this returns the "real" read threshold (never zero).
size_t consumer_read_threshold_num_bytes_no_lock() const
MOJO_SHARED_LOCKS_REQUIRED(mutex_) {
@@ -282,6 +292,8 @@
bool producer_open_ MOJO_GUARDED_BY(mutex_);
bool consumer_open_ MOJO_GUARDED_BY(mutex_);
// This may be zero (in which case it "means" |element_num_bytes_|).
+ uint32_t producer_write_threshold_num_bytes_ MOJO_GUARDED_BY(mutex_);
+ // This may be zero (in which case it "means" |element_num_bytes_|).
uint32_t consumer_read_threshold_num_bytes_ MOJO_GUARDED_BY(mutex_);
// Non-null only if the producer or consumer, respectively, is local.
std::unique_ptr<AwakableList> producer_awakable_list_ MOJO_GUARDED_BY(mutex_);
diff --git a/mojo/edk/system/data_pipe_impl.h b/mojo/edk/system/data_pipe_impl.h
index e92d66c..bd3cc31 100644
--- a/mojo/edk/system/data_pipe_impl.h
+++ b/mojo/edk/system/data_pipe_impl.h
@@ -144,6 +144,10 @@
bool consumer_open() const MOJO_NO_THREAD_SAFETY_ANALYSIS {
return owner_->consumer_open_no_lock();
}
+ size_t producer_write_threshold_num_bytes() const
+ MOJO_NO_THREAD_SAFETY_ANALYSIS {
+ return owner_->producer_write_threshold_num_bytes_no_lock(); // Forwards to the owner's "_no_lock" accessor.
+ }
size_t consumer_read_threshold_num_bytes() const
MOJO_NO_THREAD_SAFETY_ANALYSIS {
return owner_->consumer_read_threshold_num_bytes_no_lock();
diff --git a/mojo/edk/system/data_pipe_impl_unittest.cc b/mojo/edk/system/data_pipe_impl_unittest.cc
index a783ad9..858044a 100644
--- a/mojo/edk/system/data_pipe_impl_unittest.cc
+++ b/mojo/edk/system/data_pipe_impl_unittest.cc
@@ -120,6 +120,12 @@
}
void ProducerClose() { helper_->ProducerClose(); }
+ MojoResult ProducerSetOptions(uint32_t write_threshold_num_bytes) {
+ return dpp()->ProducerSetOptions(write_threshold_num_bytes); // Forwards to |dpp()| and passes its result back.
+ }
+ void ProducerGetOptions(uint32_t* write_threshold_num_bytes) {
+ dpp()->ProducerGetOptions(write_threshold_num_bytes); // Forwards to |dpp()|, which writes through the pointer.
+ }
MojoResult ProducerWriteData(UserPointer<const void> elements,
UserPointer<uint32_t> num_bytes,
bool all_or_none) {
@@ -777,8 +783,10 @@
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_READABLE, 12,
&hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+ hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
// Already writable.
@@ -812,7 +820,8 @@
hss = HandleSignalsState();
this->ProducerRemoveAwakable(&pwaiter, &hss);
EXPECT_EQ(0u, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
// Wait for data to become available to the consumer.
@@ -848,7 +857,8 @@
hss = HandleSignalsState();
this->ProducerRemoveAwakable(&pwaiter, &hss);
EXPECT_EQ(0u, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
// Do it again.
@@ -874,8 +884,10 @@
EXPECT_EQ(78u, context);
hss = HandleSignalsState();
this->ProducerRemoveAwakable(&pwaiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+ hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
// Try writing, using a two-phase write.
@@ -918,8 +930,10 @@
EXPECT_EQ(90u, context);
hss = HandleSignalsState();
this->ProducerRemoveAwakable(&pwaiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+ hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
// Write one element.
@@ -1341,8 +1355,10 @@
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0,
&hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+ hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
void* write_ptr = nullptr;
@@ -1362,7 +1378,8 @@
hss = HandleSignalsState();
this->ProducerRemoveAwakable(&pwaiter, &hss);
EXPECT_EQ(0u, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
// It shouldn't be readable yet either (we'll wait later).
@@ -1381,8 +1398,10 @@
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3,
&hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+ hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
// It should become readable.
@@ -1435,8 +1454,10 @@
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 6,
&hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+ hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
hss.satisfiable_signals);
// But not readable.
@@ -2549,6 +2570,165 @@
this->ConsumerClose();
}
+TYPED_TEST(DataPipeImplTest, WriteThreshold) {
+  const MojoCreateDataPipeOptions options = {
+      kSizeOfOptions,                           // |struct_size|.
+      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
+      1u,                                       // |element_num_bytes|.
+      10u                                       // |capacity_num_bytes|.
+  };
+  this->Create(options);
+  this->DoTransfer();
+
+  // The default write threshold should be 0 (which means "default", i.e., one
+  // element).
+  uint32_t write_threshold_num_bytes = 123u;
+  this->ProducerGetOptions(&write_threshold_num_bytes);
+  EXPECT_EQ(0u, write_threshold_num_bytes);
+
+  Waiter waiter;
+  HandleSignalsState hss;
+
+  // Try to wait for the write threshold signal; it should already have it.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, &hss));
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfiable_signals);
+
+  // Before writing, add a waiter on the consumer (since we'll need to know when
+  // the written bytes have propagated).
+  Waiter read_waiter;
+  read_waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ConsumerAddAwakable(&read_waiter, MOJO_HANDLE_SIGNAL_READABLE,
+                                      0, nullptr));
+
+  // Write 5 bytes.
+  static const char kTestData[5] = {'A', 'B', 'C', 'D', 'E'};
+  uint32_t num_bytes = 5;
+  EXPECT_EQ(MOJO_RESULT_OK,
+            this->ProducerWriteData(UserPointer<const void>(kTestData),
+                                    MakeUserPointer(&num_bytes), false));
+  EXPECT_EQ(5u, num_bytes);
+
+  // It should still have the write threshold signal.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+
+  // Set the write threshold to 5 (checking that the call itself succeeds).
+  EXPECT_EQ(MOJO_RESULT_OK, this->ProducerSetOptions(5));
+  write_threshold_num_bytes = 123u;
+  this->ProducerGetOptions(&write_threshold_num_bytes);
+  EXPECT_EQ(5u, write_threshold_num_bytes);
+
+  // Should still have the write threshold signal.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+
+  // Set the write threshold to 6.
+  EXPECT_EQ(MOJO_RESULT_OK, this->ProducerSetOptions(6));
+
+  // Should no longer have the write threshold signal.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
+  hss = HandleSignalsState();
+  this->ProducerRemoveAwakable(&waiter, &hss);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfiable_signals);
+
+  // Add a waiter.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+
+  // Wait for the consumer to be readable.
+  EXPECT_EQ(MOJO_RESULT_OK, read_waiter.Wait(test::TinyTimeout(), nullptr));
+  this->ConsumerRemoveAwakable(&read_waiter, nullptr);
+
+  // Read a byte.
+  char read_byte = 'a';
+  num_bytes = sizeof(read_byte);
+  EXPECT_EQ(MOJO_RESULT_OK,
+            this->ConsumerReadData(UserPointer<void>(&read_byte),
+                                   MakeUserPointer(&num_bytes), true, false));
+  EXPECT_EQ(1u, num_bytes);
+  EXPECT_EQ(kTestData[0], read_byte);
+
+  // Wait; should get the write threshold signal.
+  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
+  hss = HandleSignalsState();
+  this->ProducerRemoveAwakable(&waiter, &hss);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+                MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+            hss.satisfiable_signals);
+
+  // Write 6 more bytes.
+  static const char kMoreTestData[6] = {'1', '2', '3', '4', '5', '6'};
+  num_bytes = 6;
+  EXPECT_EQ(MOJO_RESULT_OK,
+            this->ProducerWriteData(UserPointer<const void>(kMoreTestData),
+                                    MakeUserPointer(&num_bytes), false));
+  EXPECT_EQ(6u, num_bytes);
+
+  // Should no longer have the write threshold signal.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
+  this->ProducerRemoveAwakable(&waiter, nullptr);
+
+  // Set the write threshold to 0 (which means the element size, 1).
+  EXPECT_EQ(MOJO_RESULT_OK, this->ProducerSetOptions(0));
+  write_threshold_num_bytes = 123u;
+  this->ProducerGetOptions(&write_threshold_num_bytes);
+  EXPECT_EQ(0u, write_threshold_num_bytes);
+
+  // Should still not have the write threshold signal.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
+  this->ProducerRemoveAwakable(&waiter, nullptr);
+
+  // Add a waiter.
+  waiter.Init();
+  ASSERT_EQ(MOJO_RESULT_OK,
+            this->ProducerAddAwakable(
+                &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr));
+
+  // Close the consumer.
+  this->ConsumerClose();
+
+  // Wait; the condition should now never be satisfiable.
+  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+            waiter.Wait(test::TinyTimeout(), nullptr));
+  hss = HandleSignalsState();
+  this->ProducerRemoveAwakable(&waiter, &hss);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
+  EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
+
+  this->ProducerClose();
+}
+
TYPED_TEST(DataPipeImplTest, ReadThreshold) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions, // |struct_size|.
@@ -2561,7 +2741,7 @@
// The default read threshold should be 0 (which means "default", i.e., one
// element).
- uint32_t read_threshold_num_bytes = 123u;
+ uint32_t read_threshold_num_bytes = 123;
this->ConsumerGetOptions(&read_threshold_num_bytes);
EXPECT_EQ(0u, read_threshold_num_bytes);
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
index 726f1bb..febe217 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
@@ -9,6 +9,7 @@
#include "base/logging.h"
#include "mojo/edk/system/data_pipe.h"
#include "mojo/edk/system/memory.h"
+#include "mojo/edk/system/options_validation.h"
using mojo::platform::ScopedPlatformHandle;
using mojo::util::MutexLocker;
@@ -74,6 +75,51 @@
return dispatcher;
}
+MojoResult DataPipeProducerDispatcher::SetDataPipeProducerOptionsImplNoLock(
+    UserPointer<const MojoDataPipeProducerOptions> options) {
+  mutex().AssertHeld();
+
+  // Null options select the default: 0, i.e. one element (whatever its size).
+  if (options.IsNull())
+    return data_pipe_->ProducerSetOptions(0);
+
+  UserOptionsReader<MojoDataPipeProducerOptions> reader(options);
+  if (!reader.is_valid())
+    return MOJO_RESULT_INVALID_ARGUMENT;
+
+  // When the supplied struct lacks |write_threshold_num_bytes|, change nothing.
+  if (!OPTIONS_STRUCT_HAS_MEMBER(MojoDataPipeProducerOptions,
+                                 write_threshold_num_bytes, reader))
+    return MOJO_RESULT_OK;
+
+  const uint32_t requested = reader.options().write_threshold_num_bytes;
+  return data_pipe_->ProducerSetOptions(requested);
+}
+
+MojoResult DataPipeProducerDispatcher::GetDataPipeProducerOptionsImplNoLock(
+    UserPointer<MojoDataPipeProducerOptions> options,
+    uint32_t options_num_bytes) {
+  mutex().AssertHeld();
+
+  // NOTE: More work is needed here if/when |MojoDataPipeProducerOptions| grows
+  // past its initial definition. (See |MojoGetDataPipeProducerOptions()| in
+  // mojo/public/c/system/data_pipe.h.)
+  static_assert(sizeof(MojoDataPipeProducerOptions) == 8u,
+                "MojoDataPipeProducerOptions has been extended!");
+
+  // The caller's buffer must have room for the whole struct.
+  if (options_num_bytes < sizeof(MojoDataPipeProducerOptions))
+    return MOJO_RESULT_INVALID_ARGUMENT;
+
+  uint32_t current_threshold = 0;
+  data_pipe_->ProducerGetOptions(&current_threshold);
+  MojoDataPipeProducerOptions reported = {};
+  reported.struct_size = static_cast<uint32_t>(sizeof(reported));
+  reported.write_threshold_num_bytes = current_threshold;
+  options.Put(reported);
+  return MOJO_RESULT_OK;
+}
+
MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
UserPointer<const void> elements,
UserPointer<uint32_t> num_bytes,
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.h b/mojo/edk/system/data_pipe_producer_dispatcher.h
index daab3a9..5f3e197 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.h
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.h
@@ -47,6 +47,11 @@
void CloseImplNoLock() override;
util::RefPtr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock()
override;
+ MojoResult SetDataPipeProducerOptionsImplNoLock(
+ UserPointer<const MojoDataPipeProducerOptions> options) override;
+ MojoResult GetDataPipeProducerOptionsImplNoLock(
+ UserPointer<MojoDataPipeProducerOptions> options,
+ uint32_t options_num_bytes) override;
MojoResult WriteDataImplNoLock(UserPointer<const void> elements,
UserPointer<uint32_t> num_bytes,
MojoWriteDataFlags flags) override;
diff --git a/mojo/edk/system/dispatcher.cc b/mojo/edk/system/dispatcher.cc
index 1c98a99..386fa0e 100644
--- a/mojo/edk/system/dispatcher.cc
+++ b/mojo/edk/system/dispatcher.cc
@@ -140,6 +140,25 @@
flags);
}
+MojoResult Dispatcher::SetDataPipeProducerOptions(
+    UserPointer<const MojoDataPipeProducerOptions> options) {
+  MutexLocker locker(&mutex_);
+  // Only an open dispatcher reaches the virtual implementation.
+  if (!is_closed_)
+    return SetDataPipeProducerOptionsImplNoLock(options);
+  return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
+MojoResult Dispatcher::GetDataPipeProducerOptions(
+    UserPointer<MojoDataPipeProducerOptions> options,
+    uint32_t options_num_bytes) {
+  MutexLocker locker(&mutex_);
+  // Only an open dispatcher reaches the virtual implementation.
+  if (!is_closed_)
+    return GetDataPipeProducerOptionsImplNoLock(options, options_num_bytes);
+  return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
MojoResult Dispatcher::WriteData(UserPointer<const void> elements,
UserPointer<uint32_t> num_bytes,
MojoWriteDataFlags flags) {
@@ -326,6 +345,21 @@
return MOJO_RESULT_INVALID_ARGUMENT;
}
+MojoResult Dispatcher::SetDataPipeProducerOptionsImplNoLock(
+ UserPointer<const MojoDataPipeProducerOptions> /*options*/) {
+ mutex_.AssertHeld();
+ DCHECK(!is_closed_);
+ return MOJO_RESULT_INVALID_ARGUMENT; // Base-class default; DataPipeProducerDispatcher overrides this.
+}
+
+MojoResult Dispatcher::GetDataPipeProducerOptionsImplNoLock(
+ UserPointer<MojoDataPipeProducerOptions> /*options*/,
+ uint32_t /*options_num_bytes*/) {
+ mutex_.AssertHeld();
+ DCHECK(!is_closed_);
+ return MOJO_RESULT_INVALID_ARGUMENT; // Base-class default; DataPipeProducerDispatcher overrides this.
+}
+
MojoResult Dispatcher::WriteDataImplNoLock(UserPointer<const void> /*elements*/,
UserPointer<uint32_t> /*num_bytes*/,
MojoWriteDataFlags /*flags*/) {
diff --git a/mojo/edk/system/dispatcher.h b/mojo/edk/system/dispatcher.h
index fad468f..2206848 100644
--- a/mojo/edk/system/dispatcher.h
+++ b/mojo/edk/system/dispatcher.h
@@ -98,6 +98,11 @@
uint32_t* num_dispatchers,
MojoReadMessageFlags flags);
+ MojoResult SetDataPipeProducerOptions(
+ UserPointer<const MojoDataPipeProducerOptions> options);
+ MojoResult GetDataPipeProducerOptions(
+ UserPointer<MojoDataPipeProducerOptions> options,
+ uint32_t options_num_bytes);
MojoResult WriteData(UserPointer<const void> elements,
UserPointer<uint32_t> elements_num_bytes,
MojoWriteDataFlags flags);
@@ -251,6 +256,12 @@
uint32_t* num_dispatchers,
MojoReadMessageFlags flags)
MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ virtual MojoResult SetDataPipeProducerOptionsImplNoLock(
+ UserPointer<const MojoDataPipeProducerOptions> options)
+ MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ virtual MojoResult GetDataPipeProducerOptionsImplNoLock(
+ UserPointer<MojoDataPipeProducerOptions> options,
+ uint32_t options_num_bytes) MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
virtual MojoResult WriteDataImplNoLock(UserPointer<const void> elements,
UserPointer<uint32_t> num_bytes,
MojoWriteDataFlags flags)
diff --git a/mojo/edk/system/dispatcher_unittest.cc b/mojo/edk/system/dispatcher_unittest.cc
index eb956d5..aaf775d 100644
--- a/mojo/edk/system/dispatcher_unittest.cc
+++ b/mojo/edk/system/dispatcher_unittest.cc
@@ -61,6 +61,10 @@
d->ReadMessage(NullUserPointer(), NullUserPointer(), nullptr,
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ d->SetDataPipeProducerOptions(NullUserPointer()));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ d->GetDataPipeProducerOptions(NullUserPointer(), 0));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
d->WriteData(NullUserPointer(), NullUserPointer(),
MOJO_WRITE_DATA_FLAG_NONE));
EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
@@ -114,6 +118,10 @@
d->ReadMessage(NullUserPointer(), NullUserPointer(), nullptr,
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ d->SetDataPipeProducerOptions(NullUserPointer()));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ d->GetDataPipeProducerOptions(NullUserPointer(), 0));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
d->WriteData(NullUserPointer(), NullUserPointer(),
MOJO_WRITE_DATA_FLAG_NONE));
EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
@@ -156,6 +164,8 @@
CLOSE = 0,
WRITE_MESSAGE,
READ_MESSAGE,
+ SET_DATA_PIPE_PRODUCER_OPTIONS,
+ GET_DATA_PIPE_PRODUCER_OPTIONS,
WRITE_DATA,
BEGIN_WRITE_DATA,
END_WRITE_DATA,
@@ -200,6 +210,14 @@
dispatcher->ReadMessage(NullUserPointer(), NullUserPointer(), nullptr,
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
break;
+ case DispatcherOp::SET_DATA_PIPE_PRODUCER_OPTIONS:
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ dispatcher->SetDataPipeProducerOptions(NullUserPointer()));
+ break;
+ case DispatcherOp::GET_DATA_PIPE_PRODUCER_OPTIONS:
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ dispatcher->GetDataPipeProducerOptions(NullUserPointer(), 0));
+ break;
case DispatcherOp::WRITE_DATA:
EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
dispatcher->WriteData(NullUserPointer(), NullUserPointer(),
diff --git a/mojo/edk/system/local_data_pipe_impl.cc b/mojo/edk/system/local_data_pipe_impl.cc
index 7c05c22..dcb527c 100644
--- a/mojo/edk/system/local_data_pipe_impl.cc
+++ b/mojo/edk/system/local_data_pipe_impl.cc
@@ -144,10 +144,18 @@
HandleSignalsState LocalDataPipeImpl::ProducerGetHandleSignalsState() const {
HandleSignalsState rv;
if (consumer_open()) {
- if (current_num_bytes_ < capacity_num_bytes() &&
- !producer_in_two_phase_write())
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+ if (!producer_in_two_phase_write()) {
+ // |producer_write_threshold_num_bytes()| is always at least 1.
+ if (capacity_num_bytes() - current_num_bytes_ >=
+ producer_write_threshold_num_bytes()) {
+ rv.satisfied_signals |=
+ MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD;
+ } else if (current_num_bytes_ < capacity_num_bytes()) {
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+ }
+ }
+ rv.satisfiable_signals |=
+ MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD;
} else {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
}
diff --git a/mojo/edk/system/remote_consumer_data_pipe_impl.cc b/mojo/edk/system/remote_consumer_data_pipe_impl.cc
index 2e03bfc..bd99162 100644
--- a/mojo/edk/system/remote_consumer_data_pipe_impl.cc
+++ b/mojo/edk/system/remote_consumer_data_pipe_impl.cc
@@ -257,10 +257,18 @@
const {
HandleSignalsState rv;
if (consumer_open()) {
- if (consumer_num_bytes_ < capacity_num_bytes() &&
- !producer_in_two_phase_write())
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+ if (!producer_in_two_phase_write()) {
+ // |producer_write_threshold_num_bytes()| is always at least 1.
+ if (capacity_num_bytes() - consumer_num_bytes_ >=
+ producer_write_threshold_num_bytes()) {
+ rv.satisfied_signals |=
+ MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD;
+ } else if (consumer_num_bytes_ < capacity_num_bytes()) {
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+ }
+ }
+ rv.satisfiable_signals |=
+ MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD;
} else {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
}
diff --git a/mojo/public/c/system/tests/core_unittest.cc b/mojo/public/c/system/tests/core_unittest.cc
index 0ad52ad..071a902 100644
--- a/mojo/public/c/system/tests/core_unittest.cc
+++ b/mojo/public/c/system/tests/core_unittest.cc
@@ -230,8 +230,10 @@
// The producer |hp| should be writable.
EXPECT_EQ(MOJO_RESULT_OK,
MojoWait(hp, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &state));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, state.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+ state.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
state.satisfiable_signals);
// Try to read from |hc|.
@@ -315,12 +317,15 @@
// the producer never-writable?
}
+// TODO(vtl): Once thunks are in: TEST(CoreTest, DataPipeWriteThreshold) { ... }
+
TEST(CoreTest, DataPipeReadThreshold) {
MojoHandle hp = MOJO_HANDLE_INVALID;
MojoHandle hc = MOJO_HANDLE_INVALID;
EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &hp, &hc));
EXPECT_NE(hp, MOJO_HANDLE_INVALID);
EXPECT_NE(hc, MOJO_HANDLE_INVALID);
+ EXPECT_NE(hc, hp);
MojoDataPipeConsumerOptions copts;
static const uint32_t kCoptsSize = static_cast<uint32_t>(sizeof(copts));
diff --git a/mojo/public/platform/native/system_impl_private_unittest.cc b/mojo/public/platform/native/system_impl_private_unittest.cc
index 900c932..f54b5e3 100644
--- a/mojo/public/platform/native/system_impl_private_unittest.cc
+++ b/mojo/public/platform/native/system_impl_private_unittest.cc
@@ -168,8 +168,10 @@
MOJO_RESULT_OK,
MojoSystemImplWait(sys0, hp, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &state));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, state.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
+ state.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
+ MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
state.satisfiable_signals);
// Try to read from |hc|.
@@ -267,6 +269,9 @@
// 2 SystemImpls are leaked...
}
+// TODO(vtl): Once thunks are in:
+// TEST(SystemImplTest, DataPipeWriteThreshold) { ... }
+
TEST(SystemImplTest, DataPipeReadThreshold) {
MojoSystemImpl sys0 = MojoSystemImplCreateImpl();
MojoSystemImpl sys1 = MojoSystemImplCreateImpl();