| // Copyright 2015 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/edk/system/slave_connection_manager.h" |
| |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/location.h" |
| #include "base/logging.h" |
| #include "base/message_loop/message_loop.h" |
| #include "mojo/edk/system/connection_manager_messages.h" |
| #include "mojo/edk/system/message_in_transit.h" |
| #include "mojo/edk/util/make_unique.h" |
| |
| namespace mojo { |
| namespace system { |
| |
| // SlaveConnectionManager ------------------------------------------------------ |
| |
| SlaveConnectionManager::SlaveConnectionManager( |
| embedder::PlatformSupport* platform_support) |
| : ConnectionManager(platform_support), |
| slave_process_delegate_(), |
| private_thread_("SlaveConnectionManagerPrivateThread"), |
| awaiting_ack_type_(NOT_AWAITING_ACK), |
| ack_result_(nullptr), |
| ack_peer_process_identifier_(nullptr), |
| ack_is_first_(nullptr), |
| ack_platform_handle_(nullptr) {} |
| |
| SlaveConnectionManager::~SlaveConnectionManager() { |
| DCHECK(!delegate_thread_task_runner_); |
| DCHECK(!slave_process_delegate_); |
| DCHECK(!private_thread_.message_loop()); |
| DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK); |
| DCHECK(!ack_result_); |
| DCHECK(!ack_peer_process_identifier_); |
| DCHECK(!ack_is_first_); |
| DCHECK(!ack_platform_handle_); |
| } |
| |
| void SlaveConnectionManager::Init( |
| embedder::PlatformTaskRunnerRefPtr delegate_thread_task_runner, |
| embedder::SlaveProcessDelegate* slave_process_delegate, |
| embedder::ScopedPlatformHandle platform_handle) { |
| DCHECK(delegate_thread_task_runner); |
| DCHECK(slave_process_delegate); |
| DCHECK(platform_handle.is_valid()); |
| DCHECK(!delegate_thread_task_runner_); |
| DCHECK(!slave_process_delegate_); |
| DCHECK(!private_thread_.message_loop()); |
| |
| delegate_thread_task_runner_ = delegate_thread_task_runner; |
| slave_process_delegate_ = slave_process_delegate; |
| CHECK(private_thread_.StartWithOptions( |
| base::Thread::Options(base::MessageLoop::TYPE_IO, 0))); |
| private_thread_.message_loop()->PostTask( |
| FROM_HERE, |
| base::Bind(&SlaveConnectionManager::InitOnPrivateThread, |
| base::Unretained(this), base::Passed(&platform_handle))); |
| event_.Wait(); |
| } |
| |
| void SlaveConnectionManager::Shutdown() { |
| AssertNotOnPrivateThread(); |
| DCHECK(slave_process_delegate_); |
| DCHECK(private_thread_.message_loop()); |
| |
| // The |Stop()| will actually finish all posted tasks. |
| private_thread_.message_loop()->PostTask( |
| FROM_HERE, base::Bind(&SlaveConnectionManager::ShutdownOnPrivateThread, |
| base::Unretained(this))); |
| private_thread_.Stop(); |
| slave_process_delegate_ = nullptr; |
| delegate_thread_task_runner_ = nullptr; |
| } |
| |
| bool SlaveConnectionManager::AllowConnect( |
| const ConnectionIdentifier& connection_id) { |
| AssertNotOnPrivateThread(); |
| |
| MutexLocker locker(&mutex_); |
| Result result = Result::FAILURE; |
| private_thread_.message_loop()->PostTask( |
| FROM_HERE, |
| base::Bind(&SlaveConnectionManager::AllowConnectOnPrivateThread, |
| base::Unretained(this), connection_id, &result)); |
| event_.Wait(); |
| DCHECK(result == Result::FAILURE || result == Result::SUCCESS); |
| return result == Result::SUCCESS; |
| } |
| |
| bool SlaveConnectionManager::CancelConnect( |
| const ConnectionIdentifier& connection_id) { |
| AssertNotOnPrivateThread(); |
| |
| MutexLocker locker(&mutex_); |
| Result result = Result::FAILURE; |
| private_thread_.message_loop()->PostTask( |
| FROM_HERE, |
| base::Bind(&SlaveConnectionManager::CancelConnectOnPrivateThread, |
| base::Unretained(this), connection_id, &result)); |
| event_.Wait(); |
| DCHECK(result == Result::FAILURE || result == Result::SUCCESS); |
| return result == Result::SUCCESS; |
| } |
| |
| ConnectionManager::Result SlaveConnectionManager::Connect( |
| const ConnectionIdentifier& connection_id, |
| ProcessIdentifier* peer_process_identifier, |
| bool* is_first, |
| embedder::ScopedPlatformHandle* platform_handle) { |
| AssertNotOnPrivateThread(); |
| DCHECK(peer_process_identifier); |
| DCHECK(is_first); |
| DCHECK(platform_handle); |
| DCHECK(!platform_handle->is_valid()); // Not technically wrong, but unlikely. |
| |
| MutexLocker locker(&mutex_); |
| Result result = Result::FAILURE; |
| private_thread_.message_loop()->PostTask( |
| FROM_HERE, |
| base::Bind(&SlaveConnectionManager::ConnectOnPrivateThread, |
| base::Unretained(this), connection_id, &result, |
| peer_process_identifier, is_first, platform_handle)); |
| event_.Wait(); |
| return result; |
| } |
| |
| void SlaveConnectionManager::InitOnPrivateThread( |
| embedder::ScopedPlatformHandle platform_handle) { |
| AssertOnPrivateThread(); |
| |
| raw_channel_ = RawChannel::Create(platform_handle.Pass()); |
| raw_channel_->Init(this); |
| event_.Signal(); |
| } |
| |
| void SlaveConnectionManager::ShutdownOnPrivateThread() { |
| AssertOnPrivateThread(); |
| |
| CHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK); |
| if (raw_channel_) { |
| raw_channel_->Shutdown(); |
| raw_channel_.reset(); |
| } |
| } |
| |
| void SlaveConnectionManager::AllowConnectOnPrivateThread( |
| const ConnectionIdentifier& connection_id, |
| Result* result) { |
| DCHECK(result); |
| AssertOnPrivateThread(); |
| // This should only posted (from another thread, to |private_thread_|) with |
| // the lock held (until this thread triggers |event_|). |
| DCHECK(!mutex_.TryLock()); |
| DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK); |
| |
| DVLOG(1) << "Sending AllowConnect: connection ID " |
| << connection_id.ToString(); |
| if (!raw_channel_->WriteMessage(util::MakeUnique<MessageInTransit>( |
| MessageInTransit::Type::CONNECTION_MANAGER, |
| MessageInTransit::Subtype::CONNECTION_MANAGER_ALLOW_CONNECT, |
| sizeof(connection_id), &connection_id))) { |
| // Don't tear things down; possibly we'll still read some messages. |
| *result = Result::FAILURE; |
| event_.Signal(); |
| return; |
| } |
| awaiting_ack_type_ = AWAITING_ACCEPT_CONNECT_ACK; |
| ack_result_ = result; |
| } |
| |
| void SlaveConnectionManager::CancelConnectOnPrivateThread( |
| const ConnectionIdentifier& connection_id, |
| Result* result) { |
| DCHECK(result); |
| AssertOnPrivateThread(); |
| // This should only posted (from another thread, to |private_thread_|) with |
| // the lock held (until this thread triggers |event_|). |
| DCHECK(!mutex_.TryLock()); |
| DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK); |
| |
| DVLOG(1) << "Sending CancelConnect: connection ID " |
| << connection_id.ToString(); |
| if (!raw_channel_->WriteMessage(util::MakeUnique<MessageInTransit>( |
| MessageInTransit::Type::CONNECTION_MANAGER, |
| MessageInTransit::Subtype::CONNECTION_MANAGER_CANCEL_CONNECT, |
| sizeof(connection_id), &connection_id))) { |
| // Don't tear things down; possibly we'll still read some messages. |
| *result = Result::FAILURE; |
| event_.Signal(); |
| return; |
| } |
| awaiting_ack_type_ = AWAITING_CANCEL_CONNECT_ACK; |
| ack_result_ = result; |
| } |
| |
| void SlaveConnectionManager::ConnectOnPrivateThread( |
| const ConnectionIdentifier& connection_id, |
| Result* result, |
| ProcessIdentifier* peer_process_identifier, |
| bool* is_first, |
| embedder::ScopedPlatformHandle* platform_handle) { |
| DCHECK(result); |
| AssertOnPrivateThread(); |
| // This should only posted (from another thread, to |private_thread_|) with |
| // the lock held (until this thread triggers |event_|). |
| DCHECK(!mutex_.TryLock()); |
| DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK); |
| |
| DVLOG(1) << "Sending Connect: connection ID " << connection_id.ToString(); |
| if (!raw_channel_->WriteMessage(util::MakeUnique<MessageInTransit>( |
| MessageInTransit::Type::CONNECTION_MANAGER, |
| MessageInTransit::Subtype::CONNECTION_MANAGER_CONNECT, |
| sizeof(connection_id), &connection_id))) { |
| // Don't tear things down; possibly we'll still read some messages. |
| *result = Result::FAILURE; |
| platform_handle->reset(); |
| event_.Signal(); |
| return; |
| } |
| awaiting_ack_type_ = AWAITING_CONNECT_ACK; |
| ack_result_ = result; |
| ack_peer_process_identifier_ = peer_process_identifier; |
| ack_is_first_ = is_first; |
| ack_platform_handle_ = platform_handle; |
| } |
| |
| void SlaveConnectionManager::OnReadMessage( |
| const MessageInTransit::View& message_view, |
| embedder::ScopedPlatformHandleVectorPtr platform_handles) { |
| AssertOnPrivateThread(); |
| |
| // Set |*ack_result_| to failure by default. |
| *ack_result_ = Result::FAILURE; |
| |
| // Note: Since we should be able to trust the master, simply crash (i.e., |
| // |CHECK()|-fail) if it sends us something invalid. |
| |
| // Unsolicited message. |
| CHECK_NE(awaiting_ack_type_, NOT_AWAITING_ACK); |
| // Bad message type. |
| CHECK_EQ(message_view.type(), MessageInTransit::Type::CONNECTION_MANAGER_ACK); |
| |
| size_t num_bytes = message_view.num_bytes(); |
| size_t num_platform_handles = platform_handles ? platform_handles->size() : 0; |
| |
| if (message_view.subtype() == |
| MessageInTransit::Subtype::CONNECTION_MANAGER_ACK_FAILURE) { |
| // Failure acks never have any contents. |
| DCHECK_EQ(num_bytes, 0u); |
| DCHECK_EQ(num_platform_handles, 0u); |
| // Leave |*ack_result_| as failure. |
| } else { |
| if (awaiting_ack_type_ != AWAITING_CONNECT_ACK) { |
| // In the non-"connect" case, there's only one type of success ack, which |
| // never has any contents. |
| CHECK_EQ(message_view.subtype(), |
| MessageInTransit::Subtype::CONNECTION_MANAGER_ACK_SUCCESS); |
| DCHECK_EQ(num_bytes, 0u); |
| DCHECK_EQ(num_platform_handles, 0u); |
| *ack_result_ = Result::SUCCESS; |
| DCHECK(!ack_peer_process_identifier_); |
| DCHECK(!ack_is_first_); |
| DCHECK(!ack_platform_handle_); |
| } else { |
| // Success acks for "connect" always have a |
| // |ConnectionManagerAckSuccessConnectData| as data. |
| CHECK_EQ(num_bytes, sizeof(ConnectionManagerAckSuccessConnectData)); |
| const ConnectionManagerAckSuccessConnectData& data = |
| *static_cast<const ConnectionManagerAckSuccessConnectData*>( |
| message_view.bytes()); |
| *ack_peer_process_identifier_ = data.peer_process_identifier; |
| *ack_is_first_ = data.is_first; |
| |
| switch (message_view.subtype()) { |
| case MessageInTransit::Subtype:: |
| CONNECTION_MANAGER_ACK_SUCCESS_CONNECT_SAME_PROCESS: |
| DCHECK_EQ(num_platform_handles, 0u); |
| *ack_result_ = Result::SUCCESS_CONNECT_SAME_PROCESS; |
| ack_platform_handle_->reset(); |
| break; |
| case MessageInTransit::Subtype:: |
| CONNECTION_MANAGER_ACK_SUCCESS_CONNECT_NEW_CONNECTION: |
| CHECK_EQ(num_platform_handles, 1u); |
| *ack_result_ = Result::SUCCESS_CONNECT_NEW_CONNECTION; |
| ack_platform_handle_->reset(platform_handles->at(0)); |
| platform_handles->at(0) = embedder::PlatformHandle(); |
| break; |
| case MessageInTransit::Subtype:: |
| CONNECTION_MANAGER_ACK_SUCCESS_CONNECT_REUSE_CONNECTION: |
| DCHECK_EQ(num_platform_handles, 0u); |
| *ack_result_ = Result::SUCCESS_CONNECT_REUSE_CONNECTION; |
| ack_platform_handle_->reset(); |
| break; |
| default: |
| CHECK(false); |
| } |
| } |
| } |
| |
| awaiting_ack_type_ = NOT_AWAITING_ACK; |
| ack_result_ = nullptr; |
| ack_peer_process_identifier_ = nullptr; |
| ack_is_first_ = nullptr; |
| ack_platform_handle_ = nullptr; |
| event_.Signal(); |
| } |
| |
| void SlaveConnectionManager::OnError(Error error) { |
| AssertOnPrivateThread(); |
| |
| // Ignore write errors, since we may still have some messages to read. |
| if (error == RawChannel::Delegate::ERROR_WRITE) |
| return; |
| |
| raw_channel_->Shutdown(); |
| raw_channel_.reset(); |
| |
| DCHECK(slave_process_delegate_); |
| embedder::PlatformPostTask( |
| delegate_thread_task_runner_.get(), |
| base::Bind(&embedder::SlaveProcessDelegate::OnMasterDisconnect, |
| base::Unretained(slave_process_delegate_))); |
| } |
| |
| void SlaveConnectionManager::AssertNotOnPrivateThread() const { |
| // This should only be called after |Init()| and before |Shutdown()|. (If not, |
| // the subsequent |DCHECK_NE()| is invalid, since the current thread may not |
| // have a message loop.) |
| DCHECK(private_thread_.message_loop()); |
| DCHECK_NE(base::MessageLoop::current(), private_thread_.message_loop()); |
| } |
| |
| void SlaveConnectionManager::AssertOnPrivateThread() const { |
| // This should only be called after |Init()| and before |Shutdown()|. |
| DCHECK(private_thread_.message_loop()); |
| DCHECK_EQ(base::MessageLoop::current(), private_thread_.message_loop()); |
| } |
| |
| } // namespace system |
| } // namespace mojo |