blob: a08d2c73a7ae30d22b00257750ceebc482506a49 [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/edk/system/channel_manager.h"
#include <utility>
#include "base/logging.h"
#include "mojo/edk/platform/platform_handle.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
using mojo::platform::PlatformHandle;
using mojo::platform::PlatformHandleWatcher;
using mojo::platform::ScopedPlatformHandle;
using mojo::platform::TaskRunner;
using mojo::util::MakeRefCounted;
using mojo::util::MutexLocker;
using mojo::util::RefPtr;
namespace mojo {
namespace system {
ChannelManager::ChannelManager(embedder::PlatformSupport* platform_support,
RefPtr<TaskRunner>&& io_task_runner,
PlatformHandleWatcher* io_watcher,
ConnectionManager* connection_manager)
: platform_support_(platform_support),
io_task_runner_(std::move(io_task_runner)),
io_watcher_(io_watcher),
connection_manager_(connection_manager) {
DCHECK(platform_support_);
DCHECK(io_task_runner_);
DCHECK(io_watcher_);
// (|connection_manager_| may be null.)
}
ChannelManager::~ChannelManager() {
// |Shutdown()| must be called before destruction and have been completed.
// TODO(vtl): This doesn't verify the above condition very strictly at all
// (e.g., we may never have had any channels, or we may have manually shut all
// the channels down).
DCHECK(channels_.empty());
}
void ChannelManager::ShutdownOnIOThread() {
// Taking this lock really shouldn't be necessary, but we do it for
// consistency.
ChannelIdToChannelMap channels;
{
MutexLocker locker(&mutex_);
channels.swap(channels_);
}
for (auto& channel : channels)
channel.second->Shutdown();
}
void ChannelManager::Shutdown(
std::function<void()>&& callback,
RefPtr<TaskRunner>&& callback_thread_task_runner) {
// TODO(vtl): With C++14 lambda captures, we'll be able to move |callback| and
// |callback_thread_task_runner| instead of copying them.
io_task_runner_->PostTask(
[this, callback, callback_thread_task_runner]() mutable {
ShutdownOnIOThread();
if (callback_thread_task_runner)
callback_thread_task_runner->PostTask(std::move(callback));
else
callback();
});
}
RefPtr<MessagePipeDispatcher> ChannelManager::CreateChannelOnIOThread(
ChannelId channel_id,
ScopedPlatformHandle platform_handle) {
RefPtr<ChannelEndpoint> bootstrap_channel_endpoint;
auto dispatcher = MessagePipeDispatcher::CreateRemoteMessagePipe(
&bootstrap_channel_endpoint);
CreateChannelOnIOThreadHelper(channel_id, platform_handle.Pass(),
std::move(bootstrap_channel_endpoint));
return dispatcher;
}
RefPtr<Channel> ChannelManager::CreateChannelWithoutBootstrapOnIOThread(
ChannelId channel_id,
ScopedPlatformHandle platform_handle) {
return CreateChannelOnIOThreadHelper(channel_id, platform_handle.Pass(),
nullptr);
}
RefPtr<MessagePipeDispatcher> ChannelManager::CreateChannel(
ChannelId channel_id,
ScopedPlatformHandle platform_handle,
std::function<void()>&& callback,
RefPtr<TaskRunner>&& callback_thread_task_runner) {
DCHECK(callback);
// (|callback_thread_task_runner| may be null.)
RefPtr<ChannelEndpoint> bootstrap_channel_endpoint;
auto dispatcher = MessagePipeDispatcher::CreateRemoteMessagePipe(
&bootstrap_channel_endpoint);
// TODO(vtl): We have to copy or "unscope" various things due to C++11 lambda
// capture limitations.
PlatformHandle raw_platform_handle = platform_handle.release();
io_task_runner_->PostTask([this, channel_id, raw_platform_handle,
bootstrap_channel_endpoint, callback,
callback_thread_task_runner]() mutable {
CreateChannelOnIOThreadHelper(channel_id,
ScopedPlatformHandle(raw_platform_handle),
std::move(bootstrap_channel_endpoint));
if (callback_thread_task_runner)
callback_thread_task_runner->PostTask(std::move(callback));
else
callback();
});
return dispatcher;
}
RefPtr<Channel> ChannelManager::GetChannel(ChannelId channel_id) const {
MutexLocker locker(&mutex_);
auto it = channels_.find(channel_id);
DCHECK(it != channels_.end());
return it->second;
}
void ChannelManager::WillShutdownChannel(ChannelId channel_id) {
GetChannel(channel_id)->WillShutdownSoon();
}
void ChannelManager::ShutdownChannelOnIOThread(ChannelId channel_id) {
RefPtr<Channel> channel;
{
MutexLocker locker(&mutex_);
auto it = channels_.find(channel_id);
DCHECK(it != channels_.end());
channel = std::move(it->second);
channels_.erase(it);
}
channel->Shutdown();
}
void ChannelManager::ShutdownChannel(
ChannelId channel_id,
std::function<void()>&& callback,
RefPtr<TaskRunner>&& callback_thread_task_runner) {
RefPtr<Channel> channel;
{
MutexLocker locker(&mutex_);
auto it = channels_.find(channel_id);
DCHECK(it != channels_.end());
channel.swap(it->second);
channels_.erase(it);
}
channel->WillShutdownSoon();
// TODO(vtl): With C++14 lambda captures, we'll be able to move stuff instead
// of copying.
io_task_runner_->PostTask(
[channel, callback, callback_thread_task_runner]() mutable {
channel->Shutdown();
if (callback_thread_task_runner)
callback_thread_task_runner->PostTask(std::move(callback));
else
callback();
});
}
RefPtr<Channel> ChannelManager::CreateChannelOnIOThreadHelper(
ChannelId channel_id,
ScopedPlatformHandle platform_handle,
RefPtr<ChannelEndpoint>&& bootstrap_channel_endpoint) {
DCHECK_NE(channel_id, kInvalidChannelId);
DCHECK(platform_handle.is_valid());
// Create and initialize a |Channel|.
auto channel = MakeRefCounted<Channel>(platform_support_);
channel->Init(io_task_runner_.Clone(), io_watcher_,
RawChannel::Create(platform_handle.Pass()));
if (bootstrap_channel_endpoint)
channel->SetBootstrapEndpoint(std::move(bootstrap_channel_endpoint));
{
MutexLocker locker(&mutex_);
CHECK(channels_.find(channel_id) == channels_.end());
channels_[channel_id] = channel;
}
channel->SetChannelManager(this);
return channel;
}
} // namespace system
} // namespace mojo