Make it possible to remove an Awakable with a specific "context" from a Dispatcher. This will be needed for wait sets. (Before this change, removing an Awakable removed all instances of it.) R=vardhan@google.com BUG=#350 Review URL: https://codereview.chromium.org/2060943007 .
diff --git a/mojo/edk/system/awakable_list.cc b/mojo/edk/system/awakable_list.cc index 510b88a..e419d40 100644 --- a/mojo/edk/system/awakable_list.cc +++ b/mojo/edk/system/awakable_list.cc
@@ -70,5 +70,20 @@ awakables_.erase(last, awakables_.end()); } +void AwakableList::RemoveWithContext(Awakable* awakable, uint64_t context) { + // We allow a thread to wait on the same handle multiple times simultaneously, + // so we need to scan the entire list and remove all occurrences of |waiter|. + auto last = awakables_.end(); + for (AwakeInfoList::iterator it = awakables_.begin(); it != last;) { + if (it->awakable == awakable && it->context == context) { + --last; + std::swap(*it, *last); + } else { + ++it; + } + } + awakables_.erase(last, awakables_.end()); +} + } // namespace system } // namespace mojo
diff --git a/mojo/edk/system/awakable_list.h b/mojo/edk/system/awakable_list.h index 60d76b8..6275159 100644 --- a/mojo/edk/system/awakable_list.h +++ b/mojo/edk/system/awakable_list.h
@@ -34,6 +34,7 @@ void CancelAll(); void Add(Awakable* awakable, MojoHandleSignals signals, uint64_t context); void Remove(Awakable* awakable); + void RemoveWithContext(Awakable* awakable, uint64_t context); private: struct AwakeInfo {
diff --git a/mojo/edk/system/awakable_list_unittest.cc b/mojo/edk/system/awakable_list_unittest.cc index 9772e7a..45e390c 100644 --- a/mojo/edk/system/awakable_list_unittest.cc +++ b/mojo/edk/system/awakable_list_unittest.cc
@@ -285,6 +285,44 @@ EXPECT_EQ(10u, context4); } +TEST(AwakableListTest, RemoveWithContext) { + MojoResult result; + uint64_t context; + + { + AwakableList awakable_list; + test::SimpleWaiterThread thread(&result, &context); + awakable_list.Add(thread.waiter(), MOJO_HANDLE_SIGNAL_READABLE, 1); + awakable_list.Add(thread.waiter(), MOJO_HANDLE_SIGNAL_READABLE, 2); + thread.Start(); + awakable_list.RemoveWithContext(thread.waiter(), 2); + awakable_list.AwakeForStateChange(HandleSignalsState( + MOJO_HANDLE_SIGNAL_READABLE, + MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE)); + awakable_list.RemoveWithContext(thread.waiter(), 1); + // Double-remove okay: + awakable_list.RemoveWithContext(thread.waiter(), 1); + } // Join |thread|. + EXPECT_EQ(MOJO_RESULT_OK, result); + EXPECT_EQ(1u, context); + + // Try the same thing, but remove "1" before the awake instead. + { + AwakableList awakable_list; + test::SimpleWaiterThread thread(&result, &context); + awakable_list.Add(thread.waiter(), MOJO_HANDLE_SIGNAL_READABLE, 1); + awakable_list.Add(thread.waiter(), MOJO_HANDLE_SIGNAL_READABLE, 2); + thread.Start(); + awakable_list.RemoveWithContext(thread.waiter(), 1); + awakable_list.AwakeForStateChange(HandleSignalsState( + MOJO_HANDLE_SIGNAL_READABLE, + MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE)); + awakable_list.RemoveWithContext(thread.waiter(), 2); + } // Join |thread|. + EXPECT_EQ(MOJO_RESULT_OK, result); + EXPECT_EQ(2u, context); +} + class KeepAwakable : public Awakable { public: KeepAwakable() : awake_count(0) {}
diff --git a/mojo/edk/system/core_test_base.cc b/mojo/edk/system/core_test_base.cc index 5c57952..f56f3fd 100644 --- a/mojo/edk/system/core_test_base.cc +++ b/mojo/edk/system/core_test_base.cc
@@ -195,6 +195,16 @@ *signals_state = HandleSignalsState(); } + void RemoveAwakableWithContextImplNoLock( + Awakable* /*awakable*/, + uint64_t /*context*/, + HandleSignalsState* signals_state) override { + info_->IncrementRemoveAwakableWithContextCallCount(); + mutex().AssertHeld(); + if (signals_state) + *signals_state = HandleSignalsState(); + } + void CancelAllStateNoLock() override { info_->IncrementCancelAllStateCallCount(); mutex().AssertHeld(); @@ -340,6 +350,12 @@ return remove_awakable_call_count_; } +unsigned CoreTestBase_MockHandleInfo::GetRemoveAwakableWithContextCallCount() + const { + MutexLocker locker(&mutex_); + return remove_awakable_with_context_call_count_; +} + unsigned CoreTestBase_MockHandleInfo::GetCancelAllStateCallCount() const { MutexLocker locker(&mutex_); return cancel_all_awakables_call_count_; @@ -440,6 +456,12 @@ remove_awakable_call_count_++; } +void CoreTestBase_MockHandleInfo:: + IncrementRemoveAwakableWithContextCallCount() { + MutexLocker locker(&mutex_); + remove_awakable_with_context_call_count_++; +} + void CoreTestBase_MockHandleInfo::IncrementCancelAllStateCallCount() { MutexLocker locker(&mutex_); cancel_all_awakables_call_count_++;
diff --git a/mojo/edk/system/core_test_base.h b/mojo/edk/system/core_test_base.h index b828e76..4bdaa6e 100644 --- a/mojo/edk/system/core_test_base.h +++ b/mojo/edk/system/core_test_base.h
@@ -81,6 +81,7 @@ unsigned GetMapBufferCallCount() const; unsigned GetAddAwakableCallCount() const; unsigned GetRemoveAwakableCallCount() const; + unsigned GetRemoveAwakableWithContextCallCount() const; unsigned GetCancelAllStateCallCount() const; size_t GetAddedAwakableSize() const; @@ -104,6 +105,7 @@ void IncrementMapBufferCallCount(); void IncrementAddAwakableCallCount(); void IncrementRemoveAwakableCallCount(); + void IncrementRemoveAwakableWithContextCallCount(); void IncrementCancelAllStateCallCount(); void AllowAddAwakable(bool alllow); @@ -129,6 +131,7 @@ unsigned map_buffer_call_count_ MOJO_GUARDED_BY(mutex_) = 0; unsigned add_awakable_call_count_ MOJO_GUARDED_BY(mutex_) = 0; unsigned remove_awakable_call_count_ MOJO_GUARDED_BY(mutex_) = 0; + unsigned remove_awakable_with_context_call_count_ MOJO_GUARDED_BY(mutex_) = 0; unsigned cancel_all_awakables_call_count_ MOJO_GUARDED_BY(mutex_) = 0; bool add_awakable_allowed_ MOJO_GUARDED_BY(mutex_) = false;
diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc index b64eec4..7167873 100644 --- a/mojo/edk/system/data_pipe.cc +++ b/mojo/edk/system/data_pipe.cc
@@ -432,6 +432,17 @@ *signals_state = impl_->ProducerGetHandleSignalsState(); } +void DataPipe::ProducerRemoveAwakableWithContext( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) { + MutexLocker locker(&mutex_); + DCHECK(has_local_producer_no_lock()); + producer_awakable_list_->RemoveWithContext(awakable, context); + if (signals_state) + *signals_state = impl_->ProducerGetHandleSignalsState(); +} + void DataPipe::ProducerStartSerialize(Channel* channel, size_t* max_size, size_t* max_platform_handles) { @@ -654,6 +665,17 @@ *signals_state = impl_->ConsumerGetHandleSignalsState(); } +void DataPipe::ConsumerRemoveAwakableWithContext( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) { + MutexLocker locker(&mutex_); + DCHECK(has_local_consumer_no_lock()); + consumer_awakable_list_->RemoveWithContext(awakable, context); + if (signals_state) + *signals_state = impl_->ConsumerGetHandleSignalsState(); +} + void DataPipe::ConsumerStartSerialize(Channel* channel, size_t* max_size, size_t* max_platform_handles) {
diff --git a/mojo/edk/system/data_pipe.h b/mojo/edk/system/data_pipe.h index 63083b7..3853efb 100644 --- a/mojo/edk/system/data_pipe.h +++ b/mojo/edk/system/data_pipe.h
@@ -121,6 +121,9 @@ HandleSignalsState* signals_state); void ProducerRemoveAwakable(Awakable* awakable, HandleSignalsState* signals_state); + void ProducerRemoveAwakableWithContext(Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state); void ProducerStartSerialize(Channel* channel, size_t* max_size, size_t* max_platform_handles); @@ -156,6 +159,9 @@ HandleSignalsState* signals_state); void ConsumerRemoveAwakable(Awakable* awakable, HandleSignalsState* signals_state); + void ConsumerRemoveAwakableWithContext(Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state); void ConsumerStartSerialize(Channel* channel, size_t* max_size, size_t* max_platform_handles);
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc index f9c8dc1..eb93a04 100644 --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
@@ -210,6 +210,15 @@ data_pipe_->ConsumerRemoveAwakable(awakable, signals_state); } +void DataPipeConsumerDispatcher::RemoveAwakableWithContextImplNoLock( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) { + mutex().AssertHeld(); + data_pipe_->ConsumerRemoveAwakableWithContext(awakable, context, + signals_state); +} + void DataPipeConsumerDispatcher::StartSerializeImplNoLock( Channel* channel, size_t* max_size,
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.h b/mojo/edk/system/data_pipe_consumer_dispatcher.h index 62b3013..c8977f3 100644 --- a/mojo/edk/system/data_pipe_consumer_dispatcher.h +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.h
@@ -74,6 +74,10 @@ HandleSignalsState* signals_state) override; void RemoveAwakableImplNoLock(Awakable* awakable, HandleSignalsState* signals_state) override; + void RemoveAwakableWithContextImplNoLock( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) override; void StartSerializeImplNoLock(Channel* channel, size_t* max_size, size_t* max_platform_handles) override
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc index 829c71d..c9c686a 100644 --- a/mojo/edk/system/data_pipe_producer_dispatcher.cc +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
@@ -185,6 +185,15 @@ data_pipe_->ProducerRemoveAwakable(awakable, signals_state); } +void DataPipeProducerDispatcher::RemoveAwakableWithContextImplNoLock( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) { + mutex().AssertHeld(); + data_pipe_->ProducerRemoveAwakableWithContext(awakable, context, + signals_state); +} + void DataPipeProducerDispatcher::StartSerializeImplNoLock( Channel* channel, size_t* max_size,
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.h b/mojo/edk/system/data_pipe_producer_dispatcher.h index 5248fee..0f72e7f 100644 --- a/mojo/edk/system/data_pipe_producer_dispatcher.h +++ b/mojo/edk/system/data_pipe_producer_dispatcher.h
@@ -74,6 +74,10 @@ HandleSignalsState* signals_state) override; void RemoveAwakableImplNoLock(Awakable* awakable, HandleSignalsState* signals_state) override; + void RemoveAwakableWithContextImplNoLock( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) override; void StartSerializeImplNoLock(Channel* channel, size_t* max_size, size_t* max_platform_handles) override
diff --git a/mojo/edk/system/dispatcher.cc b/mojo/edk/system/dispatcher.cc index 11f73e2..7d9741a 100644 --- a/mojo/edk/system/dispatcher.cc +++ b/mojo/edk/system/dispatcher.cc
@@ -337,15 +337,28 @@ } void Dispatcher::RemoveAwakable(Awakable* awakable, - HandleSignalsState* handle_signals_state) { + HandleSignalsState* signals_state) { MutexLocker locker(&mutex_); if (is_closed_) { - if (handle_signals_state) - *handle_signals_state = HandleSignalsState(); + if (signals_state) + *signals_state = HandleSignalsState(); return; } - RemoveAwakableImplNoLock(awakable, handle_signals_state); + RemoveAwakableImplNoLock(awakable, signals_state); +} + +void Dispatcher::RemoveAwakableWithContext(Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) { + MutexLocker locker(&mutex_); + if (is_closed_) { + if (signals_state) + *signals_state = HandleSignalsState(); + return; + } + + RemoveAwakableWithContextImplNoLock(awakable, context, signals_state); } Dispatcher::Dispatcher() : is_closed_(false) {} @@ -579,6 +592,18 @@ *signals_state = HandleSignalsState(); } +void Dispatcher::RemoveAwakableWithContextImplNoLock( + Awakable* /*awakable*/, + uint64_t /*context*/, + HandleSignalsState* signals_state) { + mutex_.AssertHeld(); + DCHECK(!is_closed_); + // By default, waiting isn't supported. Only dispatchers that can be waited on + // will do something nontrivial. + if (signals_state) + *signals_state = HandleSignalsState(); +} + void Dispatcher::StartSerializeImplNoLock(Channel* /*channel*/, size_t* max_size, size_t* max_platform_handles) {
diff --git a/mojo/edk/system/dispatcher.h b/mojo/edk/system/dispatcher.h index 427db64..e80ae7a 100644 --- a/mojo/edk/system/dispatcher.h +++ b/mojo/edk/system/dispatcher.h
@@ -196,6 +196,12 @@ // If |signals_state| is non-null, on *failure* |*signals_state| will be set // to the current handle signals state (on success, it is left untouched). // + // Any awakable that's added must either eventually be removed (using + // |RemoveAwakable()| or |RemoveAwakableWithContext()|, below). If an + // awakable's |Awake()| is called (by this dispatcher) and it returns false, + // that is equivalent to |RemoveAwakableWithContext()| being called for that + // awakable and the context passed to |Awake()|. + // // Returns: // - |MOJO_RESULT_OK| if the awakable was added; // - |MOJO_RESULT_ALREADY_EXISTS| if |signals| is already satisfied (if @@ -214,11 +220,17 @@ MojoHandleSignals signals, uint64_t context, HandleSignalsState* signals_state); - // Removes an awakable from this dispatcher. (It is valid to call this - // multiple times for the same |awakable| on the same object, so long as - // |AddAwakable()| was called at most once.) If |signals_state| is non-null, - // |*signals_state| will be set to the current handle signals state. + // Removes an awakable from this dispatcher. This will remove all instances of + // |awakable|, regardless of context. (It is valid to call this multiple times + // for the same |awakable| on the same object.) If |signals_state| is + // non-null, |*signals_state| will be set to the current handle signals state. void RemoveAwakable(Awakable* awakable, HandleSignalsState* signals_state); + // Like |RemoveAwakable()|, but only removes "instances" with the given + // context. (This may also be called multiple times, and there may be multiple + // instances with the same context.) + void RemoveAwakableWithContext(Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state); // A dispatcher must be put into a special state in order to be sent across a // message pipe. Outside of tests, only |HandleTableAccess| is allowed to do @@ -397,6 +409,10 @@ virtual void RemoveAwakableImplNoLock(Awakable* awakable, HandleSignalsState* signals_state) MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + virtual void RemoveAwakableWithContextImplNoLock( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_); // These implement the API used to serialize dispatchers to a |Channel| // (described below). They will only be called on a dispatcher that's attached
diff --git a/mojo/edk/system/local_message_pipe_endpoint.cc b/mojo/edk/system/local_message_pipe_endpoint.cc index 8e65486..276a009 100644 --- a/mojo/edk/system/local_message_pipe_endpoint.cc +++ b/mojo/edk/system/local_message_pipe_endpoint.cc
@@ -183,5 +183,15 @@ *signals_state = GetHandleSignalsState(); } +void LocalMessagePipeEndpoint::RemoveAwakableWithContext( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) { + DCHECK(is_open_); + awakable_list_.RemoveWithContext(awakable, context); + if (signals_state) + *signals_state = GetHandleSignalsState(); +} + } // namespace system } // namespace mojo
diff --git a/mojo/edk/system/local_message_pipe_endpoint.h b/mojo/edk/system/local_message_pipe_endpoint.h index 6649b1c..7e9904c 100644 --- a/mojo/edk/system/local_message_pipe_endpoint.h +++ b/mojo/edk/system/local_message_pipe_endpoint.h
@@ -44,6 +44,9 @@ HandleSignalsState* signals_state) override; void RemoveAwakable(Awakable* awakable, HandleSignalsState* signals_state) override; + void RemoveAwakableWithContext(Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) override; // This is only to be used by |MessagePipe|: MessageInTransitQueue* message_queue() { return &message_queue_; }
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc index 5e6166a..74b73a7 100644 --- a/mojo/edk/system/message_pipe.cc +++ b/mojo/edk/system/message_pipe.cc
@@ -215,6 +215,18 @@ endpoints_[port]->RemoveAwakable(awakable, signals_state); } +void MessagePipe::RemoveAwakableWithContext(unsigned port, + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) { + DCHECK(port == 0 || port == 1); + + MutexLocker locker(&mutex_); + DCHECK(endpoints_[port]); + + endpoints_[port]->RemoveAwakableWithContext(awakable, context, signals_state); +} + void MessagePipe::StartSerialize(unsigned /*port*/, Channel* channel, size_t* max_size,
diff --git a/mojo/edk/system/message_pipe.h b/mojo/edk/system/message_pipe.h index 97052f4..c5a684d 100644 --- a/mojo/edk/system/message_pipe.h +++ b/mojo/edk/system/message_pipe.h
@@ -108,6 +108,10 @@ void RemoveAwakable(unsigned port, Awakable* awakable, HandleSignalsState* signals_state); + void RemoveAwakableWithContext(unsigned port, + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state); void StartSerialize(unsigned port, Channel* channel, size_t* max_size,
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc index e57264a..e16548e 100644 --- a/mojo/edk/system/message_pipe_dispatcher.cc +++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -218,6 +218,15 @@ message_pipe_->RemoveAwakable(port_, awakable, signals_state); } +void MessagePipeDispatcher::RemoveAwakableWithContextImplNoLock( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) { + mutex().AssertHeld(); + message_pipe_->RemoveAwakableWithContext(port_, awakable, context, + signals_state); +} + void MessagePipeDispatcher::StartSerializeImplNoLock( Channel* channel, size_t* max_size,
diff --git a/mojo/edk/system/message_pipe_dispatcher.h b/mojo/edk/system/message_pipe_dispatcher.h index f30f438..a7360a6 100644 --- a/mojo/edk/system/message_pipe_dispatcher.h +++ b/mojo/edk/system/message_pipe_dispatcher.h
@@ -103,6 +103,10 @@ HandleSignalsState* signals_state) override; void RemoveAwakableImplNoLock(Awakable* awakable, HandleSignalsState* signals_state) override; + void RemoveAwakableWithContextImplNoLock( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) override; void StartSerializeImplNoLock(Channel* channel, size_t* max_size, size_t* max_platform_handles) override
diff --git a/mojo/edk/system/message_pipe_endpoint.cc b/mojo/edk/system/message_pipe_endpoint.cc index 49ee909..0211f86 100644 --- a/mojo/edk/system/message_pipe_endpoint.cc +++ b/mojo/edk/system/message_pipe_endpoint.cc
@@ -45,6 +45,15 @@ *signals_state = HandleSignalsState(); } +void MessagePipeEndpoint::RemoveAwakableWithContext( + Awakable* /*awakable*/, + uint64_t /*context*/, + HandleSignalsState* signals_state) { + NOTREACHED(); + if (signals_state) + *signals_state = HandleSignalsState(); +} + void MessagePipeEndpoint::Attach(ChannelEndpoint* /*channel_endpoint*/) { NOTREACHED(); }
diff --git a/mojo/edk/system/message_pipe_endpoint.h b/mojo/edk/system/message_pipe_endpoint.h index f2f38a5..9c1d546 100644 --- a/mojo/edk/system/message_pipe_endpoint.h +++ b/mojo/edk/system/message_pipe_endpoint.h
@@ -72,6 +72,9 @@ HandleSignalsState* signals_state); virtual void RemoveAwakable(Awakable* awakable, HandleSignalsState* signals_state); + virtual void RemoveAwakableWithContext(Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state); // Implementations must override these if they represent a proxy endpoint. An // implementation for a local endpoint needs not override these methods, since
diff --git a/mojo/edk/system/simple_dispatcher.cc b/mojo/edk/system/simple_dispatcher.cc index 48ee27d..c3dcd61 100644 --- a/mojo/edk/system/simple_dispatcher.cc +++ b/mojo/edk/system/simple_dispatcher.cc
@@ -60,5 +60,15 @@ *signals_state = GetHandleSignalsStateImplNoLock(); } +void SimpleDispatcher::RemoveAwakableWithContextImplNoLock( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) { + mutex().AssertHeld(); + awakable_list_.RemoveWithContext(awakable, context); + if (signals_state) + *signals_state = GetHandleSignalsStateImplNoLock(); +} + } // namespace system } // namespace mojo
diff --git a/mojo/edk/system/simple_dispatcher.h b/mojo/edk/system/simple_dispatcher.h index 558d42c..4501e8c 100644 --- a/mojo/edk/system/simple_dispatcher.h +++ b/mojo/edk/system/simple_dispatcher.h
@@ -37,6 +37,10 @@ HandleSignalsState* signals_state) override; void RemoveAwakableImplNoLock(Awakable* awakable, HandleSignalsState* signals_state) override; + void RemoveAwakableWithContextImplNoLock( + Awakable* awakable, + uint64_t context, + HandleSignalsState* signals_state) override; private: AwakableList awakable_list_ MOJO_GUARDED_BY(mutex());