| // Copyright 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a |
| // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to |
| // increase tolerance and reduce observed flakiness (though doing so reduces the |
| // meaningfulness of the test). |
| |
| #include "mojo/edk/system/message_pipe_dispatcher.h" |
| |
| #include <string.h> |
| |
| #include <limits> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "mojo/edk/platform/test_stopwatch.h" |
| #include "mojo/edk/platform/thread_utils.h" |
| #include "mojo/edk/system/message_pipe.h" |
| #include "mojo/edk/system/test/random.h" |
| #include "mojo/edk/system/test/simple_test_thread.h" |
| #include "mojo/edk/system/test/timeouts.h" |
| #include "mojo/edk/system/waiter.h" |
| #include "mojo/edk/system/waiter_test_utils.h" |
| #include "mojo/edk/util/make_unique.h" |
| #include "mojo/edk/util/ref_ptr.h" |
| #include "mojo/public/cpp/system/macros.h" |
| #include "testing/gtest/include/gtest/gtest.h" |
| |
| using mojo::platform::test::Stopwatch; |
| using mojo::platform::ThreadSleep; |
| using mojo::util::MakeUnique; |
| using mojo::util::RefPtr; |
| |
| namespace mojo { |
| namespace system { |
| namespace { |
| |
| TEST(MessagePipeDispatcherTest, Basic) { |
| Stopwatch stopwatch; |
| int32_t buffer[1]; |
| const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); |
| uint32_t buffer_size; |
| |
| // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. |
| for (unsigned i = 0; i < 2; i++) { |
| auto d0 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| EXPECT_EQ(Dispatcher::Type::MESSAGE_PIPE, d0->GetType()); |
| auto d1 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| { |
| auto mp = MessagePipe::CreateLocalLocal(); |
| d0->Init(mp.Clone(), i); // 0, 1. |
| d1->Init(std::move(mp), i ^ 1); // 1, 0. |
| } |
| Waiter w; |
| uint32_t context = 0; |
| HandleSignalsState hss; |
| |
| // Try adding a writable waiter when already writable. |
| w.Init(); |
| hss = HandleSignalsState(); |
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE | |
| MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfiable_signals); |
| // Shouldn't need to remove the waiter (it was not added). |
| |
| // Add a readable waiter to |d0|, then make it readable (by writing to |
| // |d1|), then wait. |
| w.Init(); |
| ASSERT_EQ(MOJO_RESULT_OK, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr)); |
| buffer[0] = 123456789; |
| EXPECT_EQ(MOJO_RESULT_OK, |
| d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize, |
| nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE)); |
| stopwatch.Start(); |
| EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context)); |
| EXPECT_EQ(1u, context); |
| EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); |
| hss = HandleSignalsState(); |
| d0->RemoveAwakable(&w, &hss); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, |
| hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE | |
| MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfiable_signals); |
| |
| // Try adding a readable waiter when already readable (from above). |
| w.Init(); |
| hss = HandleSignalsState(); |
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss)); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, |
| hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE | |
| MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfiable_signals); |
| // Shouldn't need to remove the waiter (it was not added). |
| |
| // Make |d0| no longer readable (by reading from it). |
| buffer[0] = 0; |
| buffer_size = kBufferSize; |
| EXPECT_EQ(MOJO_RESULT_OK, |
| d0->ReadMessage(UserPointer<void>(buffer), |
| MakeUserPointer(&buffer_size), 0, nullptr, |
| MOJO_READ_MESSAGE_FLAG_NONE)); |
| EXPECT_EQ(kBufferSize, buffer_size); |
| EXPECT_EQ(123456789, buffer[0]); |
| |
| // Wait for zero time for readability on |d0| (will time out). |
| w.Init(); |
| ASSERT_EQ(MOJO_RESULT_OK, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr)); |
| stopwatch.Start(); |
| EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr)); |
| EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); |
| hss = HandleSignalsState(); |
| d0->RemoveAwakable(&w, &hss); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE | |
| MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfiable_signals); |
| |
| // Wait for non-zero, finite time for readability on |d0| (will time out). |
| w.Init(); |
| ASSERT_EQ(MOJO_RESULT_OK, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr)); |
| stopwatch.Start(); |
| EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, |
| w.Wait(2 * test::EpsilonTimeout(), nullptr)); |
| MojoDeadline elapsed = stopwatch.Elapsed(); |
| EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); |
| EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); |
| hss = HandleSignalsState(); |
| d0->RemoveAwakable(&w, &hss); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE | |
| MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfiable_signals); |
| |
| // Check the peer closed signal. |
| w.Init(); |
| ASSERT_EQ(MOJO_RESULT_OK, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr)); |
| |
| // Close the peer. |
| EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); |
| |
| // It should be signaled. |
| EXPECT_EQ(MOJO_RESULT_OK, w.Wait(test::TinyTimeout(), &context)); |
| EXPECT_EQ(12u, context); |
| hss = HandleSignalsState(); |
| d0->RemoveAwakable(&w, &hss); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); |
| |
| EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); |
| } |
| } |
| |
| TEST(MessagePipeDispatcherTest, SupportsEntrypointClass) { |
| auto d = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| ASSERT_TRUE(d); |
| |
| // We need to initialize |d|. |
| { |
| auto d_peer = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| auto mp = MessagePipe::CreateLocalLocal(); |
| d->Init(mp.Clone(), 0); |
| d_peer->Init(std::move(mp), 1); |
| EXPECT_EQ(MOJO_RESULT_OK, d_peer->Close()); |
| } |
| |
| EXPECT_TRUE(d->SupportsEntrypointClass(EntrypointClass::MESSAGE_PIPE)); |
| EXPECT_FALSE(d->SupportsEntrypointClass(EntrypointClass::DATA_PIPE_PRODUCER)); |
| EXPECT_FALSE(d->SupportsEntrypointClass(EntrypointClass::DATA_PIPE_CONSUMER)); |
| EXPECT_FALSE(d->SupportsEntrypointClass(EntrypointClass::BUFFER)); |
| |
| // TODO(vtl): Check that it actually returns |MOJO_RESULT_INVALID_ARGUMENT| |
| // for methods in unsupported entrypoint classes. |
| |
| EXPECT_EQ(MOJO_RESULT_OK, d->Close()); |
| } |
| |
| TEST(MessagePipeDispatcherTest, InvalidParams) { |
| char buffer[1]; |
| |
| auto d0 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| auto d1 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| { |
| auto mp = MessagePipe::CreateLocalLocal(); |
| d0->Init(mp.Clone(), 0); |
| d1->Init(std::move(mp), 1); |
| } |
| |
| // |WriteMessage|: |
| // Huge buffer size. |
| EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, |
| d0->WriteMessage(UserPointer<const void>(buffer), |
| std::numeric_limits<uint32_t>::max(), nullptr, |
| MOJO_WRITE_MESSAGE_FLAG_NONE)); |
| |
| EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); |
| EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); |
| } |
| |
| // These test invalid arguments that should cause death if we're being paranoid |
| // about checking arguments (which we would want to do if, e.g., we were in a |
| // true "kernel" situation, but we might not want to do otherwise for |
| // performance reasons). Probably blatant errors like passing in null pointers |
| // (for required pointer arguments) will still cause death, but perhaps not |
| // predictably. |
| TEST(MessagePipeDispatcherTest, InvalidParamsDeath) { |
| const char kMemoryCheckFailedRegex[] = "Check failed"; |
| |
| auto d0 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| auto d1 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| { |
| auto mp = MessagePipe::CreateLocalLocal(); |
| d0->Init(mp.Clone(), 0); |
| d1->Init(std::move(mp), 1); |
| } |
| |
| // |WriteMessage|: |
| // Null buffer with nonzero buffer size. |
| EXPECT_DEATH_IF_SUPPORTED(d0->WriteMessage(NullUserPointer(), 1, nullptr, |
| MOJO_WRITE_MESSAGE_FLAG_NONE), |
| kMemoryCheckFailedRegex); |
| |
| // |ReadMessage|: |
| // Null buffer with nonzero buffer size. |
| // First write something so that we actually have something to read. |
| EXPECT_EQ(MOJO_RESULT_OK, |
| d1->WriteMessage(UserPointer<const void>("x"), 1, nullptr, |
| MOJO_WRITE_MESSAGE_FLAG_NONE)); |
| uint32_t buffer_size = 1; |
| EXPECT_DEATH_IF_SUPPORTED( |
| d0->ReadMessage(NullUserPointer(), MakeUserPointer(&buffer_size), 0, |
| nullptr, MOJO_READ_MESSAGE_FLAG_NONE), |
| kMemoryCheckFailedRegex); |
| |
| EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); |
| EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); |
| } |
| |
| // Test what happens when one end is closed (single-threaded test). |
| TEST(MessagePipeDispatcherTest, BasicClosed) { |
| int32_t buffer[1]; |
| const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); |
| uint32_t buffer_size; |
| |
| // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. |
| for (unsigned i = 0; i < 2; i++) { |
| auto d0 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| auto d1 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| { |
| auto mp = MessagePipe::CreateLocalLocal(); |
| d0->Init(mp.Clone(), i); // 0, 1. |
| d1->Init(std::move(mp), i ^ 1); // 1, 0. |
| } |
| Waiter w; |
| HandleSignalsState hss; |
| |
| // Write (twice) to |d1|. |
| buffer[0] = 123456789; |
| EXPECT_EQ(MOJO_RESULT_OK, |
| d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize, |
| nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE)); |
| buffer[0] = 234567890; |
| EXPECT_EQ(MOJO_RESULT_OK, |
| d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize, |
| nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE)); |
| |
| // Try waiting for readable on |d0|; should fail (already satisfied). |
| w.Init(); |
| hss = HandleSignalsState(); |
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, |
| hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE | |
| MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfiable_signals); |
| |
| // Try reading from |d1|; should fail (nothing to read). |
| buffer[0] = 0; |
| buffer_size = kBufferSize; |
| EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, |
| d1->ReadMessage(UserPointer<void>(buffer), |
| MakeUserPointer(&buffer_size), 0, nullptr, |
| MOJO_READ_MESSAGE_FLAG_NONE)); |
| |
| // Close |d1|. |
| EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); |
| |
| // Try waiting for readable on |d0|; should fail (already satisfied). |
| w.Init(); |
| hss = HandleSignalsState(); |
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss)); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfiable_signals); |
| |
| // Read from |d0|. |
| buffer[0] = 0; |
| buffer_size = kBufferSize; |
| EXPECT_EQ(MOJO_RESULT_OK, |
| d0->ReadMessage(UserPointer<void>(buffer), |
| MakeUserPointer(&buffer_size), 0, nullptr, |
| MOJO_READ_MESSAGE_FLAG_NONE)); |
| EXPECT_EQ(kBufferSize, buffer_size); |
| EXPECT_EQ(123456789, buffer[0]); |
| |
| // Try waiting for readable on |d0|; should fail (already satisfied). |
| w.Init(); |
| hss = HandleSignalsState(); |
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss)); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfiable_signals); |
| |
| // Read again from |d0|. |
| buffer[0] = 0; |
| buffer_size = kBufferSize; |
| EXPECT_EQ(MOJO_RESULT_OK, |
| d0->ReadMessage(UserPointer<void>(buffer), |
| MakeUserPointer(&buffer_size), 0, nullptr, |
| MOJO_READ_MESSAGE_FLAG_NONE)); |
| EXPECT_EQ(kBufferSize, buffer_size); |
| EXPECT_EQ(234567890, buffer[0]); |
| |
| // Try waiting for readable on |d0|; should fail (unsatisfiable). |
| w.Init(); |
| hss = HandleSignalsState(); |
| EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss)); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); |
| |
| // Try waiting for writable on |d0|; should fail (unsatisfiable). |
| w.Init(); |
| hss = HandleSignalsState(); |
| EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
| d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss)); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); |
| |
| // Try reading from |d0|; should fail (nothing to read and other end |
| // closed). |
| buffer[0] = 0; |
| buffer_size = kBufferSize; |
| EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
| d0->ReadMessage(UserPointer<void>(buffer), |
| MakeUserPointer(&buffer_size), 0, nullptr, |
| MOJO_READ_MESSAGE_FLAG_NONE)); |
| |
| // Try writing to |d0|; should fail (other end closed). |
| buffer[0] = 345678901; |
| EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
| d0->WriteMessage(UserPointer<const void>(buffer), kBufferSize, |
| nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE)); |
| |
| EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); |
| } |
| } |
| |
| TEST(MessagePipeDispatcherTest, BasicThreaded) { |
| Stopwatch stopwatch; |
| int32_t buffer[1]; |
| const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); |
| uint32_t buffer_size; |
| MojoDeadline elapsed; |
| bool did_wait; |
| MojoResult result; |
| uint32_t context; |
| HandleSignalsState hss; |
| |
| // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. |
| for (unsigned i = 0; i < 2; i++) { |
| auto d0 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| auto d1 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| { |
| auto mp = MessagePipe::CreateLocalLocal(); |
| d0->Init(mp.Clone(), i); // 0, 1. |
| d1->Init(std::move(mp), i ^ 1); // 1, 0. |
| } |
| |
| // Wait for readable on |d1|, which will become readable after some time. |
| { |
| test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, |
| MOJO_DEADLINE_INDEFINITE, 1, &did_wait, &result, |
| &context, &hss); |
| stopwatch.Start(); |
| thread.Start(); |
| ThreadSleep(2 * test::EpsilonTimeout()); |
| // Wake it up by writing to |d0|. |
| buffer[0] = 123456789; |
| EXPECT_EQ(MOJO_RESULT_OK, |
| d0->WriteMessage(UserPointer<const void>(buffer), kBufferSize, |
| nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE)); |
| } // Joins the thread. |
| elapsed = stopwatch.Elapsed(); |
| EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); |
| EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); |
| EXPECT_TRUE(did_wait); |
| EXPECT_EQ(MOJO_RESULT_OK, result); |
| EXPECT_EQ(1u, context); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, |
| hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE | |
| MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfiable_signals); |
| |
| // Now |d1| is already readable. Try waiting for it again. |
| { |
| test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, |
| MOJO_DEADLINE_INDEFINITE, 2, &did_wait, &result, |
| &context, &hss); |
| stopwatch.Start(); |
| thread.Start(); |
| } // Joins the thread. |
| EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); |
| EXPECT_FALSE(did_wait); |
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, |
| hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE | |
| MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| hss.satisfiable_signals); |
| |
| // Consume what we wrote to |d0|. |
| buffer[0] = 0; |
| buffer_size = kBufferSize; |
| EXPECT_EQ(MOJO_RESULT_OK, |
| d1->ReadMessage(UserPointer<void>(buffer), |
| MakeUserPointer(&buffer_size), 0, nullptr, |
| MOJO_READ_MESSAGE_FLAG_NONE)); |
| EXPECT_EQ(kBufferSize, buffer_size); |
| EXPECT_EQ(123456789, buffer[0]); |
| |
| // Wait for readable on |d1| and close |d0| after some time, which should |
| // cancel that wait. |
| { |
| test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, |
| MOJO_DEADLINE_INDEFINITE, 3, &did_wait, &result, |
| &context, &hss); |
| stopwatch.Start(); |
| thread.Start(); |
| ThreadSleep(2 * test::EpsilonTimeout()); |
| EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); |
| } // Joins the thread. |
| elapsed = stopwatch.Elapsed(); |
| EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); |
| EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); |
| EXPECT_TRUE(did_wait); |
| EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); |
| EXPECT_EQ(3u, context); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); |
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); |
| |
| EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); |
| } |
| |
| for (unsigned i = 0; i < 2; i++) { |
| auto d0 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| auto d1 = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| { |
| auto mp = MessagePipe::CreateLocalLocal(); |
| d0->Init(mp.Clone(), i); // 0, 1. |
| d1->Init(std::move(mp), i ^ 1); // 1, 0. |
| } |
| |
| // Wait for readable on |d1| and close |d1| after some time, which should |
| // cancel that wait. |
| { |
| test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, |
| MOJO_DEADLINE_INDEFINITE, 4, &did_wait, &result, |
| &context, &hss); |
| stopwatch.Start(); |
| thread.Start(); |
| ThreadSleep(2 * test::EpsilonTimeout()); |
| EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); |
| } // Joins the thread. |
| elapsed = stopwatch.Elapsed(); |
| EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); |
| EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); |
| EXPECT_TRUE(did_wait); |
| EXPECT_EQ(MOJO_RESULT_CANCELLED, result); |
| EXPECT_EQ(4u, context); |
| EXPECT_EQ(0u, hss.satisfied_signals); |
| EXPECT_EQ(0u, hss.satisfiable_signals); |
| |
| EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); |
| } |
| } |
| |
| // Stress test ----------------------------------------------------------------- |
| |
| const size_t kMaxMessageSize = 2000; |
| |
| class WriterThread : public test::SimpleTestThread { |
| public: |
| // |*messages_written| and |*bytes_written| belong to the thread while it's |
| // alive. |
| WriterThread(RefPtr<Dispatcher> write_dispatcher, |
| size_t* messages_written, |
| size_t* bytes_written) |
| : write_dispatcher_(write_dispatcher), |
| messages_written_(messages_written), |
| bytes_written_(bytes_written) { |
| *messages_written_ = 0; |
| *bytes_written_ = 0; |
| } |
| |
| ~WriterThread() override { Join(); } |
| |
| private: |
| void Run() override { |
| // Make some data to write. |
| unsigned char buffer[kMaxMessageSize]; |
| for (size_t i = 0; i < kMaxMessageSize; i++) |
| buffer[i] = static_cast<unsigned char>(i); |
| |
| // Number of messages to write. |
| *messages_written_ = static_cast<size_t>(test::RandomInt(1000, 6000)); |
| |
| // Write messages. |
| for (size_t i = 0; i < *messages_written_; i++) { |
| uint32_t bytes_to_write = static_cast<uint32_t>( |
| test::RandomInt(1, static_cast<int>(kMaxMessageSize))); |
| EXPECT_EQ(MOJO_RESULT_OK, |
| write_dispatcher_->WriteMessage(UserPointer<const void>(buffer), |
| bytes_to_write, nullptr, |
| MOJO_WRITE_MESSAGE_FLAG_NONE)); |
| *bytes_written_ += bytes_to_write; |
| } |
| |
| // Write one last "quit" message. |
| EXPECT_EQ(MOJO_RESULT_OK, write_dispatcher_->WriteMessage( |
| UserPointer<const void>("quit"), 4, nullptr, |
| MOJO_WRITE_MESSAGE_FLAG_NONE)); |
| } |
| |
| const RefPtr<Dispatcher> write_dispatcher_; |
| size_t* const messages_written_; |
| size_t* const bytes_written_; |
| |
| MOJO_DISALLOW_COPY_AND_ASSIGN(WriterThread); |
| }; |
| |
| class ReaderThread : public test::SimpleTestThread { |
| public: |
| // |*messages_read| and |*bytes_read| belong to the thread while it's alive. |
| ReaderThread(RefPtr<Dispatcher> read_dispatcher, |
| size_t* messages_read, |
| size_t* bytes_read) |
| : read_dispatcher_(read_dispatcher), |
| messages_read_(messages_read), |
| bytes_read_(bytes_read) { |
| *messages_read_ = 0; |
| *bytes_read_ = 0; |
| } |
| |
| ~ReaderThread() override { Join(); } |
| |
| private: |
| void Run() override { |
| unsigned char buffer[kMaxMessageSize]; |
| Waiter w; |
| HandleSignalsState hss; |
| MojoResult result; |
| |
| // Read messages. |
| for (;;) { |
| // Wait for it to be readable. |
| w.Init(); |
| hss = HandleSignalsState(); |
| result = read_dispatcher_->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, |
| &hss); |
| EXPECT_TRUE(result == MOJO_RESULT_OK || |
| result == MOJO_RESULT_ALREADY_EXISTS) |
| << "result: " << result; |
| if (result == MOJO_RESULT_OK) { |
| // Actually need to wait. |
| EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr)); |
| read_dispatcher_->RemoveAwakable(&w, &hss); |
| } |
| // We may not actually be readable, since we're racing with other threads. |
| EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE)); |
| |
| // Now, try to do the read. |
| // Clear the buffer so that we can check the result. |
| memset(buffer, 0, sizeof(buffer)); |
| uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
| result = read_dispatcher_->ReadMessage( |
| UserPointer<void>(buffer), MakeUserPointer(&buffer_size), 0, nullptr, |
| MOJO_READ_MESSAGE_FLAG_NONE); |
| EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT) |
| << "result: " << result; |
| // We're racing with others to read, so maybe we failed. |
| if (result == MOJO_RESULT_SHOULD_WAIT) |
| continue; // In which case, try again. |
| // Check for quit. |
| if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0) |
| return; |
| EXPECT_GE(buffer_size, 1u); |
| EXPECT_LE(buffer_size, kMaxMessageSize); |
| EXPECT_TRUE(IsValidMessage(buffer, buffer_size)); |
| |
| (*messages_read_)++; |
| *bytes_read_ += buffer_size; |
| } |
| } |
| |
| static bool IsValidMessage(const unsigned char* buffer, |
| uint32_t message_size) { |
| size_t i; |
| for (i = 0; i < message_size; i++) { |
| if (buffer[i] != static_cast<unsigned char>(i)) |
| return false; |
| } |
| // Check that the remaining bytes weren't stomped on. |
| for (; i < kMaxMessageSize; i++) { |
| if (buffer[i] != 0) |
| return false; |
| } |
| return true; |
| } |
| |
| const RefPtr<Dispatcher> read_dispatcher_; |
| size_t* const messages_read_; |
| size_t* const bytes_read_; |
| |
| MOJO_DISALLOW_COPY_AND_ASSIGN(ReaderThread); |
| }; |
| |
| TEST(MessagePipeDispatcherTest, Stress) { |
| static const size_t kNumWriters = 30; |
| static const size_t kNumReaders = kNumWriters; |
| |
| auto d_write = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| auto d_read = MessagePipeDispatcher::Create( |
| MessagePipeDispatcher::kDefaultCreateOptions); |
| { |
| auto mp = MessagePipe::CreateLocalLocal(); |
| d_write->Init(mp.Clone(), 0); |
| d_read->Init(std::move(mp), 1); |
| } |
| |
| size_t messages_written[kNumWriters]; |
| size_t bytes_written[kNumWriters]; |
| size_t messages_read[kNumReaders]; |
| size_t bytes_read[kNumReaders]; |
| { |
| // Make writers. |
| std::vector<std::unique_ptr<WriterThread>> writers; |
| for (size_t i = 0; i < kNumWriters; i++) { |
| writers.push_back(MakeUnique<WriterThread>(d_write, &messages_written[i], |
| &bytes_written[i])); |
| } |
| |
| // Make readers. |
| std::vector<std::unique_ptr<ReaderThread>> readers; |
| for (size_t i = 0; i < kNumReaders; i++) { |
| readers.push_back( |
| MakeUnique<ReaderThread>(d_read, &messages_read[i], &bytes_read[i])); |
| } |
| |
| // Start writers. |
| for (size_t i = 0; i < kNumWriters; i++) |
| writers[i]->Start(); |
| |
| // Start readers. |
| for (size_t i = 0; i < kNumReaders; i++) |
| readers[i]->Start(); |
| |
| // TODO(vtl): Maybe I should have an event that triggers all the threads to |
| // start doing stuff for real (so that the first ones created/started aren't |
| // advantaged). |
| } // Joins all the threads. |
| |
| size_t total_messages_written = 0; |
| size_t total_bytes_written = 0; |
| for (size_t i = 0; i < kNumWriters; i++) { |
| total_messages_written += messages_written[i]; |
| total_bytes_written += bytes_written[i]; |
| } |
| size_t total_messages_read = 0; |
| size_t total_bytes_read = 0; |
| for (size_t i = 0; i < kNumReaders; i++) { |
| total_messages_read += messages_read[i]; |
| total_bytes_read += bytes_read[i]; |
| // We'd have to be really unlucky to have read no messages on a thread. |
| EXPECT_GT(messages_read[i], 0u) << "reader: " << i; |
| EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i; |
| } |
| EXPECT_EQ(total_messages_written, total_messages_read); |
| EXPECT_EQ(total_bytes_written, total_bytes_read); |
| |
| EXPECT_EQ(MOJO_RESULT_OK, d_write->Close()); |
| EXPECT_EQ(MOJO_RESULT_OK, d_read->Close()); |
| } |
| |
| } // namespace |
| } // namespace system |
| } // namespace mojo |