blob: 8d88b40abf38c7ff159f836b387f4176df1c3426 [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <memory>
#include <utility>
#include <vector>
#include "base/logging.h"
#include "mojo/edk/embedder/simple_platform_support.h"
#include "mojo/edk/platform/platform_handle_utils_posix.h"
#include "mojo/edk/platform/platform_pipe.h"
#include "mojo/edk/platform/platform_shared_buffer.h"
#include "mojo/edk/platform/scoped_platform_handle.h"
#include "mojo/edk/platform/thread_utils.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/channel_endpoint_id.h"
#include "mojo/edk/system/handle.h"
#include "mojo/edk/system/handle_transport.h"
#include "mojo/edk/system/incoming_endpoint.h"
#include "mojo/edk/system/message_pipe.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
#include "mojo/edk/system/platform_handle_dispatcher.h"
#include "mojo/edk/system/raw_channel.h"
#include "mojo/edk/system/shared_buffer_dispatcher.h"
#include "mojo/edk/system/test/scoped_test_dir.h"
#include "mojo/edk/system/test/test_io_thread.h"
#include "mojo/edk/system/test/timeouts.h"
#include "mojo/edk/system/waiter.h"
#include "mojo/edk/util/ref_ptr.h"
#include "mojo/edk/util/scoped_file.h"
#include "mojo/public/cpp/system/macros.h"
#include "testing/gtest/include/gtest/gtest.h"
using mojo::platform::FILEFromPlatformHandle;
using mojo::platform::PlatformHandleFromFILE;
using mojo::platform::PlatformPipe;
using mojo::platform::PlatformSharedBufferMapping;
using mojo::platform::ScopedPlatformHandle;
using mojo::platform::ThreadSleep;
using mojo::util::MakeRefCounted;
using mojo::util::RefPtr;
namespace mojo {
namespace system {
namespace {
class RemoteMessagePipeTest : public testing::Test {
public:
RemoteMessagePipeTest()
: platform_support_(embedder::CreateSimplePlatformSupport()),
io_thread_(test::TestIOThread::StartMode::AUTO) {}
~RemoteMessagePipeTest() override {}
void SetUp() override {
io_thread_.PostTaskAndWait([this]() { SetUpOnIOThread(); });
}
void TearDown() override {
io_thread_.PostTaskAndWait([this]() { TearDownOnIOThread(); });
}
protected:
// This connects the two given |ChannelEndpoint|s. It assumes/requires that
// this is the bootstrap case (i.e., no other message pipes have ever been
// hosted on the channel).
void BootstrapChannelEndpoints(RefPtr<ChannelEndpoint>&& ep0,
RefPtr<ChannelEndpoint>&& ep1) {
io_thread_.PostTaskAndWait([this, &ep0, &ep1]() {
BootstrapChannelEndpointsOnIOThread(std::move(ep0), std::move(ep1));
});
}
// This bootstraps |ep| on |channels_[channel_index]|. It assumes/requires
// that this is the bootstrap case (i.e., no message pipes have ever been
// hosted on the channel). This returns *without* waiting.
void BootstrapChannelEndpointNoWait(unsigned channel_index,
RefPtr<ChannelEndpoint>&& ep) {
// Note: We have to copy |ep| here, since we're not waiting for it.
// TODO(vtl): With C++14 lambda captures, we'll be able to move it.
io_thread_.PostTask([this, channel_index, ep]() mutable {
BootstrapChannelEndpointOnIOThread(channel_index, std::move(ep));
});
}
void RestoreInitialState() {
io_thread_.PostTaskAndWait([this]() { RestoreInitialStateOnIOThread(); });
}
embedder::PlatformSupport* platform_support() {
return platform_support_.get();
}
test::TestIOThread* io_thread() { return &io_thread_; }
// Warning: It's up to the caller to ensure that the returned channel
// is/remains valid.
Channel* channels(size_t i) { return channels_[i].get(); }
private:
void SetUpOnIOThread() {
CHECK(io_thread()->IsCurrentAndRunning());
PlatformPipe channel_pair;
platform_handles_[0] = channel_pair.handle0.Pass();
platform_handles_[1] = channel_pair.handle1.Pass();
}
void TearDownOnIOThread() {
CHECK(io_thread()->IsCurrentAndRunning());
if (channels_[0]) {
channels_[0]->Shutdown();
channels_[0] = nullptr;
}
if (channels_[1]) {
channels_[1]->Shutdown();
channels_[1] = nullptr;
}
}
void CreateAndInitChannel(unsigned channel_index) {
CHECK(io_thread()->IsCurrentAndRunning());
CHECK(channel_index == 0 || channel_index == 1);
CHECK(!channels_[channel_index]);
channels_[channel_index] = MakeRefCounted<Channel>(platform_support_.get());
channels_[channel_index]->Init(
io_thread()->task_runner().Clone(),
io_thread()->platform_handle_watcher(),
RawChannel::Create(platform_handles_[channel_index].Pass()));
}
void BootstrapChannelEndpointsOnIOThread(RefPtr<ChannelEndpoint>&& ep0,
RefPtr<ChannelEndpoint>&& ep1) {
CHECK(io_thread()->IsCurrentAndRunning());
if (!channels_[0])
CreateAndInitChannel(0);
if (!channels_[1])
CreateAndInitChannel(1);
channels_[0]->SetBootstrapEndpoint(std::move(ep0));
channels_[1]->SetBootstrapEndpoint(std::move(ep1));
}
void BootstrapChannelEndpointOnIOThread(unsigned channel_index,
RefPtr<ChannelEndpoint>&& ep) {
CHECK(io_thread()->IsCurrentAndRunning());
CHECK(channel_index == 0 || channel_index == 1);
CreateAndInitChannel(channel_index);
channels_[channel_index]->SetBootstrapEndpoint(std::move(ep));
}
void RestoreInitialStateOnIOThread() {
CHECK(io_thread()->IsCurrentAndRunning());
TearDownOnIOThread();
SetUpOnIOThread();
}
std::unique_ptr<embedder::PlatformSupport> platform_support_;
test::TestIOThread io_thread_;
ScopedPlatformHandle platform_handles_[2];
RefPtr<Channel> channels_[2];
MOJO_DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
};
TEST_F(RemoteMessagePipeTest, Basic) {
static const char kHello[] = "hello";
static const char kWorld[] = "world!!!1!!!1!";
char buffer[100] = {0};
uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
// Connect message pipes. MP 0, port 1 will be attached to channel 0 and
// connected to MP 1, port 0, which will be attached to channel 1. This leaves
// MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
BootstrapChannelEndpoints(std::move(ep0), std::move(ep1));
// Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
// Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
// Write to MP 0, port 0.
EXPECT_EQ(
MOJO_RESULT_OK,
mp0->WriteMessage(0, UserPointer<const void>(kHello), sizeof(kHello),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
mp1->RemoveAwakable(1, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from MP 1, port 1.
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
EXPECT_STREQ(kHello, buffer);
// Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp0->AddAwakable(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
EXPECT_EQ(
MOJO_RESULT_OK,
mp1->WriteMessage(1, UserPointer<const void>(kWorld), sizeof(kWorld),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(456u, context);
hss = HandleSignalsState();
mp0->RemoveAwakable(0, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_OK,
mp0->ReadMessage(0, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
EXPECT_STREQ(kWorld, buffer);
// Close MP 0, port 0.
mp0->Close(0);
// Try to wait for MP 1, port 1 to become readable. This will eventually fail
// when it realizes that MP 0, port 0 has been closed. (It may also fail
// immediately.)
waiter.Init();
hss = HandleSignalsState();
MojoResult result =
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
if (result == MOJO_RESULT_OK) {
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(789u, context);
hss = HandleSignalsState();
mp1->RemoveAwakable(1, &waiter, &hss);
}
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
// And MP 1, port 1.
mp1->Close(1);
}
TEST_F(RemoteMessagePipeTest, PeerClosed) {
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
// Connect message pipes. MP 0, port 1 will be attached to channel 0 and
// connected to MP 1, port 0, which will be attached to channel 1. This leaves
// MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
BootstrapChannelEndpoints(std::move(ep0), std::move(ep1));
// Close MP 0, port 0.
mp0->Close(0);
// Try to wait for MP 1, port 1 to be signaled with peer closed.
waiter.Init();
hss = HandleSignalsState();
MojoResult result =
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 101, &hss);
if (result == MOJO_RESULT_OK) {
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(101u, context);
hss = HandleSignalsState();
mp1->RemoveAwakable(1, &waiter, &hss);
}
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
// And MP 1, port 1.
mp1->Close(1);
}
TEST_F(RemoteMessagePipeTest, Multiplex) {
static const char kHello[] = "hello";
static const char kWorld[] = "world!!!1!!!1!";
char buffer[100] = {0};
uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
// Connect message pipes as in the |Basic| test.
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
BootstrapChannelEndpoints(std::move(ep0), std::move(ep1));
// Now put another message pipe on the channel.
// Do this by creating a message pipe (for the |channels(0)| side) and
// attaching and running it, yielding the remote ID. A message is then sent
// via |ep0| (i.e., sent using |mp0|, port 0) with this remote ID. Upon
// receiving this message, |PassIncomingMessagePipe()| is used to obtain the
// message pipe on the other side.
auto mp2 = MessagePipe::CreateLocalLocal();
ASSERT_TRUE(channels(0));
size_t max_endpoint_info_size;
size_t max_platform_handle_count;
mp2->StartSerialize(1, channels(0), &max_endpoint_info_size,
&max_platform_handle_count);
EXPECT_GT(max_endpoint_info_size, 0u);
ASSERT_EQ(0u, max_platform_handle_count);
std::unique_ptr<char[]> endpoint_info(new char[max_endpoint_info_size]);
size_t endpoint_info_size;
mp2->EndSerialize(1, channels(0), endpoint_info.get(), &endpoint_info_size,
nullptr);
EXPECT_EQ(max_endpoint_info_size, endpoint_info_size);
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
EXPECT_EQ(MOJO_RESULT_OK,
mp0->WriteMessage(0, UserPointer<const void>(endpoint_info.get()),
static_cast<uint32_t>(endpoint_info_size),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
mp1->RemoveAwakable(1, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
EXPECT_EQ(endpoint_info_size, channels(1)->GetSerializedEndpointSize());
std::unique_ptr<char[]> received_endpoint_info(new char[endpoint_info_size]);
buffer_size = static_cast<uint32_t>(endpoint_info_size);
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, UserPointer<void>(received_endpoint_info.get()),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(endpoint_info_size, static_cast<size_t>(buffer_size));
EXPECT_EQ(0, memcmp(received_endpoint_info.get(), endpoint_info.get(),
endpoint_info_size));
// Warning: The local side of mp3 is port 0, not port 1.
RefPtr<IncomingEndpoint> incoming_endpoint =
channels(1)->DeserializeEndpoint(received_endpoint_info.get());
ASSERT_TRUE(incoming_endpoint);
RefPtr<MessagePipe> mp3 = incoming_endpoint->ConvertToMessagePipe();
ASSERT_TRUE(mp3);
// Write: MP 2, port 0 -> MP 3, port 1.
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp3->AddAwakable(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
EXPECT_EQ(
MOJO_RESULT_OK,
mp2->WriteMessage(0, UserPointer<const void>(kHello), sizeof(kHello),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(789u, context);
hss = HandleSignalsState();
mp3->RemoveAwakable(0, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp0->ReadMessage(0, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp1->ReadMessage(1, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp2->ReadMessage(0, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
// Read from MP 3, port 1.
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_OK,
mp3->ReadMessage(0, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
EXPECT_STREQ(kHello, buffer);
// Write: MP 0, port 0 -> MP 1, port 1 again.
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
EXPECT_EQ(
MOJO_RESULT_OK,
mp0->WriteMessage(0, UserPointer<const void>(kWorld), sizeof(kWorld),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
mp1->RemoveAwakable(1, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Make sure there's nothing on the other ports.
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp0->ReadMessage(0, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp2->ReadMessage(0, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp3->ReadMessage(0, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
EXPECT_STREQ(kWorld, buffer);
mp0->Close(0);
mp1->Close(1);
mp2->Close(0);
mp3->Close(0);
}
TEST_F(RemoteMessagePipeTest, CloseBeforeAttachAndRun) {
static const char kHello[] = "hello";
char buffer[100] = {0};
uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
// Connect message pipes. MP 0, port 1 will be attached to channel 0 and
// connected to MP 1, port 0, which will be attached to channel 1. This leaves
// MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
// Write to MP 0, port 0.
EXPECT_EQ(
MOJO_RESULT_OK,
mp0->WriteMessage(0, UserPointer<const void>(kHello), sizeof(kHello),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
// Close MP 0, port 0 before it's even been attached to the channel and run.
mp0->Close(0);
BootstrapChannelEndpointNoWait(0, std::move(ep0));
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
// Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
BootstrapChannelEndpointNoWait(1, std::move(ep1));
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
// Note: MP 1, port 1 should definitely should be readable, but it may or may
// not appear as writable (there's a race, and it may not have noticed that
// the other side was closed yet -- e.g., inserting a sleep here would make it
// much more likely to notice that it's no longer writable).
mp1->RemoveAwakable(1, &waiter, &hss);
EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
// Read from MP 1, port 1.
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
EXPECT_STREQ(kHello, buffer);
// And MP 1, port 1.
mp1->Close(1);
}
TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
static const char kHello[] = "hello";
char buffer[100] = {0};
uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
// Connect message pipes. MP 0, port 1 will be attached to channel 0 and
// connected to MP 1, port 0, which will be attached to channel 1. This leaves
// MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
// Write to MP 0, port 0.
EXPECT_EQ(
MOJO_RESULT_OK,
mp0->WriteMessage(0, UserPointer<const void>(kHello), sizeof(kHello),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
BootstrapChannelEndpointNoWait(0, std::move(ep0));
// Close MP 0, port 0 before channel 1 is even connected.
mp0->Close(0);
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
// Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
BootstrapChannelEndpointNoWait(1, std::move(ep1));
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
// Note: MP 1, port 1 should definitely should be readable, but it may or may
// not appear as writable (there's a race, and it may not have noticed that
// the other side was closed yet -- e.g., inserting a sleep here would make it
// much more likely to notice that it's no longer writable).
mp1->RemoveAwakable(1, &waiter, &hss);
EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
// Read from MP 1, port 1.
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, UserPointer<void>(buffer),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
EXPECT_STREQ(kHello, buffer);
// And MP 1, port 1.
mp1->Close(1);
}
TEST_F(RemoteMessagePipeTest, HandlePassing) {
static const char kHello[] = "hello";
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
BootstrapChannelEndpoints(std::move(ep0), std::move(ep1));
// We'll try to pass this dispatcher.
auto dispatcher = MessagePipeDispatcher::Create(
MessagePipeDispatcher::kDefaultCreateOptions);
auto local_mp = MessagePipe::CreateLocalLocal();
dispatcher->Init(local_mp.Clone(), 0);
Handle handle(std::move(dispatcher),
MessagePipeDispatcher::kDefaultHandleRights);
// Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
// Write to MP 0, port 0.
{
HandleTransport transport(test::HandleTryStartTransport(handle));
EXPECT_TRUE(transport.is_valid());
std::vector<HandleTransport> transports;
transports.push_back(transport);
EXPECT_EQ(
MOJO_RESULT_OK,
mp0->WriteMessage(0, UserPointer<const void>(kHello), sizeof(kHello),
&transports, MOJO_WRITE_MESSAGE_FLAG_NONE));
transport.End();
// |handle.dispatcher| should have been closed. This is |DCHECK()|ed when
// the |handle.dispatcher| is destroyed.
EXPECT_TRUE(handle.dispatcher->HasOneRef());
handle.reset();
}
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
mp1->RemoveAwakable(1, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from MP 1, port 1.
char read_buffer[100] = {0};
uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
HandleVector read_handles;
uint32_t read_num_handles = 10; // Maximum to get.
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), &read_handles,
&read_num_handles, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kHello, read_buffer);
EXPECT_EQ(1u, read_handles.size());
EXPECT_EQ(1u, read_num_handles);
ASSERT_TRUE(read_handles[0]);
EXPECT_TRUE(read_handles[0].dispatcher->HasOneRef());
EXPECT_EQ(Dispatcher::Type::MESSAGE_PIPE,
read_handles[0].dispatcher->GetType());
EXPECT_EQ(MessagePipeDispatcher::kDefaultHandleRights,
read_handles[0].rights);
dispatcher = RefPtr<MessagePipeDispatcher>(
static_cast<MessagePipeDispatcher*>(read_handles[0].dispatcher.get()));
// Add the waiter now, before it becomes readable to avoid a race.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
dispatcher->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456,
nullptr));
// Write to "local_mp", port 1.
EXPECT_EQ(
MOJO_RESULT_OK,
local_mp->WriteMessage(1, UserPointer<const void>(kHello), sizeof(kHello),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
// TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
// here. (We don't crash if I sleep and then close.)
// Wait for the dispatcher to become readable.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(456u, context);
hss = HandleSignalsState();
dispatcher->RemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from the dispatcher.
memset(read_buffer, 0, sizeof(read_buffer));
read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
EXPECT_EQ(MOJO_RESULT_OK,
dispatcher->ReadMessage(UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), 0,
nullptr, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kHello, read_buffer);
// Prepare to wait on "local_mp", port 1.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
local_mp->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789,
nullptr));
// Write to the dispatcher.
EXPECT_EQ(MOJO_RESULT_OK, dispatcher->WriteMessage(
UserPointer<const void>(kHello), sizeof(kHello),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(789u, context);
hss = HandleSignalsState();
local_mp->RemoveAwakable(1, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from "local_mp", port 1.
memset(read_buffer, 0, sizeof(read_buffer));
read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
EXPECT_EQ(MOJO_RESULT_OK,
local_mp->ReadMessage(1, UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), nullptr,
nullptr, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kHello, read_buffer);
// TODO(vtl): Also test that messages queued up before the handle was sent are
// delivered properly.
// Close everything that belongs to us.
mp0->Close(0);
mp1->Close(1);
EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
// Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
local_mp->Close(1);
}
TEST_F(RemoteMessagePipeTest, HandlePassingHalfClosed) {
static const char kHello[] = "hello";
static const char kWorld[] = "world!";
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
// We'll try to pass this dispatcher.
auto dispatcher = MessagePipeDispatcher::Create(
MessagePipeDispatcher::kDefaultCreateOptions);
auto local_mp = MessagePipe::CreateLocalLocal();
dispatcher->Init(local_mp.Clone(), 0);
Handle handle(std::move(dispatcher),
MessagePipeDispatcher::kDefaultHandleRights);
hss = local_mp->GetHandleSignalsState(0);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Write to the other end (|local_mp|, port 1), and then close it.
EXPECT_EQ(
MOJO_RESULT_OK,
local_mp->WriteMessage(1, UserPointer<const void>(kHello), sizeof(kHello),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
hss = local_mp->GetHandleSignalsState(0);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Then the second message....
EXPECT_EQ(
MOJO_RESULT_OK,
local_mp->WriteMessage(1, UserPointer<const void>(kWorld), sizeof(kWorld),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
hss = local_mp->GetHandleSignalsState(0);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Then close it.
local_mp->Close(1);
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
BootstrapChannelEndpoints(std::move(ep0), std::move(ep1));
// Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
// Write to MP 0, port 0.
{
HandleTransport transport(test::HandleTryStartTransport(handle));
EXPECT_TRUE(transport.is_valid());
std::vector<HandleTransport> transports;
transports.push_back(transport);
EXPECT_EQ(
MOJO_RESULT_OK,
mp0->WriteMessage(0, UserPointer<const void>(kHello), sizeof(kHello),
&transports, MOJO_WRITE_MESSAGE_FLAG_NONE));
transport.End();
// |handle.dispatcher| should have been closed. This is |DCHECK()|ed when
// the |handle.dispatcher| is destroyed.
EXPECT_TRUE(handle.dispatcher->HasOneRef());
handle.reset();
}
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
mp1->RemoveAwakable(1, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from MP 1, port 1.
char read_buffer[100] = {0};
uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
HandleVector read_handles;
uint32_t read_num_handles = 10; // Maximum to get.
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), &read_handles,
&read_num_handles, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kHello, read_buffer);
EXPECT_EQ(1u, read_handles.size());
EXPECT_EQ(1u, read_num_handles);
ASSERT_TRUE(read_handles[0]);
EXPECT_TRUE(read_handles[0].dispatcher->HasOneRef());
EXPECT_EQ(Dispatcher::Type::MESSAGE_PIPE,
read_handles[0].dispatcher->GetType());
EXPECT_EQ(MessagePipeDispatcher::kDefaultHandleRights,
read_handles[0].rights);
dispatcher = RefPtr<MessagePipeDispatcher>(
static_cast<MessagePipeDispatcher*>(read_handles[0].dispatcher.get()));
// |dispatcher| should already be readable and not writable.
hss = dispatcher->GetHandleSignalsState();
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// So read from it.
memset(read_buffer, 0, sizeof(read_buffer));
read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
EXPECT_EQ(MOJO_RESULT_OK,
dispatcher->ReadMessage(UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), 0,
nullptr, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kHello, read_buffer);
// It should still be readable.
hss = dispatcher->GetHandleSignalsState();
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// So read from it.
memset(read_buffer, 0, sizeof(read_buffer));
read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
EXPECT_EQ(MOJO_RESULT_OK,
dispatcher->ReadMessage(UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), 0,
nullptr, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kWorld, read_buffer);
// Now it should no longer be readable.
hss = dispatcher->GetHandleSignalsState();
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
// Close everything that belongs to us.
mp0->Close(0);
mp1->Close(1);
EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
}
TEST_F(RemoteMessagePipeTest, SharedBufferPassing) {
static const char kHello[] = "hello";
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
BootstrapChannelEndpoints(std::move(ep0), std::move(ep1));
// We'll try to pass this dispatcher.
MojoResult result = MOJO_RESULT_INTERNAL;
auto dispatcher = SharedBufferDispatcher::Create(
platform_support(), SharedBufferDispatcher::kDefaultCreateOptions, 100,
&result);
EXPECT_EQ(MOJO_RESULT_OK, result);
ASSERT_TRUE(dispatcher);
Handle handle(std::move(dispatcher),
SharedBufferDispatcher::kDefaultHandleRights);
// Make a mapping.
std::unique_ptr<PlatformSharedBufferMapping> mapping0;
EXPECT_EQ(MOJO_RESULT_OK, handle.dispatcher->MapBuffer(
0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping0));
ASSERT_TRUE(mapping0);
ASSERT_TRUE(mapping0->GetBase());
ASSERT_EQ(100u, mapping0->GetLength());
static_cast<char*>(mapping0->GetBase())[0] = 'A';
static_cast<char*>(mapping0->GetBase())[50] = 'B';
static_cast<char*>(mapping0->GetBase())[99] = 'C';
// Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
// Write to MP 0, port 0.
{
HandleTransport transport(test::HandleTryStartTransport(handle));
EXPECT_TRUE(transport.is_valid());
std::vector<HandleTransport> transports;
transports.push_back(transport);
EXPECT_EQ(
MOJO_RESULT_OK,
mp0->WriteMessage(0, UserPointer<const void>(kHello), sizeof(kHello),
&transports, MOJO_WRITE_MESSAGE_FLAG_NONE));
transport.End();
// |handle.dispatcher| should have been closed. This is |DCHECK()|ed when
// the |handle.dispatcher| is destroyed.
EXPECT_TRUE(handle.dispatcher->HasOneRef());
handle.reset();
}
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
mp1->RemoveAwakable(1, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from MP 1, port 1.
char read_buffer[100] = {0};
uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
HandleVector read_handles;
uint32_t read_num_handles = 10; // Maximum to get.
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), &read_handles,
&read_num_handles, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kHello, read_buffer);
EXPECT_EQ(1u, read_handles.size());
EXPECT_EQ(1u, read_num_handles);
ASSERT_TRUE(read_handles[0]);
EXPECT_TRUE(read_handles[0].dispatcher->HasOneRef());
EXPECT_EQ(Dispatcher::Type::SHARED_BUFFER,
read_handles[0].dispatcher->GetType());
EXPECT_EQ(SharedBufferDispatcher::kDefaultHandleRights,
read_handles[0].rights);
dispatcher = RefPtr<SharedBufferDispatcher>(
static_cast<SharedBufferDispatcher*>(read_handles[0].dispatcher.get()));
// Make another mapping.
std::unique_ptr<PlatformSharedBufferMapping> mapping1;
EXPECT_EQ(MOJO_RESULT_OK, dispatcher->MapBuffer(
0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1));
ASSERT_TRUE(mapping1);
ASSERT_TRUE(mapping1->GetBase());
ASSERT_EQ(100u, mapping1->GetLength());
EXPECT_NE(mapping1->GetBase(), mapping0->GetBase());
EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
EXPECT_EQ('B', static_cast<char*>(mapping1->GetBase())[50]);
EXPECT_EQ('C', static_cast<char*>(mapping1->GetBase())[99]);
// Write stuff either way.
static_cast<char*>(mapping1->GetBase())[1] = 'x';
EXPECT_EQ('x', static_cast<char*>(mapping0->GetBase())[1]);
static_cast<char*>(mapping0->GetBase())[2] = 'y';
EXPECT_EQ('y', static_cast<char*>(mapping1->GetBase())[2]);
// Kill the first mapping; the second should still be valid.
mapping0.reset();
EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
// Close everything that belongs to us.
mp0->Close(0);
mp1->Close(1);
EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
// The second mapping should still be good.
EXPECT_EQ('x', static_cast<char*>(mapping1->GetBase())[1]);
}
TEST_F(RemoteMessagePipeTest, PlatformHandlePassing) {
test::ScopedTestDir test_dir;
static const char kHello[] = "hello";
static const char kWorld[] = "world";
Waiter waiter;
uint32_t context = 0;
HandleSignalsState hss;
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
BootstrapChannelEndpoints(std::move(ep0), std::move(ep1));
util::ScopedFILE fp(test_dir.CreateFile());
EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
// We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
// be passed.
auto dispatcher =
PlatformHandleDispatcher::Create(PlatformHandleFromFILE(std::move(fp)));
Handle handle(std::move(dispatcher),
PlatformHandleDispatcher::kDefaultHandleRights);
// Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
// Write to MP 0, port 0.
{
HandleTransport transport(test::HandleTryStartTransport(handle));
EXPECT_TRUE(transport.is_valid());
std::vector<HandleTransport> transports;
transports.push_back(transport);
EXPECT_EQ(
MOJO_RESULT_OK,
mp0->WriteMessage(0, UserPointer<const void>(kWorld), sizeof(kWorld),
&transports, MOJO_WRITE_MESSAGE_FLAG_NONE));
transport.End();
// |handle.dispatcher| should have been closed. This is |DCHECK()|ed when
// the |handle.dispatcher| is destroyed.
EXPECT_TRUE(handle.dispatcher->HasOneRef());
handle.reset();
}
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
mp1->RemoveAwakable(1, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from MP 1, port 1.
char read_buffer[100] = {0};
uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
HandleVector read_handles;
uint32_t read_num_handles = 10; // Maximum to get.
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), &read_handles,
&read_num_handles, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kWorld, read_buffer);
EXPECT_EQ(1u, read_handles.size());
EXPECT_EQ(1u, read_num_handles);
ASSERT_TRUE(read_handles[0]);
EXPECT_TRUE(read_handles[0].dispatcher->HasOneRef());
EXPECT_EQ(Dispatcher::Type::PLATFORM_HANDLE,
read_handles[0].dispatcher->GetType());
EXPECT_EQ(PlatformHandleDispatcher::kDefaultHandleRights,
read_handles[0].rights);
dispatcher = RefPtr<PlatformHandleDispatcher>(
static_cast<PlatformHandleDispatcher*>(read_handles[0].dispatcher.get()));
ScopedPlatformHandle h = dispatcher->PassPlatformHandle();
EXPECT_TRUE(h.is_valid());
fp = FILEFromPlatformHandle(h.Pass(), "rb");
EXPECT_FALSE(h.is_valid());
EXPECT_TRUE(fp);
rewind(fp.get());
memset(read_buffer, 0, sizeof(read_buffer));
EXPECT_EQ(sizeof(kHello),
fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
EXPECT_STREQ(kHello, read_buffer);
// Close everything that belongs to us.
mp0->Close(0);
mp1->Close(1);
EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
}
// Test racing closes (on each end).
// Note: A flaky failure would almost certainly indicate a problem in the code
// itself (not in the test). Also, any logged warnings/errors would also
// probably be indicative of bugs.
TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
MojoDeadline delay = test::DeadlineFromMilliseconds(5u);
for (unsigned i = 0; i < 256; i++) {
DVLOG(2) << "---------------------------------------- " << i;
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
BootstrapChannelEndpointNoWait(0, std::move(ep0));
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
BootstrapChannelEndpointNoWait(1, std::move(ep1));
if (i & 1u) {
io_thread()->PostTask([delay]() { ThreadSleep(delay); });
}
if (i & 2u)
ThreadSleep(delay);
mp0->Close(0);
if (i & 4u) {
io_thread()->PostTask([delay]() { ThreadSleep(delay); });
}
if (i & 8u)
ThreadSleep(delay);
mp1->Close(1);
RestoreInitialState();
}
}
// Tests passing an end of a message pipe over a remote message pipe, and then
// passing that end back.
// TODO(vtl): Also test passing a message pipe across two remote message pipes.
TEST_F(RemoteMessagePipeTest, PassMessagePipeHandleAcrossAndBack) {
static const char kHello[] = "hello";
static const char kWorld[] = "world";
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
RefPtr<ChannelEndpoint> ep0;
auto mp0 = MessagePipe::CreateLocalProxy(&ep0);
RefPtr<ChannelEndpoint> ep1;
auto mp1 = MessagePipe::CreateProxyLocal(&ep1);
BootstrapChannelEndpoints(std::move(ep0), std::move(ep1));
// We'll try to pass this dispatcher.
auto dispatcher = MessagePipeDispatcher::Create(
MessagePipeDispatcher::kDefaultCreateOptions);
auto local_mp = MessagePipe::CreateLocalLocal();
dispatcher->Init(local_mp.Clone(), 0);
Handle handle(std::move(dispatcher),
MessagePipeDispatcher::kDefaultHandleRights);
// Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
// Write to MP 0, port 0.
{
HandleTransport transport(test::HandleTryStartTransport(handle));
EXPECT_TRUE(transport.is_valid());
std::vector<HandleTransport> transports;
transports.push_back(transport);
EXPECT_EQ(
MOJO_RESULT_OK,
mp0->WriteMessage(0, UserPointer<const void>(kHello), sizeof(kHello),
&transports, MOJO_WRITE_MESSAGE_FLAG_NONE));
transport.End();
// |handle.dispatcher| should have been closed. This is |DCHECK()|ed when
// the |handle.dispatcher| is destroyed.
EXPECT_TRUE(handle.dispatcher->HasOneRef());
handle.reset();
}
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
mp1->RemoveAwakable(1, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from MP 1, port 1.
char read_buffer[100] = {0};
uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
HandleVector read_handles;
uint32_t read_num_handles = 10; // Maximum to get.
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), &read_handles,
&read_num_handles, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kHello, read_buffer);
EXPECT_EQ(1u, read_handles.size());
EXPECT_EQ(1u, read_num_handles);
ASSERT_TRUE(read_handles[0]);
EXPECT_TRUE(read_handles[0].dispatcher->HasOneRef());
EXPECT_EQ(Dispatcher::Type::MESSAGE_PIPE,
read_handles[0].dispatcher->GetType());
EXPECT_EQ(MessagePipeDispatcher::kDefaultHandleRights,
read_handles[0].rights);
handle = std::move(read_handles[0]);
read_handles.clear();
// Now pass it back.
// Prepare to wait on MP 0, port 0. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
waiter.Init();
ASSERT_EQ(
MOJO_RESULT_OK,
mp0->AddAwakable(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
// Write to MP 1, port 1.
{
HandleTransport transport(test::HandleTryStartTransport(handle));
EXPECT_TRUE(transport.is_valid());
std::vector<HandleTransport> transports;
transports.push_back(transport);
EXPECT_EQ(
MOJO_RESULT_OK,
mp1->WriteMessage(1, UserPointer<const void>(kWorld), sizeof(kWorld),
&transports, MOJO_WRITE_MESSAGE_FLAG_NONE));
transport.End();
// |handle.dispatcher| should have been closed. This is |DCHECK()|ed when
// the |handle.dispatcher| is destroyed.
EXPECT_TRUE(handle.dispatcher->HasOneRef());
handle.reset();
}
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(456u, context);
hss = HandleSignalsState();
mp0->RemoveAwakable(0, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from MP 0, port 0.
read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
read_num_handles = 10; // Maximum to get.
EXPECT_EQ(MOJO_RESULT_OK,
mp0->ReadMessage(0, UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), &read_handles,
&read_num_handles, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kWorld, read_buffer);
EXPECT_EQ(1u, read_handles.size());
EXPECT_EQ(1u, read_num_handles);
ASSERT_TRUE(read_handles[0]);
EXPECT_TRUE(read_handles[0].dispatcher->HasOneRef());
EXPECT_EQ(Dispatcher::Type::MESSAGE_PIPE,
read_handles[0].dispatcher->GetType());
EXPECT_EQ(MessagePipeDispatcher::kDefaultHandleRights,
read_handles[0].rights);
dispatcher = RefPtr<MessagePipeDispatcher>(
static_cast<MessagePipeDispatcher*>(read_handles[0].dispatcher.get()));
read_handles.clear();
// Add the waiter now, before it becomes readable to avoid a race.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
dispatcher->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 789,
nullptr));
// Write to "local_mp", port 1.
EXPECT_EQ(
MOJO_RESULT_OK,
local_mp->WriteMessage(1, UserPointer<const void>(kHello), sizeof(kHello),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
// Wait for the dispatcher to become readable.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(789u, context);
hss = HandleSignalsState();
dispatcher->RemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from the dispatcher.
memset(read_buffer, 0, sizeof(read_buffer));
read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
EXPECT_EQ(MOJO_RESULT_OK,
dispatcher->ReadMessage(UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), 0,
nullptr, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kHello, read_buffer);
// Prepare to wait on "local_mp", port 1.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
local_mp->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789,
nullptr));
// Write to the dispatcher.
EXPECT_EQ(MOJO_RESULT_OK, dispatcher->WriteMessage(
UserPointer<const void>(kHello), sizeof(kHello),
nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(789u, context);
hss = HandleSignalsState();
local_mp->RemoveAwakable(1, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Read from "local_mp", port 1.
memset(read_buffer, 0, sizeof(read_buffer));
read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
EXPECT_EQ(MOJO_RESULT_OK,
local_mp->ReadMessage(1, UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), nullptr,
nullptr, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kHello, read_buffer);
// TODO(vtl): Also test the cases where messages are written and read (at
// various points) on the message pipe being passed around.
// Close everything that belongs to us.
mp0->Close(0);
mp1->Close(1);
EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
// Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
local_mp->Close(1);
}
} // namespace
} // namespace system
} // namespace mojo