// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// This file tests both |RemoteProducerDataPipeImpl| and
// |RemoteConsumerDataPipeImpl|.

#include <stdint.h>

#include <memory>
#include <utility>

#include "base/logging.h"
#include "mojo/edk/embedder/simple_platform_support.h"
#include "mojo/edk/platform/platform_pipe.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/data_pipe.h"
#include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
#include "mojo/edk/system/data_pipe_producer_dispatcher.h"
#include "mojo/edk/system/handle.h"
#include "mojo/edk/system/handle_transport.h"
#include "mojo/edk/system/memory.h"
#include "mojo/edk/system/message_pipe.h"
#include "mojo/edk/system/raw_channel.h"
#include "mojo/edk/system/test/test_io_thread.h"
#include "mojo/edk/system/test/timeouts.h"
#include "mojo/edk/system/waiter.h"
#include "mojo/edk/util/ref_ptr.h"
#include "mojo/public/cpp/system/macros.h"
#include "testing/gtest/include/gtest/gtest.h"

using mojo::platform::PlatformPipe;
using mojo::util::MakeRefCounted;
using mojo::util::RefPtr;

namespace mojo {
namespace system {
namespace {

class RemoteDataPipeImplTest : public testing::Test {
 public:
  RemoteDataPipeImplTest()
      : platform_support_(embedder::CreateSimplePlatformSupport()),
        io_thread_(test::TestIOThread::StartMode::AUTO) {}
  ~RemoteDataPipeImplTest() override {}

  void SetUp() override {
    RefPtr<ChannelEndpoint> ep[2];
    message_pipes_[0] = MessagePipe::CreateLocalProxy(&ep[0]);
    message_pipes_[1] = MessagePipe::CreateLocalProxy(&ep[1]);

    io_thread_.PostTaskAndWait(
        [this, &ep]() { SetUpOnIOThread(std::move(ep[0]), std::move(ep[1])); });
  }

  void TearDown() override {
    EnsureMessagePipeClosed(0);
    EnsureMessagePipeClosed(1);
    io_thread_.PostTaskAndWait([this]() { TearDownOnIOThread(); });
  }

 protected:
  static RefPtr<DataPipe> CreateLocal(size_t element_size,
                                      size_t num_elements) {
    const MojoCreateDataPipeOptions options = {
        static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
        static_cast<uint32_t>(element_size),
        static_cast<uint32_t>(num_elements * element_size)};
    MojoCreateDataPipeOptions validated_options = {};
    CHECK_EQ(DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
                                             &validated_options),
             MOJO_RESULT_OK);
    return DataPipe::CreateLocal(validated_options);
  }

  RefPtr<MessagePipe> message_pipe(size_t i) { return message_pipes_[i]; }

  void EnsureMessagePipeClosed(size_t i) {
    if (!message_pipes_[i])
      return;
    message_pipes_[i]->Close(0);
    message_pipes_[i] = nullptr;
  }

 private:
  void SetUpOnIOThread(RefPtr<ChannelEndpoint>&& ep0,
                       RefPtr<ChannelEndpoint>&& ep1) {
    CHECK(io_thread_.IsCurrentAndRunning());

    PlatformPipe channel_pair;
    channels_[0] = MakeRefCounted<Channel>(platform_support_.get());
    channels_[0]->Init(io_thread_.task_runner().Clone(),
                       io_thread_.platform_handle_watcher(),
                       RawChannel::Create(channel_pair.handle0.Pass()));
    channels_[0]->SetBootstrapEndpoint(std::move(ep0));
    channels_[1] = MakeRefCounted<Channel>(platform_support_.get());
    channels_[1]->Init(io_thread_.task_runner().Clone(),
                       io_thread_.platform_handle_watcher(),
                       RawChannel::Create(channel_pair.handle1.Pass()));
    channels_[1]->SetBootstrapEndpoint(std::move(ep1));
  }

  void TearDownOnIOThread() {
    CHECK(io_thread_.IsCurrentAndRunning());

    if (channels_[0]) {
      channels_[0]->Shutdown();
      channels_[0] = nullptr;
    }
    if (channels_[1]) {
      channels_[1]->Shutdown();
      channels_[1] = nullptr;
    }
  }

  std::unique_ptr<embedder::PlatformSupport> platform_support_;
  test::TestIOThread io_thread_;
  RefPtr<Channel> channels_[2];
  RefPtr<MessagePipe> message_pipes_[2];

  MOJO_DISALLOW_COPY_AND_ASSIGN(RemoteDataPipeImplTest);
};

// These tests are heavier-weight than ideal. They test remote data pipes by
// passing data pipe (producer/consumer) dispatchers over remote message pipes.
// Make sure that the test fixture works properly (i.e., that the message pipe
// works properly, and that things are shut down correctly).
// TODO(vtl): Make lighter-weight tests. Ideally, we'd have tests for remote
// data pipes which don't involve message pipes (or even data pipe dispatchers).
TEST_F(RemoteDataPipeImplTest, Sanity) {
  static const char kHello[] = "hello";
  char read_buffer[100] = {};
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  Waiter waiter;
  HandleSignalsState hss;
  uint32_t context = 0;

  // Write on MP 0 (port 0). Wait and receive on MP 1 (port 0). (Add the waiter
  // first, to avoid any handling the case where it's already readable.)
  waiter.Init();
  ASSERT_EQ(MOJO_RESULT_OK,
            message_pipe(1)->AddAwakable(
                0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
  EXPECT_EQ(MOJO_RESULT_OK,
            message_pipe(0)->WriteMessage(0, UserPointer<const void>(kHello),
                                          sizeof(kHello), nullptr,
                                          MOJO_WRITE_MESSAGE_FLAG_NONE));
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  message_pipe(1)->RemoveAwakable(0, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);
  EXPECT_EQ(MOJO_RESULT_OK, message_pipe(1)->ReadMessage(
                                0, UserPointer<void>(read_buffer),
                                MakeUserPointer(&read_buffer_size), nullptr,
                                nullptr, MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);
}

TEST_F(RemoteDataPipeImplTest, SendConsumerWithClosedProducer) {
  char read_buffer[100] = {};
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  HandleVector read_handles;
  uint32_t read_num_handles = 10;  // Maximum to get.
  Waiter waiter;
  HandleSignalsState hss;
  uint32_t context = 0;

  RefPtr<DataPipe> dp(CreateLocal(sizeof(int32_t), 1000));
  // This is the consumer dispatcher we'll send.
  auto consumer = DataPipeConsumerDispatcher::Create();
  consumer->Init(dp.Clone());
  Handle consumer_handle(std::move(consumer),
                         DataPipeConsumerDispatcher::kDefaultHandleRights);

  // Write to the producer and close it, before sending the consumer.
  int32_t elements[10] = {123};
  uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
  EXPECT_EQ(MOJO_RESULT_OK,
            dp->ProducerWriteData(UserPointer<const void>(elements),
                                  MakeUserPointer(&num_bytes), false));
  EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
  dp->ProducerClose();

  // Write the consumer to MP 0 (port 0). Wait and receive on MP 1 (port 0).
  // (Add the waiter first, to avoid any handling the case where it's already
  // readable.)
  waiter.Init();
  ASSERT_EQ(MOJO_RESULT_OK,
            message_pipe(1)->AddAwakable(
                0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
  {
    HandleTransport transport(test::HandleTryStartTransport(consumer_handle));
    EXPECT_TRUE(transport.is_valid());

    std::vector<HandleTransport> transports;
    transports.push_back(transport);
    EXPECT_EQ(MOJO_RESULT_OK, message_pipe(0)->WriteMessage(
                                  0, NullUserPointer(), 0, &transports,
                                  MOJO_WRITE_MESSAGE_FLAG_NONE));
    transport.End();

    // |consumer_handle.dispatcher| should have been closed. This is
    // |DCHECK()|ed when it is destroyed.
    EXPECT_TRUE(consumer_handle.dispatcher->HasOneRef());
    consumer_handle.reset();
  }
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  message_pipe(1)->RemoveAwakable(0, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);
  EXPECT_EQ(MOJO_RESULT_OK,
            message_pipe(1)->ReadMessage(0, UserPointer<void>(read_buffer),
                                         MakeUserPointer(&read_buffer_size),
                                         &read_handles, &read_num_handles,
                                         MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size));
  EXPECT_EQ(1u, read_handles.size());
  EXPECT_EQ(1u, read_num_handles);
  ASSERT_TRUE(read_handles[0]);
  EXPECT_TRUE(read_handles[0].dispatcher->HasOneRef());

  EXPECT_EQ(Dispatcher::Type::DATA_PIPE_CONSUMER,
            read_handles[0].dispatcher->GetType());
  EXPECT_EQ(DataPipeConsumerDispatcher::kDefaultHandleRights,
            read_handles[0].rights);
  consumer = RefPtr<DataPipeConsumerDispatcher>(
      static_cast<DataPipeConsumerDispatcher*>(
          read_handles[0].dispatcher.get()));
  read_handles.clear();

  waiter.Init();
  hss = HandleSignalsState();
  MojoResult result =
      consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, &hss);
  if (result == MOJO_RESULT_OK) {
    context = 0;
    EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context));
    EXPECT_EQ(456u, context);
    consumer->RemoveAwakable(&waiter, &hss);
  } else {
    ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
  }
  // We don't know if the fact that the producer has been closed is known yet.
  EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
  EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READ_THRESHOLD));
  EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
  EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READ_THRESHOLD));

  // Read one element.
  elements[0] = -1;
  elements[1] = -1;
  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
  EXPECT_EQ(MOJO_RESULT_OK, consumer->ReadData(UserPointer<void>(elements),
                                               MakeUserPointer(&num_bytes),
                                               MOJO_READ_DATA_FLAG_NONE));
  EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
  EXPECT_EQ(123, elements[0]);
  EXPECT_EQ(-1, elements[1]);

  waiter.Init();
  hss = HandleSignalsState();
  result =
      consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
  if (result == MOJO_RESULT_OK) {
    context = 0;
    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
              waiter.Wait(test::ActionTimeout(), &context));
    EXPECT_EQ(789u, context);
    consumer->RemoveAwakable(&waiter, &hss);
  } else {
    ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
  }
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);

  consumer->Close();
}

TEST_F(RemoteDataPipeImplTest, SendConsumerDuringTwoPhaseWrite) {
  char read_buffer[100] = {};
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  HandleVector read_handles;
  uint32_t read_num_handles = 10;  // Maximum to get.
  Waiter waiter;
  HandleSignalsState hss;
  uint32_t context = 0;

  RefPtr<DataPipe> dp(CreateLocal(sizeof(int32_t), 1000));
  // This is the consumer dispatcher we'll send.
  auto consumer = DataPipeConsumerDispatcher::Create();
  consumer->Init(dp.Clone());
  Handle consumer_handle(std::move(consumer),
                         DataPipeConsumerDispatcher::kDefaultHandleRights);

  void* write_ptr = nullptr;
  uint32_t num_bytes = 0u;
  EXPECT_EQ(MOJO_RESULT_OK,
            dp->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
                                       MakeUserPointer(&num_bytes)));
  ASSERT_GE(num_bytes, 1u * sizeof(int32_t));

  // Write the consumer to MP 0 (port 0). Wait and receive on MP 1 (port 0).
  // (Add the waiter first, to avoid any handling the case where it's already
  // readable.)
  waiter.Init();
  ASSERT_EQ(MOJO_RESULT_OK,
            message_pipe(1)->AddAwakable(
                0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
  {
    HandleTransport transport(test::HandleTryStartTransport(consumer_handle));
    EXPECT_TRUE(transport.is_valid());

    std::vector<HandleTransport> transports;
    transports.push_back(transport);
    EXPECT_EQ(MOJO_RESULT_OK, message_pipe(0)->WriteMessage(
                                  0, NullUserPointer(), 0, &transports,
                                  MOJO_WRITE_MESSAGE_FLAG_NONE));
    transport.End();

    // |consumer_handle.dispatcher| should have been closed. This is
    // |DCHECK()|ed when it is destroyed.
    EXPECT_TRUE(consumer_handle.dispatcher->HasOneRef());
    consumer_handle.reset();
  }
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  message_pipe(1)->RemoveAwakable(0, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);
  EXPECT_EQ(MOJO_RESULT_OK,
            message_pipe(1)->ReadMessage(0, UserPointer<void>(read_buffer),
                                         MakeUserPointer(&read_buffer_size),
                                         &read_handles, &read_num_handles,
                                         MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size));
  EXPECT_EQ(1u, read_handles.size());
  EXPECT_EQ(1u, read_num_handles);
  ASSERT_TRUE(read_handles[0]);
  EXPECT_TRUE(read_handles[0].dispatcher->HasOneRef());

  EXPECT_EQ(Dispatcher::Type::DATA_PIPE_CONSUMER,
            read_handles[0].dispatcher->GetType());
  EXPECT_EQ(DataPipeConsumerDispatcher::kDefaultHandleRights,
            read_handles[0].rights);
  consumer = RefPtr<DataPipeConsumerDispatcher>(
      static_cast<DataPipeConsumerDispatcher*>(
          read_handles[0].dispatcher.get()));
  read_handles.clear();

  // Now actually write the data, complete the two-phase write, and close the
  // producer.
  *static_cast<int32_t*>(write_ptr) = 123456;
  EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(
                                static_cast<uint32_t>(1u * sizeof(int32_t))));
  dp->ProducerClose();

  // Wait for the consumer to be readable.
  waiter.Init();
  hss = HandleSignalsState();
  MojoResult result =
      consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, &hss);
  if (result == MOJO_RESULT_OK) {
    context = 0;
    EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context));
    EXPECT_EQ(456u, context);
    consumer->RemoveAwakable(&waiter, &hss);
  } else {
    ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
  }
  // We don't know if the fact that the producer has been closed is known yet.
  EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
  EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READ_THRESHOLD));
  EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
  EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READ_THRESHOLD));

  // Read one element.
  int32_t elements[10] = {-1, -1};
  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
  EXPECT_EQ(MOJO_RESULT_OK, consumer->ReadData(UserPointer<void>(elements),
                                               MakeUserPointer(&num_bytes),
                                               MOJO_READ_DATA_FLAG_NONE));
  EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
  EXPECT_EQ(123456, elements[0]);
  EXPECT_EQ(-1, elements[1]);

  consumer->Close();
}

// Like |SendConsumerDuringTwoPhaseWrite|, but transfers the consumer during the
// second two-phase write (to try to test that the offset in circular buffer is
// properly preserved).
TEST_F(RemoteDataPipeImplTest, SendConsumerDuringSecondTwoPhaseWrite) {
  char read_buffer[100] = {};
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  HandleVector read_handles;
  uint32_t read_num_handles = 10;  // Maximum to get.
  Waiter waiter;
  HandleSignalsState hss;
  uint32_t context = 0;

  RefPtr<DataPipe> dp(CreateLocal(sizeof(int32_t), 1000));
  // This is the consumer dispatcher we'll send.
  auto consumer = DataPipeConsumerDispatcher::Create();
  consumer->Init(dp.Clone());
  Handle consumer_handle(std::move(consumer),
                         DataPipeConsumerDispatcher::kDefaultHandleRights);

  void* write_ptr = nullptr;
  uint32_t num_bytes = 0u;
  EXPECT_EQ(MOJO_RESULT_OK,
            dp->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
                                       MakeUserPointer(&num_bytes)));
  ASSERT_GE(num_bytes, 1u * sizeof(int32_t));
  *static_cast<int32_t*>(write_ptr) = 123456;
  EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(
                                static_cast<uint32_t>(1u * sizeof(int32_t))));

  write_ptr = nullptr;
  num_bytes = 0u;
  EXPECT_EQ(MOJO_RESULT_OK,
            dp->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
                                       MakeUserPointer(&num_bytes)));
  ASSERT_GE(num_bytes, 1u * sizeof(int32_t));

  // Write the consumer to MP 0 (port 0). Wait and receive on MP 1 (port 0).
  // (Add the waiter first, to avoid any handling the case where it's already
  // readable.)
  waiter.Init();
  ASSERT_EQ(MOJO_RESULT_OK,
            message_pipe(1)->AddAwakable(
                0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
  {
    HandleTransport transport(test::HandleTryStartTransport(consumer_handle));
    EXPECT_TRUE(transport.is_valid());

    std::vector<HandleTransport> transports;
    transports.push_back(transport);
    EXPECT_EQ(MOJO_RESULT_OK, message_pipe(0)->WriteMessage(
                                  0, NullUserPointer(), 0, &transports,
                                  MOJO_WRITE_MESSAGE_FLAG_NONE));
    transport.End();

    // |consumer_handle.dispatcher| should have been closed. This is
    // |DCHECK()|ed when it is destroyed.
    EXPECT_TRUE(consumer_handle.dispatcher->HasOneRef());
    consumer_handle.reset();
  }
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  message_pipe(1)->RemoveAwakable(0, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
                MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);
  EXPECT_EQ(MOJO_RESULT_OK,
            message_pipe(1)->ReadMessage(0, UserPointer<void>(read_buffer),
                                         MakeUserPointer(&read_buffer_size),
                                         &read_handles, &read_num_handles,
                                         MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size));
  EXPECT_EQ(1u, read_handles.size());
  EXPECT_EQ(1u, read_num_handles);
  ASSERT_TRUE(read_handles[0]);
  EXPECT_TRUE(read_handles[0].dispatcher->HasOneRef());

  EXPECT_EQ(Dispatcher::Type::DATA_PIPE_CONSUMER,
            read_handles[0].dispatcher->GetType());
  EXPECT_EQ(DataPipeConsumerDispatcher::kDefaultHandleRights,
            read_handles[0].rights);
  consumer = RefPtr<DataPipeConsumerDispatcher>(
      static_cast<DataPipeConsumerDispatcher*>(
          read_handles[0].dispatcher.get()));
  read_handles.clear();

  // Now actually write the data, complete the two-phase write, and close the
  // producer.
  *static_cast<int32_t*>(write_ptr) = 789012;
  EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(
                                static_cast<uint32_t>(1u * sizeof(int32_t))));
  dp->ProducerClose();

  // Wait for the consumer to know that the producer is closed.
  waiter.Init();
  hss = HandleSignalsState();
  MojoResult result =
      consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 456, &hss);
  if (result == MOJO_RESULT_OK) {
    context = 0;
    EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context));
    EXPECT_EQ(456u, context);
    consumer->RemoveAwakable(&waiter, &hss);
  } else {
    ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
  }
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
                MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
                MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
            hss.satisfiable_signals);

  // Read some elements.
  int32_t elements[10] = {};
  num_bytes = static_cast<uint32_t>(sizeof(elements));
  EXPECT_EQ(MOJO_RESULT_OK, consumer->ReadData(UserPointer<void>(elements),
                                               MakeUserPointer(&num_bytes),
                                               MOJO_READ_DATA_FLAG_NONE));
  EXPECT_EQ(2u * sizeof(elements[0]), num_bytes);
  EXPECT_EQ(123456, elements[0]);
  EXPECT_EQ(789012, elements[1]);
  EXPECT_EQ(0, elements[2]);

  consumer->Close();
}

}  // namespace
}  // namespace system
}  // namespace mojo
