blob: 0fed6a53cdade10ff3b41a27ece61f57d231d109 [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <cstdio>
#include <cstring>
#include <memory>
#include <set>
#include <unordered_map>
#include <vector>
#include <sys/time.h>
#include "mojo/public/platform/dart/dart_handle_watcher.h"
#include "dart/runtime/include/dart_api.h"
#include "dart/runtime/include/dart_native_api.h"
#include "mojo/public/c/system/functions.h"
#include "mojo/public/c/system/message_pipe.h"
#include "mojo/public/c/system/types.h"
namespace mojo {
namespace dart {
#define CONTROL_HANDLE_INDEX 0
static void PostNull(Dart_Port port) {
if (port == ILLEGAL_PORT) {
return;
}
Dart_CObject message;
message.type = Dart_CObject_kNull;
Dart_PostCObject(port, &message);
}
static void PostSignal(Dart_Port port,
int32_t signals,
int32_t signalled) {
if (port == ILLEGAL_PORT) {
return;
}
Dart_CObject message;
Dart_CObject value_0;
Dart_CObject value_1;
Dart_CObject* values[] = { &value_0, &value_1 };
value_0.type = Dart_CObject_kInt32;
value_0.value.as_int32 = signals;
value_1.type = Dart_CObject_kInt32;
value_1.value.as_int32 = signalled;
message.type = Dart_CObject_kArray;
message.value.as_array.length = 2;
message.value.as_array.values = &values[0];
Dart_PostCObject(port, &message);
}
// The internal state of the handle watcher thread.
class HandleWatcherThreadState {
public:
HandleWatcherThreadState(MojoHandle control_pipe_consumer_handle);
~HandleWatcherThreadState();
void Run();
private:
struct HandleWatcherTimer {
int64_t deadline;
Dart_Port port;
// Sort on deadline.
friend bool operator<(const HandleWatcherTimer& l,
const HandleWatcherTimer& r) {
return l.deadline < r.deadline;
}
};
void AddHandle(MojoHandle handle,
MojoHandleSignals signals,
Dart_Port port);
void RemoveHandle(MojoHandle handle);
void CloseHandle(MojoHandle handle, Dart_Port port, bool pruning = false);
void UpdateTimer(int64_t deadline, Dart_Port port);
void Shutdown(Dart_Port port);
void RemoveHandleAtIndex(intptr_t i);
void ProcessControlMessage();
void ProcessTimers();
void ProcessWaitManyResults(MojoResult result, uint32_t result_index);
void PruneClosedHandles(bool signals_state_is_valid);
void CompleteNextTimer();
bool HasTimers();
int64_t NextTimerDeadline();
int64_t WaitDeadline();
Dart_Port shutdown_port_;
MojoHandle control_pipe_consumer_handle_;
// All of these vectors are indexed together.
std::vector<MojoHandle> wait_many_handles_;
std::vector<MojoHandleSignals> wait_many_signals_;
std::vector<MojoHandleSignalsState> wait_many_signals_state_;
std::vector<Dart_Port> handle_ports_;
// Map from MojoHandle -> index into above arrays.
std::unordered_map<MojoHandle, intptr_t> handle_to_index_map_;
// Set of timers sorted by earliest deadline.
std::set<HandleWatcherTimer> timers_;
MOJO_DISALLOW_COPY_AND_ASSIGN(HandleWatcherThreadState);
};
HandleWatcherThreadState::HandleWatcherThreadState(
MojoHandle control_pipe_consumer_handle)
: shutdown_port_(ILLEGAL_PORT),
control_pipe_consumer_handle_(control_pipe_consumer_handle) {
MOJO_CHECK(control_pipe_consumer_handle_ != MOJO_HANDLE_INVALID);
// Add the control handle.
AddHandle(control_pipe_consumer_handle_,
MOJO_HANDLE_SIGNAL_READABLE,
ILLEGAL_PORT);
}
HandleWatcherThreadState::~HandleWatcherThreadState() {
if (control_pipe_consumer_handle_ != MOJO_HANDLE_INVALID) {
MojoClose(control_pipe_consumer_handle_);
control_pipe_consumer_handle_ = MOJO_HANDLE_INVALID;
}
}
void HandleWatcherThreadState::AddHandle(MojoHandle handle,
MojoHandleSignals signals,
Dart_Port port) {
const size_t index = wait_many_handles_.size();
MojoHandleSignalsState signals_state =
{ MOJO_HANDLE_SIGNAL_NONE, MOJO_HANDLE_SIGNAL_NONE};
auto it = handle_to_index_map_.find(handle);
if (it != handle_to_index_map_.end()) {
intptr_t index = it->second;
// Sanity check.
MOJO_CHECK(wait_many_handles_[index] == handle);
// We only support 1:1 mapping from handles to ports.
MOJO_CHECK(handle_ports_[index] == port);
// Adjust the signals for this handle.
wait_many_signals_[index] |= signals;
} else {
// New handle.
wait_many_handles_.push_back(handle);
wait_many_signals_.push_back(signals);
wait_many_signals_state_.push_back(signals_state);
handle_ports_.push_back(port);
handle_to_index_map_[handle] = index;
}
// Sanity check.
MOJO_CHECK(wait_many_handles_.size() == handle_to_index_map_.size());
}
void HandleWatcherThreadState::RemoveHandle(MojoHandle handle) {
auto it = handle_to_index_map_.find(handle);
MOJO_CHECK(it != handle_to_index_map_.end());
const intptr_t index = it->second;
// We should never be removing the control handle.
MOJO_CHECK(index != CONTROL_HANDLE_INDEX);
RemoveHandleAtIndex(index);
}
void HandleWatcherThreadState::CloseHandle(MojoHandle handle,
Dart_Port port,
bool pruning) {
MOJO_CHECK(!pruning || (port == ILLEGAL_PORT));
auto it = handle_to_index_map_.find(handle);
if (it == handle_to_index_map_.end()) {
// An app isolate may request that the handle watcher close a handle that
// has already been pruned. This happens when the app isolate has not yet
// received the PEER_CLOSED event. The app isolate will not close the
// handle, so we must do so here.
MojoClose(handle);
if (port != ILLEGAL_PORT) {
// Notify that close is done.
PostNull(port);
}
return;
}
MojoClose(handle);
if (port != ILLEGAL_PORT) {
// Notify that close is done.
PostNull(port);
}
const intptr_t index = it->second;
MOJO_CHECK(index != CONTROL_HANDLE_INDEX);
if (pruning) {
// If this handle is being pruned, notify the application isolate
// by sending PEER_CLOSED;
PostSignal(handle_ports_[index],
wait_many_signals_[index],
MOJO_HANDLE_SIGNAL_PEER_CLOSED);
}
// Remove the handle.
RemoveHandle(handle);
}
void HandleWatcherThreadState::UpdateTimer(int64_t deadline, Dart_Port port) {
// Scan the timers to see if we have a timer with |port|.
auto it = timers_.begin();
while (it != timers_.end()) {
if (it->port == port) {
break;
}
it++;
}
// We found an existing timer with |port|. Remove it.
if (it != timers_.end()) {
timers_.erase(it);
}
if (deadline < 0) {
// A negative deadline means we should cancel this timer completely.
return;
}
// Create a new timer with the current deadline.
HandleWatcherTimer timer;
timer.deadline = deadline;
timer.port = port;
timers_.insert(timer);
}
void HandleWatcherThreadState::Shutdown(Dart_Port port) {
// Break out of the loop by setting the shutdown port.
MOJO_CHECK(port != ILLEGAL_PORT);
shutdown_port_ = port;
}
void HandleWatcherThreadState::RemoveHandleAtIndex(intptr_t index) {
MOJO_CHECK(index != CONTROL_HANDLE_INDEX);
const intptr_t last_index = wait_many_handles_.size() - 1;
// Remove handle from handle map.
handle_to_index_map_.erase(wait_many_handles_[index]);
if (index != last_index) {
// We should never be overwriting CONTROL_HANDLE_INDEX.
MojoHandle handle = wait_many_handles_[last_index];
// Replace |index| with |last_index|.
wait_many_handles_[index] = wait_many_handles_[last_index];
wait_many_signals_[index] = wait_many_signals_[last_index];
wait_many_signals_state_[index] = wait_many_signals_state_[last_index];
handle_ports_[index] = handle_ports_[last_index];
// Update handle map.
handle_to_index_map_[handle] = index;
}
wait_many_handles_.pop_back();
wait_many_signals_.pop_back();
wait_many_signals_state_.pop_back();
handle_ports_.pop_back();
MOJO_CHECK(wait_many_handles_.size() >= 1);
// Sanity check.
MOJO_CHECK(wait_many_handles_.size() == handle_to_index_map_.size());
}
void HandleWatcherThreadState::ProcessControlMessage() {
HandleWatcherCommand command = HandleWatcherCommand::Empty();
uint32_t num_bytes = sizeof(command);
uint32_t num_handles = 0;
MojoResult res = MojoReadMessage(control_pipe_consumer_handle_,
reinterpret_cast<void*>(&command),
&num_bytes,
nullptr,
&num_handles,
0);
// Sanity check that we received the expected amount of data.
MOJO_CHECK(res == MOJO_RESULT_OK);
MOJO_CHECK(num_bytes == sizeof(command));
MOJO_CHECK(num_handles == 0);
switch (command.command()) {
case HandleWatcherCommand::kCommandAddHandle:
AddHandle(command.handle(), command.signals(), command.port());
break;
case HandleWatcherCommand::kCommandRemoveHandle:
RemoveHandle(command.handle());
break;
case HandleWatcherCommand::kCommandCloseHandle:
CloseHandle(command.handle(), command.port());
break;
case HandleWatcherCommand::kCommandAddTimer:
UpdateTimer(command.deadline(), command.port());
break;
case HandleWatcherCommand::kCommandShutdownHandleWatcher:
Shutdown(command.port());
break;
default:
MOJO_CHECK(false);
break;
}
}
// Dart's DateTime class calls gettimeofday to get the time.
// TODO(johnmccutchan): Expose an API in |dart_api.h| that returns the same
// value that DateTime uses.
static int64_t GetDartTimeInMillis() {
struct timeval tv;
if (gettimeofday(&tv, nullptr) < 0) {
MOJO_CHECK(false);
return 0;
}
return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000;
}
void HandleWatcherThreadState::ProcessTimers() {
int64_t now = GetDartTimeInMillis();
while (HasTimers() && now >= NextTimerDeadline()) {
CompleteNextTimer();
now = GetDartTimeInMillis();
}
}
void HandleWatcherThreadState::CompleteNextTimer() {
auto it = timers_.begin();
MOJO_CHECK(it != timers_.end());
// Notify that the timer is complete.
PostNull(it->port);
// Remove it from the timer set.
timers_.erase(it);
}
bool HandleWatcherThreadState::HasTimers() {
return !timers_.empty();
}
int64_t HandleWatcherThreadState::NextTimerDeadline() {
auto it = timers_.begin();
MOJO_CHECK(it != timers_.end());
return it->deadline;
}
int64_t HandleWatcherThreadState::WaitDeadline() {
if (!HasTimers()) {
// No pending timers. Wait indefinitely.
return MOJO_DEADLINE_INDEFINITE;
}
int64_t now = GetDartTimeInMillis();
return (NextTimerDeadline() - now) * 1000;
}
static bool ShouldCloseHandle(MojoHandle handle) {
if (handle == MOJO_HANDLE_INVALID) {
return false;
}
// Call wait with a deadline of 0. If the result of this is OK or
// DEADLINE_EXCEEDED, the handle is still open.
MojoResult result = MojoWait(handle, MOJO_HANDLE_SIGNAL_ALL, 0, NULL);
return (result != MOJO_RESULT_OK) &&
(result != MOJO_RESULT_DEADLINE_EXCEEDED);
}
void HandleWatcherThreadState::PruneClosedHandles(bool signals_state_is_valid) {
std::vector<MojoHandle> closed_handles;
const intptr_t num_handles = wait_many_handles_.size();
if (signals_state_is_valid) {
// We can rely on |wait_many_signals_state_| having valid data.
for (intptr_t i = 0; i < num_handles; i++) {
// Check if the handle at index |i| has been closed.
MojoHandleSignals satisfied_signals =
wait_many_signals_state_[i].satisfied_signals;
if ((satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) != 0) {
closed_handles.push_back(wait_many_handles_[i]);
}
}
} else {
// We can't rely on |wait_many_signals_state_| having valid data. So
// we call Wait on each handle and check the status.
for (intptr_t i = 0; i < num_handles; i++) {
MojoHandle handle = wait_many_handles_[i];
if (ShouldCloseHandle(handle)) {
closed_handles.push_back(handle);
}
}
}
// Process all closed handles and notify their ports.
for (size_t i = 0; i < closed_handles.size(); i++) {
MojoHandle handle = closed_handles[i];
CloseHandle(handle, ILLEGAL_PORT, true);
}
}
void HandleWatcherThreadState::ProcessWaitManyResults(MojoResult result,
uint32_t result_index) {
MOJO_CHECK(result != MOJO_RESULT_DEADLINE_EXCEEDED);
if (result != MOJO_RESULT_OK) {
// The WaitMany call failed. We need to prune closed handles from our
// wait many set and try again.
//
// If the result is an invalid argument |wait_many_signals_state_| is
// meaningless.
PruneClosedHandles(result != MOJO_RESULT_INVALID_ARGUMENT);
return;
}
MOJO_CHECK(result == MOJO_RESULT_OK);
// Indexes of handles that we are done with.
std::vector<intptr_t> to_remove;
const intptr_t num_handles = wait_many_handles_.size();
// Loop over all handles except for the control handle.
// The order of the looping matters because we call RemoveHandleAtIndex
// and need the handle indexes to start at the highest and decrease.
for (intptr_t i = num_handles - 1; i > 0; i--) {
MojoHandleSignals signals = wait_many_signals_[i];
MojoHandleSignals satisfied_signals =
wait_many_signals_state_[i].satisfied_signals;
satisfied_signals &= signals;
if (satisfied_signals != 0) {
// Something happened to this handle.
// Notify the port.
PostSignal(handle_ports_[i], signals, satisfied_signals);
// Now that we have notified the waiting Dart program, remove this handle
// from the wait many set until we are requested to add it again.
to_remove.push_back(i);
}
}
// Remove any handles we are finished with.
const intptr_t num_to_remove = to_remove.size();
for (intptr_t i = 0; i < num_to_remove; i++) {
RemoveHandleAtIndex(to_remove[i]);
}
// Now check for control messages.
{
MojoHandleSignals signals = wait_many_signals_[CONTROL_HANDLE_INDEX];
MojoHandleSignals satisfied_signals =
wait_many_signals_state_[CONTROL_HANDLE_INDEX].satisfied_signals;
satisfied_signals &= signals;
if (satisfied_signals != 0) {
// We have a control message.
ProcessControlMessage();
}
}
}
void HandleWatcherThreadState::Run() {
while (shutdown_port_ == ILLEGAL_PORT) {
// Process timers.
ProcessTimers();
// Wait for the next timer or an event on a handle.
uint32_t result_index = -1;
uint32_t num_handles = wait_many_handles_.size();
MOJO_CHECK(wait_many_signals_.size() == num_handles);
MojoResult result = MojoWaitMany(wait_many_handles_.data(),
wait_many_signals_.data(),
num_handles,
WaitDeadline(),
&result_index,
wait_many_signals_state_.data());
if (result == MOJO_RESULT_DEADLINE_EXCEEDED) {
// Timers are ready.
continue;
}
// Process wait results.
ProcessWaitManyResults(result, result_index);
}
// Close our end of the message pipe.
MojoClose(control_pipe_consumer_handle_);
// Notify that we've shutdown.
PostNull(shutdown_port_);
}
// Create a message pipe for communication and spawns a handle watcher thread.
MojoHandle HandleWatcher::Start() {
MojoCreateMessagePipeOptions options;
options.struct_size = sizeof(MojoCreateMessagePipeOptions);
options.flags = MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
MojoHandle control_pipe_consumer_handle = MOJO_HANDLE_INVALID;
MojoHandle control_pipe_producer_handle = MOJO_HANDLE_INVALID;
MojoResult res = MojoCreateMessagePipe(&options,
&control_pipe_consumer_handle,
&control_pipe_producer_handle);
if (res != MOJO_RESULT_OK) {
return MOJO_HANDLE_INVALID;
}
// Spawn thread and pass both ends of the pipe to it.
std::thread thread = std::thread(ThreadMain,
control_pipe_consumer_handle);
// Detach the thread. Thread can be made to exit by sending a shutdown
// command.
thread.detach();
// Return producer end of pipe to caller.
return control_pipe_producer_handle;
}
void HandleWatcher::ThreadMain(MojoHandle control_pipe_consumer_handle) {
HandleWatcherThreadState state(control_pipe_consumer_handle);
// Run the main loop. When this returns the handle watcher has exited.
state.Run();
}
MojoResult HandleWatcher::SendCommand(MojoHandle control_pipe_producer_handle,
const HandleWatcherCommand& command) {
return MojoWriteMessage(control_pipe_producer_handle,
reinterpret_cast<const void*>(&command),
sizeof(command),
nullptr,
0,
0);
}
} // namespace dart
} // namespace mojo