blob: 40cb18417b4a54862517649164c88c38bacc573b [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/edk/system/channel_endpoint.h"
#include "base/logging.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/message_pipe.h"
#include "mojo/edk/system/transport_data.h"
namespace mojo {
namespace system {
ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, unsigned port)
: state_(STATE_NORMAL),
message_pipe_(message_pipe),
port_(port),
channel_(),
local_id_(MessageInTransit::kInvalidEndpointId),
remote_id_(MessageInTransit::kInvalidEndpointId) {
DCHECK(message_pipe_.get());
DCHECK(port_ == 0 || port_ == 1);
}
void ChannelEndpoint::TakeMessages(MessageInTransitQueue* message_queue) {
DCHECK(paused_message_queue_.IsEmpty());
paused_message_queue_.Swap(message_queue);
}
bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) {
DCHECK(message);
base::AutoLock locker(lock_);
if (!channel_ || remote_id_ == MessageInTransit::kInvalidEndpointId) {
// We may reach here if we haven't been attached or run yet.
// TODO(vtl): We may also reach here if the channel is shut down early for
// some reason (with live message pipes on it). We can't check |state_| yet,
// until it's protected under lock, but in this case we should return false
// (and not enqueue any messages).
paused_message_queue_.AddMessage(message.Pass());
return true;
}
// TODO(vtl): Currently, this only works in the "running" case.
DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId);
return WriteMessageNoLock(message.Pass());
}
void ChannelEndpoint::DetachFromMessagePipe() {
// TODO(vtl): Once |message_pipe_| is under |lock_|, we should null it out
// here. For now, get the channel to do so for us.
scoped_refptr<Channel> channel;
{
base::AutoLock locker(lock_);
DCHECK(message_pipe_.get());
message_pipe_ = nullptr;
if (!channel_)
return;
DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
// TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid
// here as well.
channel = channel_;
}
// Don't call this under |lock_|, since it'll call us back.
// TODO(vtl): This seems pretty suboptimal.
channel->DetachMessagePipeEndpoint(local_id_, remote_id_);
}
void ChannelEndpoint::AttachToChannel(Channel* channel,
MessageInTransit::EndpointId local_id) {
DCHECK(channel);
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
base::AutoLock locker(lock_);
DCHECK(!channel_);
DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
channel_ = channel;
local_id_ = local_id;
}
void ChannelEndpoint::Run(MessageInTransit::EndpointId remote_id) {
DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
base::AutoLock locker(lock_);
DCHECK(channel_);
DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
remote_id_ = remote_id;
while (!paused_message_queue_.IsEmpty()) {
LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage()))
<< "Failed to write enqueue message to channel";
}
}
bool ChannelEndpoint::OnReadMessage(
const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles) {
scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
scoped_refptr<MessagePipe> message_pipe;
unsigned port;
{
base::AutoLock locker(lock_);
DCHECK(channel_);
if (!message_pipe_.get()) {
// This isn't a failure per se. (It just means that, e.g., the other end
// of the message point closed first.)
return true;
}
if (message_view.transport_data_buffer_size() > 0) {
DCHECK(message_view.transport_data_buffer());
message->SetDispatchers(TransportData::DeserializeDispatchers(
message_view.transport_data_buffer(),
message_view.transport_data_buffer_size(),
platform_handles.Pass(),
channel_));
}
// Take a ref, and call |EnqueueMessage()| outside the lock.
message_pipe = message_pipe_;
port = port_;
}
MojoResult result = message_pipe->EnqueueMessage(
MessagePipe::GetPeerPort(port), message.Pass());
return (result == MOJO_RESULT_OK);
}
void ChannelEndpoint::OnDisconnect() {
scoped_refptr<MessagePipe> message_pipe;
unsigned port;
{
base::AutoLock locker(lock_);
if (!message_pipe_.get())
return;
// Take a ref, and call |Close()| outside the lock.
message_pipe = message_pipe_;
port = port_;
}
message_pipe->Close(port);
}
void ChannelEndpoint::DetachFromChannel() {
base::AutoLock locker(lock_);
DCHECK(channel_);
DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
// TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid
// here as well.
channel_ = nullptr;
local_id_ = MessageInTransit::kInvalidEndpointId;
remote_id_ = MessageInTransit::kInvalidEndpointId;
}
ChannelEndpoint::~ChannelEndpoint() {
DCHECK(!message_pipe_.get());
DCHECK(!channel_);
DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
}
bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) {
DCHECK(message);
lock_.AssertAcquired();
DCHECK(channel_);
DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId);
message->SerializeAndCloseDispatchers(channel_);
message->set_source_id(local_id_);
message->set_destination_id(remote_id_);
return channel_->WriteMessage(message.Pass());
}
} // namespace system
} // namespace mojo