| // Copyright 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/edk/system/data_pipe.h" |
| |
| #include <string.h> |
| |
| #include <algorithm> |
| #include <limits> |
| #include <memory> |
| #include <utility> |
| |
| #include "base/logging.h" |
| #include "mojo/edk/platform/aligned_alloc.h" |
| #include "mojo/edk/system/awakable_list.h" |
| #include "mojo/edk/system/channel.h" |
| #include "mojo/edk/system/configuration.h" |
| #include "mojo/edk/system/data_pipe_impl.h" |
| #include "mojo/edk/system/incoming_endpoint.h" |
| #include "mojo/edk/system/local_data_pipe_impl.h" |
| #include "mojo/edk/system/memory.h" |
| #include "mojo/edk/system/options_validation.h" |
| #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" |
| #include "mojo/edk/system/remote_producer_data_pipe_impl.h" |
| #include "mojo/edk/util/make_unique.h" |
| |
| using mojo::platform::AlignedUniquePtr; |
| using mojo::platform::ScopedPlatformHandle; |
| using mojo::util::MakeUnique; |
| using mojo::util::MutexLocker; |
| using mojo::util::RefPtr; |
| |
| namespace mojo { |
| namespace system { |
| |
| // static |
| MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { |
| MojoCreateDataPipeOptions result = { |
| static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), |
| MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
| 1u, |
| static_cast<uint32_t>( |
| GetConfiguration().default_data_pipe_capacity_bytes)}; |
| return result; |
| } |
| |
| // static |
| MojoResult DataPipe::ValidateCreateOptions( |
| UserPointer<const MojoCreateDataPipeOptions> in_options, |
| MojoCreateDataPipeOptions* out_options) { |
| const MojoCreateDataPipeOptionsFlags kKnownFlags = |
| MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; |
| |
| *out_options = GetDefaultCreateOptions(); |
| if (in_options.IsNull()) |
| return MOJO_RESULT_OK; |
| |
| UserOptionsReader<MojoCreateDataPipeOptions> reader(in_options); |
| if (!reader.is_valid()) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| |
| if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, flags, reader)) |
| return MOJO_RESULT_OK; |
| if ((reader.options().flags & ~kKnownFlags)) |
| return MOJO_RESULT_UNIMPLEMENTED; |
| out_options->flags = reader.options().flags; |
| |
| // Checks for fields beyond |flags|: |
| |
| if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, element_num_bytes, |
| reader)) |
| return MOJO_RESULT_OK; |
| if (reader.options().element_num_bytes == 0) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| out_options->element_num_bytes = reader.options().element_num_bytes; |
| |
| if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, capacity_num_bytes, |
| reader) || |
| reader.options().capacity_num_bytes == 0) { |
| // Round the default capacity down to a multiple of the element size (but at |
| // least one element). |
| size_t default_data_pipe_capacity_bytes = |
| GetConfiguration().default_data_pipe_capacity_bytes; |
| out_options->capacity_num_bytes = |
| std::max(static_cast<uint32_t>(default_data_pipe_capacity_bytes - |
| (default_data_pipe_capacity_bytes % |
| out_options->element_num_bytes)), |
| out_options->element_num_bytes); |
| return MOJO_RESULT_OK; |
| } |
| if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| if (reader.options().capacity_num_bytes > |
| GetConfiguration().max_data_pipe_capacity_bytes) |
| return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| out_options->capacity_num_bytes = reader.options().capacity_num_bytes; |
| |
| return MOJO_RESULT_OK; |
| } |
| |
| // static |
| RefPtr<DataPipe> DataPipe::CreateLocal( |
| const MojoCreateDataPipeOptions& validated_options) { |
| return AdoptRef(new DataPipe(true, true, validated_options, |
| MakeUnique<LocalDataPipeImpl>())); |
| } |
| |
| // static |
| RefPtr<DataPipe> DataPipe::CreateRemoteProducerFromExisting( |
| const MojoCreateDataPipeOptions& validated_options, |
| MessageInTransitQueue* message_queue, |
| RefPtr<ChannelEndpoint>&& channel_endpoint) { |
| AlignedUniquePtr<char> buffer; |
| size_t buffer_num_bytes = 0; |
| if (!RemoteProducerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
| validated_options, message_queue, &buffer, &buffer_num_bytes)) |
| return nullptr; |
| |
| // Important: This is called under |IncomingEndpoint|'s (which is a |
| // |ChannelEndpointClient|) lock, in particular from |
| // |IncomingEndpoint::ConvertToDataPipeConsumer()|. Before releasing that |
| // lock, it will reset its |endpoint_| member, which makes any later or |
| // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will |
| // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| |
| // is called. |
| RefPtr<DataPipe> data_pipe = AdoptRef(new DataPipe( |
| false, true, validated_options, |
| MakeUnique<RemoteProducerDataPipeImpl>( |
| channel_endpoint.Clone(), std::move(buffer), 0, buffer_num_bytes))); |
| if (channel_endpoint) { |
| if (!channel_endpoint->ReplaceClient(data_pipe.Clone(), 0)) |
| data_pipe->OnDetachFromChannel(0); |
| } else { |
| data_pipe->SetProducerClosed(); |
| } |
| return data_pipe; |
| } |
| |
| // static |
| RefPtr<DataPipe> DataPipe::CreateRemoteConsumerFromExisting( |
| const MojoCreateDataPipeOptions& validated_options, |
| size_t consumer_num_bytes, |
| MessageInTransitQueue* message_queue, |
| RefPtr<ChannelEndpoint>&& channel_endpoint) { |
| if (!RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
| validated_options, &consumer_num_bytes, message_queue)) |
| return nullptr; |
| |
| // Important: This is called under |IncomingEndpoint|'s (which is a |
| // |ChannelEndpointClient|) lock, in particular from |
| // |IncomingEndpoint::ConvertToDataPipeProducer()|. Before releasing that |
| // lock, it will reset its |endpoint_| member, which makes any later or |
| // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will |
| // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| |
| // is called. |
| RefPtr<DataPipe> data_pipe = AdoptRef(new DataPipe( |
| true, false, validated_options, |
| MakeUnique<RemoteConsumerDataPipeImpl>(channel_endpoint.Clone(), |
| consumer_num_bytes, nullptr, 0))); |
| if (channel_endpoint) { |
| if (!channel_endpoint->ReplaceClient(data_pipe.Clone(), 0)) |
| data_pipe->OnDetachFromChannel(0); |
| } else { |
| data_pipe->SetConsumerClosed(); |
| } |
| return data_pipe; |
| } |
| |
| // static |
| bool DataPipe::ProducerDeserialize(Channel* channel, |
| const void* source, |
| size_t size, |
| RefPtr<DataPipe>* data_pipe) { |
| DCHECK(!*data_pipe); // Not technically wrong, but unlikely. |
| |
| bool consumer_open = false; |
| if (size == sizeof(SerializedDataPipeProducerDispatcher)) { |
| consumer_open = false; |
| } else if (size == |
| sizeof(SerializedDataPipeProducerDispatcher) + |
| channel->GetSerializedEndpointSize()) { |
| consumer_open = true; |
| } else { |
| LOG(ERROR) << "Invalid serialized data pipe producer"; |
| return false; |
| } |
| |
| const SerializedDataPipeProducerDispatcher* s = |
| static_cast<const SerializedDataPipeProducerDispatcher*>(source); |
| MojoCreateDataPipeOptions revalidated_options = {}; |
| if (ValidateCreateOptions(MakeUserPointer(&s->validated_options), |
| &revalidated_options) != MOJO_RESULT_OK) { |
| LOG(ERROR) << "Invalid serialized data pipe producer (bad options)"; |
| return false; |
| } |
| |
| if (!consumer_open) { |
| if (s->consumer_num_bytes != static_cast<size_t>(-1)) { |
| LOG(ERROR) |
| << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; |
| return false; |
| } |
| |
| *data_pipe = AdoptRef(new DataPipe( |
| true, false, revalidated_options, |
| MakeUnique<RemoteConsumerDataPipeImpl>(nullptr, 0, nullptr, 0))); |
| (*data_pipe)->SetConsumerClosed(); |
| |
| return true; |
| } |
| |
| if (s->consumer_num_bytes > revalidated_options.capacity_num_bytes || |
| s->consumer_num_bytes % revalidated_options.element_num_bytes != 0) { |
| LOG(ERROR) |
| << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; |
| return false; |
| } |
| |
| const void* endpoint_source = static_cast<const char*>(source) + |
| sizeof(SerializedDataPipeProducerDispatcher); |
| RefPtr<IncomingEndpoint> incoming_endpoint = |
| channel->DeserializeEndpoint(endpoint_source); |
| if (!incoming_endpoint) |
| return false; |
| |
| *data_pipe = incoming_endpoint->ConvertToDataPipeProducer( |
| revalidated_options, s->consumer_num_bytes); |
| if (!*data_pipe) |
| return false; |
| |
| return true; |
| } |
| |
| // static |
| bool DataPipe::ConsumerDeserialize(Channel* channel, |
| const void* source, |
| size_t size, |
| RefPtr<DataPipe>* data_pipe) { |
| DCHECK(!*data_pipe); // Not technically wrong, but unlikely. |
| |
| if (size != |
| sizeof(SerializedDataPipeConsumerDispatcher) + |
| channel->GetSerializedEndpointSize()) { |
| LOG(ERROR) << "Invalid serialized data pipe consumer"; |
| return false; |
| } |
| |
| const SerializedDataPipeConsumerDispatcher* s = |
| static_cast<const SerializedDataPipeConsumerDispatcher*>(source); |
| MojoCreateDataPipeOptions revalidated_options = {}; |
| if (ValidateCreateOptions(MakeUserPointer(&s->validated_options), |
| &revalidated_options) != MOJO_RESULT_OK) { |
| LOG(ERROR) << "Invalid serialized data pipe consumer (bad options)"; |
| return false; |
| } |
| |
| const void* endpoint_source = static_cast<const char*>(source) + |
| sizeof(SerializedDataPipeConsumerDispatcher); |
| RefPtr<IncomingEndpoint> incoming_endpoint = |
| channel->DeserializeEndpoint(endpoint_source); |
| if (!incoming_endpoint) |
| return false; |
| |
| *data_pipe = |
| incoming_endpoint->ConvertToDataPipeConsumer(revalidated_options); |
| if (!*data_pipe) |
| return false; |
| |
| return true; |
| } |
| |
| void DataPipe::ProducerCancelAllAwakables() { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| producer_awakable_list_->CancelAll(); |
| } |
| |
| void DataPipe::ProducerClose() { |
| MutexLocker locker(&mutex_); |
| ProducerCloseNoLock(); |
| } |
| |
| MojoResult DataPipe::ProducerSetOptions(uint32_t write_threshold_num_bytes) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| |
| if (write_threshold_num_bytes % element_num_bytes() != 0) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| |
| HandleSignalsState old_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| producer_write_threshold_num_bytes_ = write_threshold_num_bytes; |
| HandleSignalsState new_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| if (!new_producer_state.equals(old_producer_state)) |
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
| return MOJO_RESULT_OK; |
| } |
| |
| void DataPipe::ProducerGetOptions(uint32_t* write_threshold_num_bytes) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| *write_threshold_num_bytes = producer_write_threshold_num_bytes_; |
| } |
| |
| MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements, |
| UserPointer<uint32_t> num_bytes, |
| bool all_or_none) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| |
| if (producer_in_two_phase_write_no_lock()) |
| return MOJO_RESULT_BUSY; |
| if (!consumer_open_no_lock()) |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| |
| // Returning "busy" takes priority over "invalid argument". |
| uint32_t max_num_bytes_to_write = num_bytes.Get(); |
| if (max_num_bytes_to_write % element_num_bytes() != 0) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| |
| if (max_num_bytes_to_write == 0) |
| return MOJO_RESULT_OK; // Nothing to do. |
| |
| uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0; |
| |
| HandleSignalsState old_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| MojoResult rv = impl_->ProducerWriteData( |
| elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write); |
| HandleSignalsState new_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| if (!new_consumer_state.equals(old_consumer_state)) |
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
| return rv; |
| } |
| |
| MojoResult DataPipe::ProducerBeginWriteData( |
| UserPointer<void*> buffer, |
| UserPointer<uint32_t> buffer_num_bytes) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| |
| if (producer_in_two_phase_write_no_lock()) |
| return MOJO_RESULT_BUSY; |
| if (!consumer_open_no_lock()) |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| |
| MojoResult rv = impl_->ProducerBeginWriteData(buffer, buffer_num_bytes); |
| if (rv != MOJO_RESULT_OK) |
| return rv; |
| // Note: No need to awake producer awakables, even though we're going from |
| // writable to non-writable (since you can't wait on non-writability). |
| // Similarly, though this may have discarded data (in "may discard" mode), |
| // making it non-readable, there's still no need to awake consumer awakables. |
| DCHECK(producer_in_two_phase_write_no_lock()); |
| return MOJO_RESULT_OK; |
| } |
| |
| MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| |
| if (!producer_in_two_phase_write_no_lock()) |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| // Note: Allow successful completion of the two-phase write even if the |
| // consumer has been closed. |
| |
| HandleSignalsState old_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| MojoResult rv; |
| if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || |
| num_bytes_written % element_num_bytes() != 0) { |
| rv = MOJO_RESULT_INVALID_ARGUMENT; |
| producer_two_phase_max_num_bytes_written_ = 0; |
| } else { |
| rv = impl_->ProducerEndWriteData(num_bytes_written); |
| } |
| // Two-phase write ended even on failure. |
| DCHECK(!producer_in_two_phase_write_no_lock()); |
| // If we're now writable, we *became* writable (since we weren't writable |
| // during the two-phase write), so awake producer awakables. |
| HandleSignalsState new_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
| HandleSignalsState new_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| if (!new_consumer_state.equals(old_consumer_state)) |
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
| return rv; |
| } |
| |
| HandleSignalsState DataPipe::ProducerGetHandleSignalsState() { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| return impl_->ProducerGetHandleSignalsState(); |
| } |
| |
| MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable, |
| MojoHandleSignals signals, |
| uint32_t context, |
| HandleSignalsState* signals_state) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| |
| HandleSignalsState producer_state = impl_->ProducerGetHandleSignalsState(); |
| if (producer_state.satisfies(signals)) { |
| if (signals_state) |
| *signals_state = producer_state; |
| return MOJO_RESULT_ALREADY_EXISTS; |
| } |
| if (!producer_state.can_satisfy(signals)) { |
| if (signals_state) |
| *signals_state = producer_state; |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| } |
| |
| producer_awakable_list_->Add(awakable, signals, context); |
| return MOJO_RESULT_OK; |
| } |
| |
| void DataPipe::ProducerRemoveAwakable(Awakable* awakable, |
| HandleSignalsState* signals_state) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| producer_awakable_list_->Remove(awakable); |
| if (signals_state) |
| *signals_state = impl_->ProducerGetHandleSignalsState(); |
| } |
| |
| void DataPipe::ProducerStartSerialize(Channel* channel, |
| size_t* max_size, |
| size_t* max_platform_handles) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| impl_->ProducerStartSerialize(channel, max_size, max_platform_handles); |
| } |
| |
| bool DataPipe::ProducerEndSerialize( |
| Channel* channel, |
| void* destination, |
| size_t* actual_size, |
| std::vector<ScopedPlatformHandle>* platform_handles) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_producer_no_lock()); |
| // Warning: After |ProducerEndSerialize()|, quite probably |impl_| has |
| // changed. |
| bool rv = impl_->ProducerEndSerialize(channel, destination, actual_size, |
| platform_handles); |
| |
| // TODO(vtl): The code below is similar to, but not quite the same as, |
| // |ProducerCloseNoLock()|. |
| DCHECK(has_local_producer_no_lock()); |
| producer_awakable_list_->CancelAll(); |
| producer_awakable_list_.reset(); |
| // Not a bug, except possibly in "user" code. |
| DVLOG_IF(2, producer_in_two_phase_write_no_lock()) |
| << "Producer transferred with active two-phase write"; |
| producer_two_phase_max_num_bytes_written_ = 0; |
| if (!has_local_consumer_no_lock()) |
| producer_open_ = false; |
| |
| return rv; |
| } |
| |
| bool DataPipe::ProducerIsBusy() const { |
| MutexLocker locker(&mutex_); |
| return producer_in_two_phase_write_no_lock(); |
| } |
| |
| void DataPipe::ConsumerCancelAllAwakables() { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| consumer_awakable_list_->CancelAll(); |
| } |
| |
| void DataPipe::ConsumerClose() { |
| MutexLocker locker(&mutex_); |
| ConsumerCloseNoLock(); |
| } |
| |
| MojoResult DataPipe::ConsumerSetOptions(uint32_t read_threshold_num_bytes) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| if (read_threshold_num_bytes % element_num_bytes() != 0) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| |
| HandleSignalsState old_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| consumer_read_threshold_num_bytes_ = read_threshold_num_bytes; |
| HandleSignalsState new_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| if (!new_consumer_state.equals(old_consumer_state)) |
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
| return MOJO_RESULT_OK; |
| } |
| |
| void DataPipe::ConsumerGetOptions(uint32_t* read_threshold_num_bytes) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| *read_threshold_num_bytes = consumer_read_threshold_num_bytes_; |
| } |
| |
| MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements, |
| UserPointer<uint32_t> num_bytes, |
| bool all_or_none, |
| bool peek) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| if (consumer_in_two_phase_read_no_lock()) |
| return MOJO_RESULT_BUSY; |
| |
| uint32_t max_num_bytes_to_read = num_bytes.Get(); |
| if (max_num_bytes_to_read % element_num_bytes() != 0) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| |
| if (max_num_bytes_to_read == 0) |
| return MOJO_RESULT_OK; // Nothing to do. |
| |
| uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; |
| |
| HandleSignalsState old_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| MojoResult rv = impl_->ConsumerReadData( |
| elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek); |
| HandleSignalsState new_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| if (!new_producer_state.equals(old_producer_state)) |
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
| return rv; |
| } |
| |
| MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes, |
| bool all_or_none) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| if (consumer_in_two_phase_read_no_lock()) |
| return MOJO_RESULT_BUSY; |
| |
| uint32_t max_num_bytes_to_discard = num_bytes.Get(); |
| if (max_num_bytes_to_discard % element_num_bytes() != 0) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| |
| if (max_num_bytes_to_discard == 0) |
| return MOJO_RESULT_OK; // Nothing to do. |
| |
| uint32_t min_num_bytes_to_discard = |
| all_or_none ? max_num_bytes_to_discard : 0; |
| |
| HandleSignalsState old_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| MojoResult rv = impl_->ConsumerDiscardData( |
| num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard); |
| HandleSignalsState new_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| if (!new_producer_state.equals(old_producer_state)) |
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
| return rv; |
| } |
| |
| MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| if (consumer_in_two_phase_read_no_lock()) |
| return MOJO_RESULT_BUSY; |
| |
| // Note: Don't need to validate |*num_bytes| for query. |
| return impl_->ConsumerQueryData(num_bytes); |
| } |
| |
| MojoResult DataPipe::ConsumerBeginReadData( |
| UserPointer<const void*> buffer, |
| UserPointer<uint32_t> buffer_num_bytes) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| if (consumer_in_two_phase_read_no_lock()) |
| return MOJO_RESULT_BUSY; |
| |
| MojoResult rv = impl_->ConsumerBeginReadData(buffer, buffer_num_bytes); |
| if (rv != MOJO_RESULT_OK) |
| return rv; |
| DCHECK(consumer_in_two_phase_read_no_lock()); |
| return MOJO_RESULT_OK; |
| } |
| |
| MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| if (!consumer_in_two_phase_read_no_lock()) |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| |
| HandleSignalsState old_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| HandleSignalsState old_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| MojoResult rv; |
| if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || |
| num_bytes_read % element_num_bytes() != 0) { |
| rv = MOJO_RESULT_INVALID_ARGUMENT; |
| consumer_two_phase_max_num_bytes_read_ = 0; |
| } else { |
| rv = impl_->ConsumerEndReadData(num_bytes_read); |
| } |
| // Two-phase read ended even on failure. |
| DCHECK(!consumer_in_two_phase_read_no_lock()); |
| // If we're now readable, we *became* readable (since we weren't readable |
| // during the two-phase read), so awake consumer awakables. |
| HandleSignalsState new_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| if (!new_consumer_state.equals(old_consumer_state)) |
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
| HandleSignalsState new_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| if (!new_producer_state.equals(old_producer_state)) |
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
| return rv; |
| } |
| |
| HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| return impl_->ConsumerGetHandleSignalsState(); |
| } |
| |
| MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable, |
| MojoHandleSignals signals, |
| uint32_t context, |
| HandleSignalsState* signals_state) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| HandleSignalsState consumer_state = impl_->ConsumerGetHandleSignalsState(); |
| if (consumer_state.satisfies(signals)) { |
| if (signals_state) |
| *signals_state = consumer_state; |
| return MOJO_RESULT_ALREADY_EXISTS; |
| } |
| if (!consumer_state.can_satisfy(signals)) { |
| if (signals_state) |
| *signals_state = consumer_state; |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| } |
| |
| consumer_awakable_list_->Add(awakable, signals, context); |
| return MOJO_RESULT_OK; |
| } |
| |
| void DataPipe::ConsumerRemoveAwakable(Awakable* awakable, |
| HandleSignalsState* signals_state) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| consumer_awakable_list_->Remove(awakable); |
| if (signals_state) |
| *signals_state = impl_->ConsumerGetHandleSignalsState(); |
| } |
| |
| void DataPipe::ConsumerStartSerialize(Channel* channel, |
| size_t* max_size, |
| size_t* max_platform_handles) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| impl_->ConsumerStartSerialize(channel, max_size, max_platform_handles); |
| } |
| |
| bool DataPipe::ConsumerEndSerialize( |
| Channel* channel, |
| void* destination, |
| size_t* actual_size, |
| std::vector<ScopedPlatformHandle>* platform_handles) { |
| MutexLocker locker(&mutex_); |
| DCHECK(has_local_consumer_no_lock()); |
| // Warning: After |ConsumerEndSerialize()|, quite probably |impl_| has |
| // changed. |
| bool rv = impl_->ConsumerEndSerialize(channel, destination, actual_size, |
| platform_handles); |
| |
| // TODO(vtl): The code below is similar to, but not quite the same as, |
| // |ConsumerCloseNoLock()|. |
| consumer_awakable_list_->CancelAll(); |
| consumer_awakable_list_.reset(); |
| // Not a bug, except possibly in "user" code. |
| DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) |
| << "Consumer transferred with active two-phase read"; |
| consumer_two_phase_max_num_bytes_read_ = 0; |
| if (!has_local_producer_no_lock()) |
| consumer_open_ = false; |
| |
| return rv; |
| } |
| |
| bool DataPipe::ConsumerIsBusy() const { |
| MutexLocker locker(&mutex_); |
| return consumer_in_two_phase_read_no_lock(); |
| } |
| |
| DataPipe::DataPipe(bool has_local_producer, |
| bool has_local_consumer, |
| const MojoCreateDataPipeOptions& validated_options, |
| std::unique_ptr<DataPipeImpl> impl) |
| : element_num_bytes_(validated_options.element_num_bytes), |
| capacity_num_bytes_(validated_options.capacity_num_bytes), |
| producer_open_(true), |
| consumer_open_(true), |
| producer_write_threshold_num_bytes_(0), |
| consumer_read_threshold_num_bytes_(0), |
| producer_awakable_list_(has_local_producer ? new AwakableList() |
| : nullptr), |
| consumer_awakable_list_(has_local_consumer ? new AwakableList() |
| : nullptr), |
| producer_two_phase_max_num_bytes_written_(0), |
| consumer_two_phase_max_num_bytes_read_(0), |
| impl_(std::move(impl)) { |
| impl_->set_owner(this); |
| |
| #if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
| // Check that the passed in options actually are validated. |
| MojoCreateDataPipeOptions unused = {}; |
| DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), |
| MOJO_RESULT_OK); |
| #endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
| } |
| |
| DataPipe::~DataPipe() { |
| DCHECK(!producer_open_); |
| DCHECK(!consumer_open_); |
| DCHECK(!producer_awakable_list_); |
| DCHECK(!consumer_awakable_list_); |
| } |
| |
| std::unique_ptr<DataPipeImpl> DataPipe::ReplaceImplNoLock( |
| std::unique_ptr<DataPipeImpl> new_impl) { |
| mutex_.AssertHeld(); |
| DCHECK(new_impl); |
| |
| impl_->set_owner(nullptr); |
| std::unique_ptr<DataPipeImpl> rv(std::move(impl_)); |
| impl_ = std::move(new_impl); |
| impl_->set_owner(this); |
| return rv; |
| } |
| |
| void DataPipe::SetProducerClosedNoLock() { |
| mutex_.AssertHeld(); |
| DCHECK(!has_local_producer_no_lock()); |
| DCHECK(producer_open_); |
| producer_open_ = false; |
| } |
| |
| void DataPipe::SetConsumerClosedNoLock() { |
| mutex_.AssertHeld(); |
| DCHECK(!has_local_consumer_no_lock()); |
| DCHECK(consumer_open_); |
| consumer_open_ = false; |
| } |
| |
| void DataPipe::ProducerCloseNoLock() { |
| mutex_.AssertHeld(); |
| DCHECK(producer_open_); |
| producer_open_ = false; |
| if (has_local_producer_no_lock()) { |
| producer_awakable_list_.reset(); |
| // Not a bug, except possibly in "user" code. |
| DVLOG_IF(2, producer_in_two_phase_write_no_lock()) |
| << "Producer closed with active two-phase write"; |
| producer_two_phase_max_num_bytes_written_ = 0; |
| impl_->ProducerClose(); |
| AwakeConsumerAwakablesForStateChangeNoLock( |
| impl_->ConsumerGetHandleSignalsState()); |
| } |
| } |
| |
| void DataPipe::ConsumerCloseNoLock() { |
| mutex_.AssertHeld(); |
| DCHECK(consumer_open_); |
| consumer_open_ = false; |
| if (has_local_consumer_no_lock()) { |
| consumer_awakable_list_.reset(); |
| // Not a bug, except possibly in "user" code. |
| DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) |
| << "Consumer closed with active two-phase read"; |
| consumer_two_phase_max_num_bytes_read_ = 0; |
| impl_->ConsumerClose(); |
| AwakeProducerAwakablesForStateChangeNoLock( |
| impl_->ProducerGetHandleSignalsState()); |
| } |
| } |
| |
| bool DataPipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
| MutexLocker locker(&mutex_); |
| DCHECK(!has_local_producer_no_lock() || !has_local_consumer_no_lock()); |
| |
| HandleSignalsState old_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| HandleSignalsState old_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| |
| bool rv = impl_->OnReadMessage(port, message); |
| |
| HandleSignalsState new_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| if (!new_producer_state.equals(old_producer_state)) |
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
| HandleSignalsState new_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| if (!new_consumer_state.equals(old_consumer_state)) |
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
| |
| return rv; |
| } |
| |
| void DataPipe::OnDetachFromChannel(unsigned port) { |
| MutexLocker locker(&mutex_); |
| DCHECK(!has_local_producer_no_lock() || !has_local_consumer_no_lock()); |
| |
| HandleSignalsState old_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| HandleSignalsState old_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| |
| impl_->OnDetachFromChannel(port); |
| |
| HandleSignalsState new_producer_state = |
| impl_->ProducerGetHandleSignalsState(); |
| if (!new_producer_state.equals(old_producer_state)) |
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
| HandleSignalsState new_consumer_state = |
| impl_->ConsumerGetHandleSignalsState(); |
| if (!new_consumer_state.equals(old_consumer_state)) |
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
| } |
| |
| void DataPipe::AwakeProducerAwakablesForStateChangeNoLock( |
| const HandleSignalsState& new_producer_state) { |
| mutex_.AssertHeld(); |
| if (!has_local_producer_no_lock()) |
| return; |
| producer_awakable_list_->AwakeForStateChange(new_producer_state); |
| } |
| |
| void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock( |
| const HandleSignalsState& new_consumer_state) { |
| mutex_.AssertHeld(); |
| if (!has_local_consumer_no_lock()) |
| return; |
| consumer_awakable_list_->AwakeForStateChange(new_consumer_state); |
| } |
| |
| void DataPipe::SetProducerClosed() { |
| MutexLocker locker(&mutex_); |
| SetProducerClosedNoLock(); |
| } |
| |
| void DataPipe::SetConsumerClosed() { |
| MutexLocker locker(&mutex_); |
| SetConsumerClosedNoLock(); |
| } |
| |
| } // namespace system |
| } // namespace mojo |