// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/edk/system/channel.h"

#include <algorithm>
#include <utility>

#include "base/bind.h"
#include "base/logging.h"
#include "base/strings/stringprintf.h"
#include "mojo/edk/embedder/platform_handle_vector.h"
#include "mojo/edk/system/endpoint_relayer.h"
#include "mojo/edk/system/transport_data.h"

using mojo::util::MakeRefCounted;
using mojo::util::MutexLocker;
using mojo::util::RefPtr;

namespace mojo {
namespace system {

namespace {

struct SerializedEndpoint {
  // This is the endpoint ID on the receiving side, and should be a "remote ID".
  // (The receiving side should already have had an endpoint attached and been
  // run via the |Channel|s. This endpoint will have both IDs assigned, so this
  // ID is only needed to associate that endpoint with a particular dispatcher.)
  ChannelEndpointId receiver_endpoint_id;
};

}  // namespace

void Channel::Init(std::unique_ptr<RawChannel> raw_channel) {
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK(thread_checker_.IsCreationThreadCurrent());
#endif  // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK(raw_channel);

  // No need to take |mutex_|, since this must be called before this object
  // becomes thread-safe.
  DCHECK(!is_running_);
  raw_channel_ = std::move(raw_channel);
  raw_channel_->Init(this);
  is_running_ = true;
}

void Channel::SetChannelManager(ChannelManager* channel_manager) {
  DCHECK(channel_manager);

  MutexLocker locker(&mutex_);
  DCHECK(!is_shutting_down_);
  DCHECK(!channel_manager_);
  channel_manager_ = channel_manager;
}

void Channel::Shutdown() {
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK(thread_checker_.IsCreationThreadCurrent());
#endif  // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)

  IdToEndpointMap to_destroy;
  {
    MutexLocker locker(&mutex_);
    if (!is_running_)
      return;

    // Note: Don't reset |raw_channel_|, in case we're being called from within
    // |OnReadMessage()| or |OnError()|.
    raw_channel_->Shutdown();
    is_running_ = false;

    // We need to deal with it outside the lock.
    std::swap(to_destroy, local_id_to_endpoint_map_);
  }

  size_t num_live = 0;
  size_t num_zombies = 0;
  for (IdToEndpointMap::iterator it = to_destroy.begin();
       it != to_destroy.end(); ++it) {
    if (it->second) {
      num_live++;
      it->second->DetachFromChannel();
    } else {
      num_zombies++;
    }
  }
  DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
                                       << " live endpoints and " << num_zombies
                                       << " zombies";
}

void Channel::WillShutdownSoon() {
  MutexLocker locker(&mutex_);
  is_shutting_down_ = true;
  channel_manager_ = nullptr;
}

void Channel::SetBootstrapEndpoint(RefPtr<ChannelEndpoint>&& endpoint) {
  // Used for both local and remote IDs.
  ChannelEndpointId bootstrap_id = ChannelEndpointId::GetBootstrap();
  SetBootstrapEndpointWithIds(std::move(endpoint), bootstrap_id, bootstrap_id);
}

void Channel::SetBootstrapEndpointWithIds(RefPtr<ChannelEndpoint>&& endpoint,
                                          ChannelEndpointId local_id,
                                          ChannelEndpointId remote_id) {
  DCHECK(endpoint);

  {
    MutexLocker locker(&mutex_);

    DLOG_IF(WARNING, is_shutting_down_)
        << "SetBootstrapEndpoint() while shutting down";

    // There must not be an endpoint with that ID already.
    DCHECK(local_id_to_endpoint_map_.find(local_id) ==
           local_id_to_endpoint_map_.end());

    local_id_to_endpoint_map_[local_id] = endpoint;
  }

  endpoint->AttachAndRun(this, local_id, remote_id);
}

bool Channel::WriteMessage(std::unique_ptr<MessageInTransit> message) {
  MutexLocker locker(&mutex_);
  if (!is_running_) {
    // TODO(vtl): I think this is probably not an error condition, but I should
    // think about it (and the shutdown sequence) more carefully.
    LOG(WARNING) << "WriteMessage() after shutdown";
    return false;
  }

  DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
  return raw_channel_->WriteMessage(std::move(message));
}

bool Channel::IsWriteBufferEmpty() {
  MutexLocker locker(&mutex_);
  if (!is_running_)
    return true;
  return raw_channel_->IsWriteBufferEmpty();
}

void Channel::DetachEndpoint(ChannelEndpoint* endpoint,
                             ChannelEndpointId local_id,
                             ChannelEndpointId remote_id) {
  // Keep a reference to |this| to prevent this |Channel| from being deleted
  // while this function is running. Without this, if |Shutdown()| is started on
  // the I/O thread immediately after |mutex_| is released below and finishes
  // before |SendControlMessage()| gets to run, |this| could be deleted while
  // this function is still running.
  RefPtr<Channel> self(this);

  if (!DetachEndpointInternal(endpoint, local_id, remote_id))
    return;

  // Note: Send the remove message outside the lock.
  if (!SendControlMessage(MessageInTransit::Subtype::CHANNEL_REMOVE_ENDPOINT,
                          local_id, remote_id, 0, nullptr)) {
    HandleLocalError(
        base::StringPrintf("Failed to send message to remove remote endpoint "
                           "(local ID %u, remote ID %u)",
                           static_cast<unsigned>(local_id.value()),
                           static_cast<unsigned>(remote_id.value()))
            .c_str());
  }
}

size_t Channel::GetSerializedEndpointSize() const {
  return sizeof(SerializedEndpoint);
}

void Channel::SerializeEndpointWithClosedPeer(
    void* destination,
    MessageInTransitQueue* message_queue) {
  // We can actually just pass no client to |SerializeEndpointWithLocalPeer()|.
  SerializeEndpointWithLocalPeer(destination, message_queue, nullptr, 0);
}

RefPtr<ChannelEndpoint> Channel::SerializeEndpointWithLocalPeer(
    void* destination,
    MessageInTransitQueue* message_queue,
    RefPtr<ChannelEndpointClient>&& endpoint_client,
    unsigned endpoint_client_port) {
  DCHECK(destination);
  // Allow |endpoint_client| to be null, for use by
  // |SerializeEndpointWithClosedPeer()|.

  auto endpoint = MakeRefCounted<ChannelEndpoint>(
      std::move(endpoint_client), endpoint_client_port, message_queue);

  SerializedEndpoint* s = static_cast<SerializedEndpoint*>(destination);
  s->receiver_endpoint_id = AttachAndRunEndpoint(endpoint.Clone());
  DVLOG(2) << "Serializing endpoint with local or closed peer (remote ID = "
           << s->receiver_endpoint_id << ")";

  return endpoint;
}

void Channel::SerializeEndpointWithRemotePeer(
    void* destination,
    MessageInTransitQueue* message_queue,
    RefPtr<ChannelEndpoint>&& peer_endpoint) {
  DCHECK(destination);
  DCHECK(peer_endpoint);

  DVLOG(1) << "Direct message pipe passing across multiple channels not yet "
              "implemented; will proxy";
  // Create and set up an |EndpointRelayer| to proxy.
  // TODO(vtl): If we were to own/track the relayer directly (rather than owning
  // it via its |ChannelEndpoint|s), then we might be able to make
  // |ChannelEndpoint|'s |client_| pointer a raw pointer.
  auto relayer = MakeRefCounted<EndpointRelayer>();
  auto endpoint =
      MakeRefCounted<ChannelEndpoint>(relayer.Clone(), 0, message_queue);
  relayer->Init(endpoint.Clone(), peer_endpoint.Clone());
  peer_endpoint->ReplaceClient(std::move(relayer), 1);

  SerializedEndpoint* s = static_cast<SerializedEndpoint*>(destination);
  s->receiver_endpoint_id = AttachAndRunEndpoint(std::move(endpoint));
  DVLOG(2) << "Serializing endpoint with remote peer (remote ID = "
           << s->receiver_endpoint_id << ")";
}

RefPtr<IncomingEndpoint> Channel::DeserializeEndpoint(const void* source) {
  const SerializedEndpoint* s = static_cast<const SerializedEndpoint*>(source);
  ChannelEndpointId local_id = s->receiver_endpoint_id;
  // No need to check the validity of |local_id| -- if it's not valid, it simply
  // won't be in |incoming_endpoints_|.
  DVLOG_IF(2, !local_id.is_valid() || !local_id.is_remote())
      << "Attempt to get incoming endpoint for invalid ID " << local_id;

  MutexLocker locker(&mutex_);

  auto it = incoming_endpoints_.find(local_id);
  if (it == incoming_endpoints_.end()) {
    LOG(ERROR) << "Failed to deserialize endpoint (ID = " << local_id << ")";
    return nullptr;
  }

  DVLOG(2) << "Deserializing endpoint (new local ID = " << local_id << ")";

  RefPtr<IncomingEndpoint> rv = std::move(it->second);
  incoming_endpoints_.erase(it);
  return rv;
}

size_t Channel::GetSerializedPlatformHandleSize() const {
  // TODO(vtl): Having to lock |mutex_| here is a bit unfortunate. Maybe we
  // should get the size in |Init()| and cache it?
  MutexLocker locker(&mutex_);
  return raw_channel_->GetSerializedPlatformHandleSize();
}

Channel::Channel(embedder::PlatformSupport* platform_support)
    : platform_support_(platform_support),
      is_running_(false),
      is_shutting_down_(false),
      channel_manager_(nullptr) {}

Channel::~Channel() {
  // The channel should have been shut down first.
  DCHECK(!is_running_);
}

bool Channel::DetachEndpointInternal(ChannelEndpoint* endpoint,
                                     ChannelEndpointId local_id,
                                     ChannelEndpointId remote_id) {
  DCHECK(endpoint);
  DCHECK(local_id.is_valid());

  if (!remote_id.is_valid())
    return false;  // Nothing to do.

  MutexLocker locker(&mutex_);
  if (!is_running_)
    return false;

  IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
  // We detach immediately if we receive a remove message, so it's possible
  // that the local ID is no longer in |local_id_to_endpoint_map_|, or even
  // that it's since been reused for another endpoint. In both cases, there's
  // nothing more to do.
  if (it == local_id_to_endpoint_map_.end() || it->second.get() != endpoint)
    return false;

  DCHECK(it->second);
  it->second = nullptr;
  return true;
}

void Channel::OnReadMessage(
    const MessageInTransit::View& message_view,
    embedder::ScopedPlatformHandleVectorPtr platform_handles) {
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK(thread_checker_.IsCreationThreadCurrent());
#endif  // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)

  switch (message_view.type()) {
    case MessageInTransit::Type::ENDPOINT_CLIENT:
    case MessageInTransit::Type::ENDPOINT:
      OnReadMessageForEndpoint(message_view, std::move(platform_handles));
      break;
    case MessageInTransit::Type::CHANNEL:
      OnReadMessageForChannel(message_view, std::move(platform_handles));
      break;
    default:
      HandleRemoteError(
          base::StringPrintf("Received message of invalid type %u",
                             static_cast<unsigned>(message_view.type()))
              .c_str());
      break;
  }
}

void Channel::OnError(Error error) {
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK(thread_checker_.IsCreationThreadCurrent());
#endif  // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)

  switch (error) {
    case ERROR_READ_SHUTDOWN:
      // The other side was cleanly closed, so this isn't actually an error.
      DVLOG(1) << "RawChannel read error (shutdown)";
      break;
    case ERROR_READ_BROKEN: {
      MutexLocker locker(&mutex_);
      LOG_IF(ERROR, !is_shutting_down_)
          << "RawChannel read error (connection broken)";
      break;
    }
    case ERROR_READ_BAD_MESSAGE:
      // Receiving a bad message means either a bug, data corruption, or
      // malicious attack (probably due to some other bug).
      LOG(ERROR) << "RawChannel read error (received bad message)";
      break;
    case ERROR_READ_UNKNOWN:
      LOG(ERROR) << "RawChannel read error (unknown)";
      break;
    case ERROR_WRITE:
      // Write errors are slightly notable: they probably shouldn't happen under
      // normal operation (but maybe the other side crashed).
      LOG(WARNING) << "RawChannel write error";
      break;
  }
  Shutdown();
}

void Channel::OnReadMessageForEndpoint(
    const MessageInTransit::View& message_view,
    embedder::ScopedPlatformHandleVectorPtr platform_handles) {
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK(thread_checker_.IsCreationThreadCurrent());
#endif  // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK(message_view.type() == MessageInTransit::Type::ENDPOINT_CLIENT ||
         message_view.type() == MessageInTransit::Type::ENDPOINT);

  ChannelEndpointId local_id = message_view.destination_id();
  if (!local_id.is_valid()) {
    HandleRemoteError("Received message with no destination ID");
    return;
  }

  RefPtr<ChannelEndpoint> endpoint;
  {
    MutexLocker locker(&mutex_);

    // Since we own |raw_channel_|, and this method and |Shutdown()| should only
    // be called from the creation thread, |raw_channel_| should never be null
    // here.
    DCHECK(is_running_);

    IdToEndpointMap::const_iterator it =
        local_id_to_endpoint_map_.find(local_id);
    if (it != local_id_to_endpoint_map_.end()) {
      // Ignore messages for zombie endpoints (not an error).
      if (!it->second) {
        DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID "
                    "= " << local_id
                 << ", remote ID = " << message_view.source_id() << ")";
        return;
      }

      endpoint = it->second;
    }
  }
  if (!endpoint) {
    HandleRemoteError(
        base::StringPrintf(
            "Received a message for nonexistent local destination ID %u",
            static_cast<unsigned>(local_id.value())).c_str());
    // This is strongly indicative of some problem. However, it's not a fatal
    // error, since it may indicate a buggy (or hostile) remote process. Don't
    // die even for Debug builds, since handling this properly needs to be
    // tested (TODO(vtl)).
    DLOG(ERROR) << "This should not happen under normal operation.";
    return;
  }

  std::unique_ptr<MessageInTransit> message(new MessageInTransit(message_view));
  if (message_view.transport_data_buffer_size() > 0) {
    DCHECK(message_view.transport_data_buffer());
    message->SetDispatchers(TransportData::DeserializeDispatchers(
        message_view.transport_data_buffer(),
        message_view.transport_data_buffer_size(), std::move(platform_handles),
        this));
  }

  endpoint->OnReadMessage(std::move(message));
}

void Channel::OnReadMessageForChannel(
    const MessageInTransit::View& message_view,
    embedder::ScopedPlatformHandleVectorPtr platform_handles) {
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK(thread_checker_.IsCreationThreadCurrent());
#endif  // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK_EQ(message_view.type(), MessageInTransit::Type::CHANNEL);

  // Currently, no channel messages take platform handles.
  if (platform_handles) {
    HandleRemoteError(
        "Received invalid channel message (has platform handles)");
    NOTREACHED();
    return;
  }

  switch (message_view.subtype()) {
    case MessageInTransit::Subtype::CHANNEL_ATTACH_AND_RUN_ENDPOINT:
      DVLOG(2) << "Handling channel message to attach and run endpoint (local "
                  "ID " << message_view.destination_id() << ", remote ID "
               << message_view.source_id() << ")";
      if (!OnAttachAndRunEndpoint(message_view.destination_id(),
                                  message_view.source_id())) {
        HandleRemoteError(
            "Received invalid channel message to attach and run endpoint");
      }
      break;
    case MessageInTransit::Subtype::CHANNEL_REMOVE_ENDPOINT:
      DVLOG(2) << "Handling channel message to remove endpoint (local ID "
               << message_view.destination_id() << ", remote ID "
               << message_view.source_id() << ")";
      if (!OnRemoveEndpoint(message_view.destination_id(),
                            message_view.source_id())) {
        HandleRemoteError(
            "Received invalid channel message to remove endpoint");
      }
      break;
    case MessageInTransit::Subtype::CHANNEL_REMOVE_ENDPOINT_ACK:
      DVLOG(2) << "Handling channel message to ack remove endpoint (local ID "
               << message_view.destination_id() << ", remote ID "
               << message_view.source_id() << ")";
      if (!OnRemoveEndpointAck(message_view.destination_id())) {
        HandleRemoteError(
            "Received invalid channel message to ack remove endpoint");
      }
      break;
    default:
      HandleRemoteError("Received invalid channel message");
      NOTREACHED();
      break;
  }
}

bool Channel::OnAttachAndRunEndpoint(ChannelEndpointId local_id,
                                     ChannelEndpointId remote_id) {
  // We should only get this for remotely-created local endpoints, so our local
  // ID should be "remote".
  if (!local_id.is_valid() || !local_id.is_remote()) {
    DVLOG(2) << "Received attach and run endpoint with invalid local ID";
    return false;
  }

  // Conversely, the remote end should be "local".
  if (!remote_id.is_valid() || remote_id.is_remote()) {
    DVLOG(2) << "Received attach and run endpoint with invalid remote ID";
    return false;
  }

  // Create/initialize an |IncomingEndpoint| and thus an endpoint (outside the
  // lock).
  auto incoming_endpoint = MakeRefCounted<IncomingEndpoint>();
  RefPtr<ChannelEndpoint> endpoint = incoming_endpoint->Init();

  bool success = true;
  {
    MutexLocker locker(&mutex_);

    if (local_id_to_endpoint_map_.find(local_id) ==
        local_id_to_endpoint_map_.end()) {
      DCHECK(incoming_endpoints_.find(local_id) == incoming_endpoints_.end());

      // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll
      // avoid some refcount churn.)
      local_id_to_endpoint_map_[local_id] = endpoint;
      incoming_endpoints_[local_id] = incoming_endpoint;
    } else {
      // We need to call |Close()| outside the lock.
      success = false;
    }
  }
  if (!success) {
    DVLOG(2) << "Received attach and run endpoint for existing local ID";
    incoming_endpoint->Close();
    return false;
  }

  endpoint->AttachAndRun(this, local_id, remote_id);
  return true;
}

bool Channel::OnRemoveEndpoint(ChannelEndpointId local_id,
                               ChannelEndpointId remote_id) {
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK(thread_checker_.IsCreationThreadCurrent());
#endif  // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)

  RefPtr<ChannelEndpoint> endpoint;
  {
    MutexLocker locker(&mutex_);

    IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
    if (it == local_id_to_endpoint_map_.end()) {
      DVLOG(2) << "Remove endpoint error: not found";
      return false;
    }

    if (!it->second) {
      // Remove messages "crossed"; we have to wait for the ack.
      return true;
    }

    endpoint = std::move(it->second);
    local_id_to_endpoint_map_.erase(it);
    // Detach and send the remove ack message outside the lock.
  }

  endpoint->DetachFromChannel();

  if (!SendControlMessage(
          MessageInTransit::Subtype::CHANNEL_REMOVE_ENDPOINT_ACK, local_id,
          remote_id, 0, nullptr)) {
    HandleLocalError(base::StringPrintf(
                         "Failed to send message to ack remove remote endpoint "
                         "(local ID %u, remote ID %u)",
                         static_cast<unsigned>(local_id.value()),
                         static_cast<unsigned>(remote_id.value())).c_str());
  }

  return true;
}

bool Channel::OnRemoveEndpointAck(ChannelEndpointId local_id) {
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
  DCHECK(thread_checker_.IsCreationThreadCurrent());
#endif  // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)

  MutexLocker locker(&mutex_);

  IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
  if (it == local_id_to_endpoint_map_.end()) {
    DVLOG(2) << "Remove endpoint ack error: not found";
    return false;
  }

  if (it->second) {
    DVLOG(2) << "Remove endpoint ack error: wrong state";
    return false;
  }

  local_id_to_endpoint_map_.erase(it);
  return true;
}

void Channel::HandleRemoteError(const char* error_message) {
  // TODO(vtl): Is this how we really want to handle this? Probably we want to
  // terminate the connection, since it's spewing invalid stuff.
  LOG(WARNING) << error_message;
}

void Channel::HandleLocalError(const char* error_message) {
  // TODO(vtl): Is this how we really want to handle this?
  // Sometimes we'll want to propagate the error back to the message pipe
  // (endpoint), and notify it that the remote is (effectively) closed.
  // Sometimes we'll want to kill the channel (and notify all the endpoints that
  // their remotes are dead.
  LOG(WARNING) << error_message;
}

// Note: |endpoint| being a |RefPtr| makes this function safe, since it keeps
// the endpoint alive even after the lock is released.
ChannelEndpointId Channel::AttachAndRunEndpoint(
    RefPtr<ChannelEndpoint>&& endpoint) {
  DCHECK(endpoint);

  ChannelEndpointId local_id;
  ChannelEndpointId remote_id;
  {
    MutexLocker locker(&mutex_);

    DLOG_IF(WARNING, is_shutting_down_)
        << "AttachAndRunEndpoint() while shutting down";

    do {
      local_id = local_id_generator_.GetNext();
    } while (local_id_to_endpoint_map_.find(local_id) !=
             local_id_to_endpoint_map_.end());

    // TODO(vtl): We also need to check for collisions of remote IDs here.
    remote_id = remote_id_generator_.GetNext();

    local_id_to_endpoint_map_[local_id] = endpoint;
  }

  if (!SendControlMessage(
          MessageInTransit::Subtype::CHANNEL_ATTACH_AND_RUN_ENDPOINT, local_id,
          remote_id, 0, nullptr)) {
    HandleLocalError(base::StringPrintf(
                         "Failed to send message to run remote endpoint (local "
                         "ID %u, remote ID %u)",
                         static_cast<unsigned>(local_id.value()),
                         static_cast<unsigned>(remote_id.value())).c_str());
    // TODO(vtl): Should we continue on to |AttachAndRun()|?
  }

  endpoint->AttachAndRun(this, local_id, remote_id);
  return remote_id;
}

bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
                                 ChannelEndpointId local_id,
                                 ChannelEndpointId remote_id,
                                 uint32_t num_bytes,
                                 const void* bytes) {
  DVLOG(2) << "Sending channel control message: subtype " << subtype
           << ", local ID " << local_id << ", remote ID " << remote_id;
  std::unique_ptr<MessageInTransit> message(new MessageInTransit(
      MessageInTransit::Type::CHANNEL, subtype, num_bytes, bytes));
  message->set_source_id(local_id);
  message->set_destination_id(remote_id);
  return WriteMessage(std::move(message));
}

}  // namespace system
}  // namespace mojo
