blob: 5ff8e56d85fe56a5f8252f64cf31655cd5f34f09 [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/edk/system/slave_connection_manager.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "mojo/edk/system/message_in_transit.h"
namespace mojo {
namespace system {
// SlaveConnectionManager ------------------------------------------------------
SlaveConnectionManager::SlaveConnectionManager()
: slave_process_delegate_(),
private_thread_("SlaveConnectionManagerPrivateThread"),
awaiting_ack_type_(NOT_AWAITING_ACK),
ack_result_(),
ack_peer_process_identifier_(),
ack_platform_handle_(),
event_(false, false) { // Auto-reset, not initially signalled.
}
SlaveConnectionManager::~SlaveConnectionManager() {
DCHECK(!delegate_thread_task_runner_);
DCHECK(!slave_process_delegate_);
DCHECK(!private_thread_.message_loop());
DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK);
DCHECK(!ack_result_);
DCHECK(!ack_peer_process_identifier_);
DCHECK(!ack_platform_handle_);
}
void SlaveConnectionManager::Init(
scoped_refptr<base::TaskRunner> delegate_thread_task_runner,
embedder::SlaveProcessDelegate* slave_process_delegate,
embedder::ScopedPlatformHandle platform_handle) {
DCHECK(delegate_thread_task_runner);
DCHECK(slave_process_delegate);
DCHECK(platform_handle.is_valid());
DCHECK(!delegate_thread_task_runner_);
DCHECK(!slave_process_delegate_);
DCHECK(!private_thread_.message_loop());
delegate_thread_task_runner_ = delegate_thread_task_runner;
slave_process_delegate_ = slave_process_delegate;
CHECK(private_thread_.StartWithOptions(
base::Thread::Options(base::MessageLoop::TYPE_IO, 0)));
private_thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&SlaveConnectionManager::InitOnPrivateThread,
base::Unretained(this), base::Passed(&platform_handle)));
event_.Wait();
}
void SlaveConnectionManager::Shutdown() {
AssertNotOnPrivateThread();
DCHECK(slave_process_delegate_);
DCHECK(private_thread_.message_loop());
// The |Stop()| will actually finish all posted tasks.
private_thread_.message_loop()->PostTask(
FROM_HERE, base::Bind(&SlaveConnectionManager::ShutdownOnPrivateThread,
base::Unretained(this)));
private_thread_.Stop();
slave_process_delegate_ = nullptr;
delegate_thread_task_runner_ = nullptr;
}
bool SlaveConnectionManager::AllowConnect(
const ConnectionIdentifier& connection_id) {
AssertNotOnPrivateThread();
base::AutoLock locker(lock_);
bool result = false;
private_thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&SlaveConnectionManager::AllowConnectOnPrivateThread,
base::Unretained(this), connection_id, &result));
event_.Wait();
return result;
}
bool SlaveConnectionManager::CancelConnect(
const ConnectionIdentifier& connection_id) {
AssertNotOnPrivateThread();
base::AutoLock locker(lock_);
bool result = false;
private_thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&SlaveConnectionManager::CancelConnectOnPrivateThread,
base::Unretained(this), connection_id, &result));
event_.Wait();
return result;
}
bool SlaveConnectionManager::Connect(
const ConnectionIdentifier& connection_id,
ProcessIdentifier* peer_process_identifier,
embedder::ScopedPlatformHandle* platform_handle) {
AssertNotOnPrivateThread();
base::AutoLock locker(lock_);
bool result = false;
private_thread_.message_loop()->PostTask(
FROM_HERE, base::Bind(&SlaveConnectionManager::ConnectOnPrivateThread,
base::Unretained(this), connection_id, &result,
peer_process_identifier, platform_handle));
event_.Wait();
return result;
}
void SlaveConnectionManager::InitOnPrivateThread(
embedder::ScopedPlatformHandle platform_handle) {
AssertOnPrivateThread();
raw_channel_ = RawChannel::Create(platform_handle.Pass());
raw_channel_->Init(this);
event_.Signal();
}
void SlaveConnectionManager::ShutdownOnPrivateThread() {
AssertOnPrivateThread();
CHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK);
if (raw_channel_) {
raw_channel_->Shutdown();
raw_channel_.reset();
}
}
void SlaveConnectionManager::AllowConnectOnPrivateThread(
const ConnectionIdentifier& connection_id,
bool* result) {
DCHECK(result);
AssertOnPrivateThread();
// This should only posted (from another thread, to |private_thread_|) with
// the lock held (until this thread triggers |event_|).
DCHECK(!lock_.Try());
DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK);
DVLOG(1) << "Sending AllowConnect: connection ID " << connection_id;
if (!raw_channel_->WriteMessage(make_scoped_ptr(new MessageInTransit(
MessageInTransit::kTypeConnectionManager,
MessageInTransit::kSubtypeConnectionManagerAllowConnect,
sizeof(connection_id), &connection_id)))) {
// Don't tear things down; possibly we'll still read some messages.
*result = false;
event_.Signal();
return;
}
awaiting_ack_type_ = AWAITING_ACCEPT_CONNECT_ACK;
ack_result_ = result;
}
void SlaveConnectionManager::CancelConnectOnPrivateThread(
const ConnectionIdentifier& connection_id,
bool* result) {
DCHECK(result);
AssertOnPrivateThread();
// This should only posted (from another thread, to |private_thread_|) with
// the lock held (until this thread triggers |event_|).
DCHECK(!lock_.Try());
DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK);
DVLOG(1) << "Sending CancelConnect: connection ID " << connection_id;
if (!raw_channel_->WriteMessage(make_scoped_ptr(new MessageInTransit(
MessageInTransit::kTypeConnectionManager,
MessageInTransit::kSubtypeConnectionManagerCancelConnect,
sizeof(connection_id), &connection_id)))) {
// Don't tear things down; possibly we'll still read some messages.
*result = false;
event_.Signal();
return;
}
awaiting_ack_type_ = AWAITING_CANCEL_CONNECT_ACK;
ack_result_ = result;
}
void SlaveConnectionManager::ConnectOnPrivateThread(
const ConnectionIdentifier& connection_id,
bool* result,
ProcessIdentifier* peer_process_identifier,
embedder::ScopedPlatformHandle* platform_handle) {
DCHECK(result);
DCHECK(platform_handle);
DCHECK(!platform_handle->is_valid()); // Not technically wrong, but unlikely.
AssertOnPrivateThread();
// This should only posted (from another thread, to |private_thread_|) with
// the lock held (until this thread triggers |event_|).
DCHECK(!lock_.Try());
DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK);
DVLOG(1) << "Sending Connect: connection ID " << connection_id;
if (!raw_channel_->WriteMessage(make_scoped_ptr(new MessageInTransit(
MessageInTransit::kTypeConnectionManager,
MessageInTransit::kSubtypeConnectionManagerConnect,
sizeof(connection_id), &connection_id)))) {
// Don't tear things down; possibly we'll still read some messages.
*result = false;
platform_handle->reset();
event_.Signal();
return;
}
awaiting_ack_type_ = AWAITING_CONNECT_ACK;
ack_result_ = result;
ack_peer_process_identifier_ = peer_process_identifier;
ack_platform_handle_ = platform_handle;
}
void SlaveConnectionManager::OnReadMessage(
const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles) {
AssertOnPrivateThread();
// Set |*ack_result_| to false by default.
*ack_result_ = false;
// Note: Since we should be able to trust the master, simply crash (i.e.,
// |CHECK()|-fail) if it sends us something invalid.
// Unsolicited message.
CHECK_NE(awaiting_ack_type_, NOT_AWAITING_ACK);
// Bad message type.
CHECK_EQ(message_view.type(), MessageInTransit::kTypeConnectionManagerAck);
size_t num_bytes = message_view.num_bytes();
size_t num_platform_handles = platform_handles ? platform_handles->size() : 0;
if (message_view.subtype() ==
MessageInTransit::kSubtypeConnectionManagerAckFailure) {
// Failure acks never have any contents.
CHECK_EQ(num_bytes, 0u);
CHECK_EQ(num_platform_handles, 0u);
// Leave |*ack_result_| false.
} else if (message_view.subtype() ==
MessageInTransit::kSubtypeConnectionManagerAckSuccess) {
if (awaiting_ack_type_ == AWAITING_ACCEPT_CONNECT_ACK ||
awaiting_ack_type_ == AWAITING_CANCEL_CONNECT_ACK) {
// Success acks for "accept/cancel connect" have no contents.
CHECK_EQ(num_bytes, 0u);
CHECK_EQ(num_platform_handles, 0u);
*ack_result_ = true;
DCHECK(!ack_peer_process_identifier_);
DCHECK(!ack_platform_handle_);
} else {
DCHECK_EQ(awaiting_ack_type_, AWAITING_CONNECT_ACK);
// Success acks for "connect" always have a |ProcessIdentifier| as data,
// and *maybe* one platform handle.
CHECK_EQ(num_bytes, sizeof(ProcessIdentifier));
CHECK_LE(num_platform_handles, 1u);
*ack_result_ = true;
*ack_peer_process_identifier_ =
*reinterpret_cast<const ProcessIdentifier*>(message_view.bytes());
if (num_platform_handles > 0) {
ack_platform_handle_->reset(platform_handles->at(0));
platform_handles->at(0) = embedder::PlatformHandle();
} else {
ack_platform_handle_->reset();
}
}
} else {
// Bad message subtype.
CHECK(false);
}
awaiting_ack_type_ = NOT_AWAITING_ACK;
ack_result_ = nullptr;
ack_peer_process_identifier_ = nullptr;
ack_platform_handle_ = nullptr;
event_.Signal();
}
void SlaveConnectionManager::OnError(Error error) {
AssertOnPrivateThread();
// Ignore write errors, since we may still have some messages to read.
if (error == RawChannel::Delegate::ERROR_WRITE)
return;
raw_channel_->Shutdown();
raw_channel_.reset();
DCHECK(slave_process_delegate_);
delegate_thread_task_runner_->PostTask(
FROM_HERE, base::Bind(&embedder::SlaveProcessDelegate::OnMasterDisconnect,
base::Unretained(slave_process_delegate_)));
}
void SlaveConnectionManager::AssertNotOnPrivateThread() const {
// This should only be called after |Init()| and before |Shutdown()|. (If not,
// the subsequent |DCHECK_NE()| is invalid, since the current thread may not
// have a message loop.)
DCHECK(private_thread_.message_loop());
DCHECK_NE(base::MessageLoop::current(), private_thread_.message_loop());
}
void SlaveConnectionManager::AssertOnPrivateThread() const {
// This should only be called after |Init()| and before |Shutdown()|.
DCHECK(private_thread_.message_loop());
DCHECK_EQ(base::MessageLoop::current(), private_thread_.message_loop());
}
} // namespace system
} // namespace mojo