// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/edk/system/channel_manager.h"

#include <utility>

#include "mojo/edk/platform/platform_handle.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"

using mojo::platform::PlatformHandle;
using mojo::platform::PlatformHandleWatcher;
using mojo::platform::ScopedPlatformHandle;
using mojo::platform::TaskRunner;
using mojo::util::MakeRefCounted;
using mojo::util::MutexLocker;
using mojo::util::RefPtr;

namespace mojo {
namespace system {

ChannelManager::ChannelManager(embedder::PlatformSupport* platform_support,
                               RefPtr<TaskRunner>&& io_task_runner,
                               PlatformHandleWatcher* io_watcher,
                               ConnectionManager* connection_manager)
    : platform_support_(platform_support),
      io_task_runner_(std::move(io_task_runner)),
      io_watcher_(io_watcher),
      connection_manager_(connection_manager) {
  DCHECK(platform_support_);
  DCHECK(io_task_runner_);
  DCHECK(io_watcher_);
  // (|connection_manager_| may be null.)
}

ChannelManager::~ChannelManager() {
  // |Shutdown()| must be called before destruction and have been completed.
  // TODO(vtl): This doesn't verify the above condition very strictly at all
  // (e.g., we may never have had any channels, or we may have manually shut all
  // the channels down).
  DCHECK(channels_.empty());
}

void ChannelManager::ShutdownOnIOThread() {
  // Taking this lock really shouldn't be necessary, but we do it for
  // consistency.
  ChannelIdToChannelMap channels;
  {
    MutexLocker locker(&mutex_);
    channels.swap(channels_);
  }

  for (auto& channel : channels)
    channel.second->Shutdown();
}

void ChannelManager::Shutdown(
    std::function<void()>&& callback,
    RefPtr<TaskRunner>&& callback_thread_task_runner) {
  // TODO(vtl): With C++14 lambda captures, we'll be able to move |callback| and
  // |callback_thread_task_runner| instead of copying them.
  io_task_runner_->PostTask(
      [this, callback, callback_thread_task_runner]() mutable {
        ShutdownOnIOThread();
        if (callback_thread_task_runner)
          callback_thread_task_runner->PostTask(std::move(callback));
        else
          callback();
      });
}

RefPtr<MessagePipeDispatcher> ChannelManager::CreateChannelOnIOThread(
    ChannelId channel_id,
    ScopedPlatformHandle platform_handle) {
  RefPtr<ChannelEndpoint> bootstrap_channel_endpoint;
  auto dispatcher = MessagePipeDispatcher::CreateRemoteMessagePipe(
      &bootstrap_channel_endpoint);
  CreateChannelOnIOThreadHelper(channel_id, platform_handle.Pass(),
                                std::move(bootstrap_channel_endpoint));
  return dispatcher;
}

RefPtr<Channel> ChannelManager::CreateChannelWithoutBootstrapOnIOThread(
    ChannelId channel_id,
    ScopedPlatformHandle platform_handle) {
  return CreateChannelOnIOThreadHelper(channel_id, platform_handle.Pass(),
                                       nullptr);
}

RefPtr<MessagePipeDispatcher> ChannelManager::CreateChannel(
    ChannelId channel_id,
    ScopedPlatformHandle platform_handle,
    std::function<void()>&& callback,
    RefPtr<TaskRunner>&& callback_thread_task_runner) {
  DCHECK(callback);
  // (|callback_thread_task_runner| may be null.)

  RefPtr<ChannelEndpoint> bootstrap_channel_endpoint;
  auto dispatcher = MessagePipeDispatcher::CreateRemoteMessagePipe(
      &bootstrap_channel_endpoint);
  // TODO(vtl): We have to copy or "unscope" various things due to C++11 lambda
  // capture limitations.
  PlatformHandle raw_platform_handle = platform_handle.release();
  io_task_runner_->PostTask([this, channel_id, raw_platform_handle,
                             bootstrap_channel_endpoint, callback,
                             callback_thread_task_runner]() mutable {
    CreateChannelOnIOThreadHelper(channel_id,
                                  ScopedPlatformHandle(raw_platform_handle),
                                  std::move(bootstrap_channel_endpoint));
    if (callback_thread_task_runner)
      callback_thread_task_runner->PostTask(std::move(callback));
    else
      callback();
  });
  return dispatcher;
}

RefPtr<Channel> ChannelManager::GetChannel(ChannelId channel_id) const {
  MutexLocker locker(&mutex_);
  auto it = channels_.find(channel_id);
  DCHECK(it != channels_.end());
  return it->second;
}

void ChannelManager::WillShutdownChannel(ChannelId channel_id) {
  GetChannel(channel_id)->WillShutdownSoon();
}

void ChannelManager::ShutdownChannelOnIOThread(ChannelId channel_id) {
  RefPtr<Channel> channel;
  {
    MutexLocker locker(&mutex_);
    auto it = channels_.find(channel_id);
    DCHECK(it != channels_.end());
    channel = std::move(it->second);
    channels_.erase(it);
  }
  channel->Shutdown();
}

void ChannelManager::ShutdownChannel(
    ChannelId channel_id,
    std::function<void()>&& callback,
    RefPtr<TaskRunner>&& callback_thread_task_runner) {
  RefPtr<Channel> channel;
  {
    MutexLocker locker(&mutex_);
    auto it = channels_.find(channel_id);
    DCHECK(it != channels_.end());
    channel.swap(it->second);
    channels_.erase(it);
  }
  channel->WillShutdownSoon();
  // TODO(vtl): With C++14 lambda captures, we'll be able to move stuff instead
  // of copying.
  io_task_runner_->PostTask(
      [channel, callback, callback_thread_task_runner]() mutable {
        channel->Shutdown();
        if (callback_thread_task_runner)
          callback_thread_task_runner->PostTask(std::move(callback));
        else
          callback();
      });
}

RefPtr<Channel> ChannelManager::CreateChannelOnIOThreadHelper(
    ChannelId channel_id,
    ScopedPlatformHandle platform_handle,
    RefPtr<ChannelEndpoint>&& bootstrap_channel_endpoint) {
  DCHECK_NE(channel_id, kInvalidChannelId);
  DCHECK(platform_handle.is_valid());

  // Create and initialize a |Channel|.
  auto channel = MakeRefCounted<Channel>(platform_support_);
  channel->Init(io_task_runner_.Clone(), io_watcher_,
                RawChannel::Create(platform_handle.Pass()));
  if (bootstrap_channel_endpoint)
    channel->SetBootstrapEndpoint(std::move(bootstrap_channel_endpoint));

  {
    MutexLocker locker(&mutex_);
    CHECK(channels_.find(channel_id) == channels_.end());
    channels_[channel_id] = channel;
  }
  channel->SetChannelManager(this);
  return channel;
}

}  // namespace system
}  // namespace mojo
