// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/edk/system/message_pipe.h"

#include <memory>
#include <utility>

#include "base/logging.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/channel_endpoint_id.h"
#include "mojo/edk/system/handle_transport.h"
#include "mojo/edk/system/incoming_endpoint.h"
#include "mojo/edk/system/local_message_pipe_endpoint.h"
#include "mojo/edk/system/message_in_transit.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
#include "mojo/edk/system/message_pipe_endpoint.h"
#include "mojo/edk/system/proxy_message_pipe_endpoint.h"
#include "mojo/edk/util/make_unique.h"

using mojo::platform::ScopedPlatformHandle;
using mojo::util::MakeRefCounted;
using mojo::util::MakeUnique;
using mojo::util::MutexLocker;
using mojo::util::RefPtr;

namespace mojo {
namespace system {

// static
RefPtr<MessagePipe> MessagePipe::CreateLocalLocal()
MOJO_NO_THREAD_SAFETY_ANALYSIS {
RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe());
message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
return message_pipe;
}

// static
RefPtr<MessagePipe> MessagePipe::CreateLocalProxy(
RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS {
DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
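// Port 0 stays local; port 1 is a proxy hooked up to the newly-created
// |ChannelEndpoint|, which is also returned to the caller via
// |channel_endpoint|.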
RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe());
message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
*channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe.Clone(), 1);
message_pipe->endpoints_[1].reset(
new ProxyMessagePipeEndpoint(channel_endpoint->Clone()));
return message_pipe;
}

// static
RefPtr<MessagePipe> MessagePipe::CreateLocalProxyFromExisting(
MessageInTransitQueue* message_queue,
RefPtr<ChannelEndpoint>&& channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS {
DCHECK(message_queue);
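// Port 0 is local and is seeded with the messages already received in
// |message_queue|; port 1 wraps |channel_endpoint|, if the channel side still
// exists.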
RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe());
message_pipe->endpoints_[0].reset(
new LocalMessagePipeEndpoint(message_queue));
if (channel_endpoint) {
bool attached_to_channel =
    channel_endpoint->ReplaceClient(message_pipe.Clone(), 1);
message_pipe->endpoints_[1].reset(
new ProxyMessagePipeEndpoint(std::move(channel_endpoint)));
if (!attached_to_channel)
message_pipe->OnDetachFromChannel(1);
} else {
// This means that the proxy side was already closed; we only need to inform
// the local side of this.
// TODO(vtl): This is safe to do without locking (but perhaps slightly
// dubious), since no other thread has access to |message_pipe| yet.
message_pipe->endpoints_[0]->OnPeerClose();
}
return message_pipe;
}

// static
RefPtr<MessagePipe> MessagePipe::CreateProxyLocal(
RefPtr<ChannelEndpoint>* channel_endpoint) MOJO_NO_THREAD_SAFETY_ANALYSIS {
DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
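// The mirror image of |CreateLocalProxy()|: port 0 is the proxy and port 1 is
// local.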
RefPtr<MessagePipe> message_pipe = AdoptRef(new MessagePipe());
*channel_endpoint = MakeRefCounted<ChannelEndpoint>(message_pipe.Clone(), 0);
message_pipe->endpoints_[0].reset(
new ProxyMessagePipeEndpoint(channel_endpoint->Clone()));
message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
return message_pipe;
}

// static
unsigned MessagePipe::GetPeerPort(unsigned port) {
DCHECK(port == 0 || port == 1);
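// The only valid ports are 0 and 1, so the peer is obtained by flipping the
// low bit.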
return port ^ 1;
}

// static
bool MessagePipe::Deserialize(Channel* channel,
const void* source,
size_t size,
RefPtr<MessagePipe>* message_pipe,
unsigned* port) {
DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
if (size != channel->GetSerializedEndpointSize()) {
LOG(ERROR) << "Invalid serialized message pipe";
return false;
}
RefPtr<IncomingEndpoint> incoming_endpoint =
channel->DeserializeEndpoint(source);
if (!incoming_endpoint)
return false;
*message_pipe = incoming_endpoint->ConvertToMessagePipe();
DCHECK(*message_pipe);
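// The local endpoint of a deserialized message pipe is always port 0.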
*port = 0;
return true;
}

MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
DCHECK(port == 0 || port == 1);
MutexLocker locker(&mutex_);
DCHECK(endpoints_[port]);
return endpoints_[port]->GetType();
}

void MessagePipe::CancelAllAwakables(unsigned port) {
MutexLocker locker(&mutex_);
CancelAllAwakablesNoLock(port);
}

void MessagePipe::Close(unsigned port) {
DCHECK(port == 0 || port == 1);
unsigned peer_port = GetPeerPort(port);
MutexLocker locker(&mutex_);
// The endpoint's |OnPeerClose()| may have been called first and returned
// false, which would have resulted in its destruction.
if (!endpoints_[port])
return;
endpoints_[port]->Close();
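// Notify the peer port (if it still exists) that this end has closed; if the
// peer endpoint no longer needs to exist as a result, destroy it.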
if (endpoints_[peer_port]) {
if (!endpoints_[peer_port]->OnPeerClose())
endpoints_[peer_port].reset();
}
endpoints_[port].reset();
}

// TODO(vtl): Handle flags.
MojoResult MessagePipe::WriteMessage(unsigned port,
UserPointer<const void> bytes,
uint32_t num_bytes,
std::vector<HandleTransport>* transports,
MojoWriteMessageFlags flags) {
DCHECK(port == 0 || port == 1);
MutexLocker locker(&mutex_);
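// A write on |port| shows up as an enqueue on the peer port's endpoint.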
return EnqueueMessageNoLock(
GetPeerPort(port),
MakeUnique<MessageInTransit>(
MessageInTransit::Type::ENDPOINT_CLIENT,
MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA, num_bytes, bytes),
transports);
}

MojoResult MessagePipe::ReadMessage(unsigned port,
UserPointer<void> bytes,
UserPointer<uint32_t> num_bytes,
HandleVector* handles,
uint32_t* num_handles,
MojoReadMessageFlags flags) {
DCHECK(port == 0 || port == 1);
MutexLocker locker(&mutex_);
DCHECK(endpoints_[port]);
return endpoints_[port]->ReadMessage(bytes, num_bytes, handles, num_handles,
flags);
}

HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
DCHECK(port == 0 || port == 1);
MutexLocker locker(&mutex_);
DCHECK(endpoints_[port]);
return endpoints_[port]->GetHandleSignalsState();
}

MojoResult MessagePipe::AddAwakable(unsigned port,
Awakable* awakable,
MojoHandleSignals signals,
uint32_t context,
HandleSignalsState* signals_state) {
DCHECK(port == 0 || port == 1);
MutexLocker locker(&mutex_);
DCHECK(endpoints_[port]);
return endpoints_[port]->AddAwakable(awakable, signals, context,
signals_state);
}

void MessagePipe::RemoveAwakable(unsigned port,
Awakable* awakable,
HandleSignalsState* signals_state) {
DCHECK(port == 0 || port == 1);
MutexLocker locker(&mutex_);
DCHECK(endpoints_[port]);
endpoints_[port]->RemoveAwakable(awakable, signals_state);
}

void MessagePipe::StartSerialize(unsigned /*port*/,
Channel* channel,
size_t* max_size,
size_t* max_platform_handles) {
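// Serializing a message pipe endpoint needs a fixed-size buffer (whose size is
// determined by the channel) and no platform handles.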
*max_size = channel->GetSerializedEndpointSize();
*max_platform_handles = 0;
}

bool MessagePipe::EndSerialize(
unsigned port,
Channel* channel,
void* destination,
size_t* actual_size,
std::vector<ScopedPlatformHandle>* /*platform_handles*/) {
DCHECK(port == 0 || port == 1);
MutexLocker locker(&mutex_);
DCHECK(endpoints_[port]);
// The port being serialized must be local.
DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
unsigned peer_port = GetPeerPort(port);
MessageInTransitQueue* message_queue =
static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get())
->message_queue();
// The replacement for |endpoints_[port]|, if any.
MessagePipeEndpoint* replacement_endpoint = nullptr;
// The three cases below correspond to the ones described above
// |Channel::SerializeEndpoint...()| (in channel.h).
if (!endpoints_[peer_port]) {
// Case 1: (known-)closed peer port. There's no reason for us to continue to
// exist afterwards.
channel->SerializeEndpointWithClosedPeer(destination, message_queue);
} else if (endpoints_[peer_port]->GetType() ==
MessagePipeEndpoint::kTypeLocal) {
// Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint|
// with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that
// the |Channel| returns to us.
RefPtr<ChannelEndpoint> channel_endpoint =
channel->SerializeEndpointWithLocalPeer(
destination, message_queue, RefPtr<ChannelEndpointClient>(this),
port);
replacement_endpoint =
new ProxyMessagePipeEndpoint(std::move(channel_endpoint));
} else {
// Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and
// pass it to the |Channel|. There's no reason for us to continue to exist
// afterwards.
DCHECK_EQ(endpoints_[peer_port]->GetType(),
MessagePipeEndpoint::kTypeProxy);
ProxyMessagePipeEndpoint* peer_endpoint =
static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
RefPtr<ChannelEndpoint> peer_channel_endpoint =
peer_endpoint->ReleaseChannelEndpoint();
channel->SerializeEndpointWithRemotePeer(destination, message_queue,
std::move(peer_channel_endpoint));
// No need to call |Close()| after |ReleaseChannelEndpoint()|.
endpoints_[peer_port].reset();
}
endpoints_[port]->Close();
endpoints_[port].reset(replacement_endpoint);
*actual_size = channel->GetSerializedEndpointSize();
return true;
}

void MessagePipe::CancelAllAwakablesNoLock(unsigned port) {
DCHECK(port == 0 || port == 1);
mutex_.AssertHeld();
DCHECK(endpoints_[port]);
endpoints_[port]->CancelAllAwakables();
}

bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) {
MutexLocker locker(&mutex_);
if (!endpoints_[port]) {
// This will happen only on the rare occasion that the call to
// |OnReadMessage()| is racing with us calling
// |ChannelEndpoint::ReplaceClient()|, in which case we reject the message,
// and the |ChannelEndpoint| can retry (calling the new client's
// |OnReadMessage()|).
return false;
}
// This is called when the |ChannelEndpoint| for the
// |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
// We need to pass this message on to its peer port (typically a
// |LocalMessagePipeEndpoint|).
MojoResult result = EnqueueMessageNoLock(
GetPeerPort(port), std::unique_ptr<MessageInTransit>(message), nullptr);
DLOG_IF(WARNING, result != MOJO_RESULT_OK)
<< "EnqueueMessageNoLock() failed (result = " << result << ")";
return true;
}

void MessagePipe::OnDetachFromChannel(unsigned port) {
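// Once the channel side has detached, the endpoint at |port| can no longer
// send or receive via the channel, so just close it (which also notifies the
// peer port).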
Close(port);
}

MessagePipe::MessagePipe() {}

MessagePipe::~MessagePipe() {
// Owned by the dispatchers. The owning dispatchers should only release us via
// their |Close()| method, which should inform us of being closed via our
// |Close()|. Thus these should already be null.
DCHECK(!endpoints_[0]);
DCHECK(!endpoints_[1]);
}

MojoResult MessagePipe::EnqueueMessageNoLock(
unsigned port,
std::unique_ptr<MessageInTransit> message,
std::vector<HandleTransport>* transports) {
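// Note: |port| is the *destination* port here, i.e., the port whose endpoint
// receives |message|; the write originated on its peer.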
DCHECK(port == 0 || port == 1);
DCHECK(message);
DCHECK_EQ(message->type(), MessageInTransit::Type::ENDPOINT_CLIENT);
DCHECK(endpoints_[GetPeerPort(port)]);
// The destination port need not be open, unlike the source port.
if (!endpoints_[port])
return MOJO_RESULT_FAILED_PRECONDITION;
if (transports) {
MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
if (result != MOJO_RESULT_OK)
return result;
}
// The endpoint's |EnqueueMessage()| may not report failure.
endpoints_[port]->EnqueueMessage(std::move(message));
return MOJO_RESULT_OK;
}

MojoResult MessagePipe::AttachTransportsNoLock(
unsigned port,
MessageInTransit* message,
std::vector<HandleTransport>* transports) {
DCHECK(!message->has_handles());
// Clone the handles and attach them to the message. (This must be done as a
// separate loop, since we want to leave the handles alone on failure.)
std::unique_ptr<HandleVector> handles(new HandleVector());
handles->reserve(transports->size());
for (size_t i = 0; i < transports->size(); i++) {
if ((*transports)[i].is_valid()) {
handles->push_back(
transports->at(i).CreateEquivalentHandleAndClose(this, port));
} else {
LOG(WARNING) << "Enqueueing null dispatcher";
handles->push_back(Handle());
}
}
message->SetHandles(std::move(handles));
return MOJO_RESULT_OK;
}

} // namespace system
} // namespace mojo