Make mojo::embedder::ConnectToSlave() a bit more "correct".
Now it actually establishes a |Channel| and an initial message pipe.
(The implementation is in mojo::system::IPCSupport.) The old
IPCSupport::ConnectToSlave() was renamed ConnectToSlaveInternal() and
made private.
ConnectToMaster() needs to have the "same" thing done to it, at which
point I'll be able to test the two together.
(Also to do: Make the management of the channels created by
|ConnectToSlave()| more sane.)
R=yzshen@chromium.org
Review URL: https://codereview.chromium.org/1166303002.
diff --git a/mojo/edk/embedder/embedder.cc b/mojo/edk/embedder/embedder.cc
index 4c7aa25..374fc83 100644
--- a/mojo/edk/embedder/embedder.cc
+++ b/mojo/edk/embedder/embedder.cc
@@ -171,19 +171,33 @@
DCHECK(ok);
}
-void ConnectToSlave(SlaveInfo slave_info,
- ScopedPlatformHandle platform_handle,
- ScopedPlatformHandle* platform_connection_handle,
- std::string* platform_connection_id) {
- DCHECK(platform_connection_handle);
+ScopedMessagePipeHandle ConnectToSlave(
+ SlaveInfo slave_info,
+ ScopedPlatformHandle platform_handle,
+ const DidConnectToSlaveCallback& callback,
+ scoped_refptr<base::TaskRunner> callback_thread_task_runner,
+ std::string* platform_connection_id,
+ ChannelInfo** channel_info) {
DCHECK(platform_connection_id);
+ DCHECK(channel_info);
DCHECK(internal::g_ipc_support);
system::ConnectionIdentifier connection_id =
internal::g_ipc_support->GenerateConnectionIdentifier();
*platform_connection_id = connection_id.ToString();
- *platform_connection_handle = internal::g_ipc_support->ConnectToSlave(
- connection_id, slave_info, platform_handle.Pass());
+ system::ChannelId channel_id = system::kInvalidChannelId;
+ scoped_refptr<system::MessagePipeDispatcher> dispatcher =
+ internal::g_ipc_support->ConnectToSlave(
+ connection_id, slave_info, platform_handle.Pass(), callback,
+ callback_thread_task_runner.Pass(), &channel_id);
+ *channel_info = new ChannelInfo(channel_id);
+
+ ScopedMessagePipeHandle rv(
+ MessagePipeHandle(internal::g_core->AddDispatcher(dispatcher)));
+ CHECK(rv.is_valid());
+ // TODO(vtl): The |.Pass()| below is only needed due to an MSVS bug; remove it
+ // once that's fixed.
+ return rv.Pass();
}
void ConnectToMaster(const std::string& platform_connection_id,
diff --git a/mojo/edk/embedder/embedder.h b/mojo/edk/embedder/embedder.h
index f07e4c9..dfd318a 100644
--- a/mojo/edk/embedder/embedder.h
+++ b/mojo/edk/embedder/embedder.h
@@ -117,26 +117,27 @@
// This should typically be called *before* the slave process is even created.
// It requires an OS "pipe" to be established between the master and slave
// processes, with |platform_handle| being a handle to the end that remains on
-// the master. This will create a second OS "pipe" (returned in
-// |*platform_connection_handle|), and an ID string (returned in
-// |*platform_connection_id|) that must be passed to the slave, e.g., on the
-// command line.
+// the master.
//
-// The slave should call |InitIPCSupport()| with |ProcessType::SLAVE| and the
-// handle to the other end of the first "pipe" above. Then it should call
-// |ConnectToMaster()| with the ID string |*platform_connection_id|.
+// This will establish a channel and an initial message pipe (to which it
+// returns a handle), an ID string (returned in |*platform_connection_id|) that
+// must be passed to the slave (e.g., on the command line), and a
+// |ChannelInfo*| (in |*channel_info|) which should eventually be given to
+// |DestroyChannel()|/|DestroyChannelOnIOThread()|, but only after |callback|
+// has been run.
//
-// |slave_info| is caller-dependent slave information, which should typically
-// remain alive until the master process delegate's |OnSlaveDisconnect()| is
-// called. (It may, however, be null.)
+// |callback| will be run either using |callback_thread_task_runner| (if
+// non-null) or on the I/O thread, once the |ChannelInfo*| is valid.
//
-// TODO(vtl): This is not the right API. It should really establish a channel
-// and provide an initial message pipe.
-MOJO_SYSTEM_IMPL_EXPORT void ConnectToSlave(
- SlaveInfo slave_info,
- ScopedPlatformHandle platform_handle,
- ScopedPlatformHandle* platform_connection_handle,
- std::string* platform_connection_id);
+// TODO(vtl): The API is a little crazy with respect to the |ChannelInfo*|.
+using DidConnectToSlaveCallback = base::Closure;
+MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
+ConnectToSlave(SlaveInfo slave_info,
+ ScopedPlatformHandle platform_handle,
+ const DidConnectToSlaveCallback& callback,
+ scoped_refptr<base::TaskRunner> callback_thread_task_runner,
+ std::string* platform_connection_id,
+ ChannelInfo** channel_info);
// Called in a slave process to connect it to the IPC system. (This should only
// be called in a process initialized (using |InitIPCSupport()|) with process
diff --git a/mojo/edk/system/ipc_support.cc b/mojo/edk/system/ipc_support.cc
index f677bed..ee0b2ac 100644
--- a/mojo/edk/system/ipc_support.cc
+++ b/mojo/edk/system/ipc_support.cc
@@ -9,6 +9,7 @@
#include "mojo/edk/embedder/slave_process_delegate.h"
#include "mojo/edk/system/channel_manager.h"
#include "mojo/edk/system/master_connection_manager.h"
+#include "mojo/edk/system/message_pipe_dispatcher.h"
#include "mojo/edk/system/slave_connection_manager.h"
namespace mojo {
@@ -85,24 +86,25 @@
return connection_manager()->GenerateConnectionIdentifier();
}
-embedder::ScopedPlatformHandle IPCSupport::ConnectToSlave(
+scoped_refptr<system::MessagePipeDispatcher> IPCSupport::ConnectToSlave(
const ConnectionIdentifier& connection_id,
embedder::SlaveInfo slave_info,
- embedder::ScopedPlatformHandle platform_handle) {
- DCHECK_EQ(process_type_, embedder::ProcessType::MASTER);
+ embedder::ScopedPlatformHandle platform_handle,
+ const base::Closure& callback,
+ scoped_refptr<base::TaskRunner> callback_thread_task_runner,
+ ChannelId* channel_id) {
+ DCHECK(channel_id);
- system::ProcessIdentifier slave_id =
- static_cast<system::MasterConnectionManager*>(connection_manager())
- ->AddSlaveAndBootstrap(slave_info, platform_handle.Pass(),
- connection_id);
+ // TODO(vtl): Use std::is_same instead when we are allowed to (C++11 library).
+ static_assert(sizeof(ChannelId) == sizeof(ProcessIdentifier),
+ "ChannelId and ProcessIdentifier types don't match");
- system::ProcessIdentifier peer_id = system::kInvalidProcessIdentifier;
- embedder::ScopedPlatformHandle platform_connection_handle;
- CHECK(connection_manager()->Connect(connection_id, &peer_id,
- &platform_connection_handle));
- DCHECK_EQ(peer_id, slave_id);
- DCHECK(platform_connection_handle.is_valid());
- return platform_connection_handle;
+ embedder::ScopedPlatformHandle platform_connection_handle =
+ ConnectToSlaveInternal(connection_id, slave_info, platform_handle.Pass(),
+ channel_id);
+ return channel_manager()->CreateChannel(
+ *channel_id, platform_connection_handle.Pass(), callback,
+ callback_thread_task_runner);
}
embedder::ScopedPlatformHandle IPCSupport::ConnectToMaster(
@@ -118,5 +120,27 @@
return platform_connection_handle;
}
+embedder::ScopedPlatformHandle IPCSupport::ConnectToSlaveInternal(
+ const ConnectionIdentifier& connection_id,
+ embedder::SlaveInfo slave_info,
+ embedder::ScopedPlatformHandle platform_handle,
+ ProcessIdentifier* slave_process_identifier) {
+ DCHECK(slave_process_identifier);
+ DCHECK_EQ(process_type_, embedder::ProcessType::MASTER);
+
+ *slave_process_identifier =
+ static_cast<system::MasterConnectionManager*>(connection_manager())
+ ->AddSlaveAndBootstrap(slave_info, platform_handle.Pass(),
+ connection_id);
+
+ system::ProcessIdentifier peer_id = system::kInvalidProcessIdentifier;
+ embedder::ScopedPlatformHandle platform_connection_handle;
+ CHECK(connection_manager()->Connect(connection_id, &peer_id,
+ &platform_connection_handle));
+ DCHECK_EQ(peer_id, *slave_process_identifier);
+ DCHECK(platform_connection_handle.is_valid());
+ return platform_connection_handle;
+}
+
} // namespace system
} // namespace mojo
diff --git a/mojo/edk/system/ipc_support.h b/mojo/edk/system/ipc_support.h
index e3a7bdb..8d58f9c 100644
--- a/mojo/edk/system/ipc_support.h
+++ b/mojo/edk/system/ipc_support.h
@@ -5,6 +5,8 @@
#ifndef MOJO_EDK_SYSTEM_IPC_SUPPORT_H_
#define MOJO_EDK_SYSTEM_IPC_SUPPORT_H_
+#include "base/callback_forward.h"
+#include "base/gtest_prod_util.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
@@ -12,7 +14,9 @@
#include "mojo/edk/embedder/process_type.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/embedder/slave_info.h"
+#include "mojo/edk/system/channel_id.h"
#include "mojo/edk/system/connection_identifier.h"
+#include "mojo/edk/system/process_identifier.h"
#include "mojo/edk/system/system_impl_export.h"
namespace mojo {
@@ -26,6 +30,9 @@
class ChannelManager;
class ConnectionManager;
+class MessagePipeDispatcher;
+
+FORWARD_DECLARE_TEST(IPCSupportTest, MultiprocessMasterSlaveInternal);
// |IPCSupport| encapsulates all the objects that are needed to support IPC for
// a single "process" (whether that be a master or a slave).
@@ -74,17 +81,28 @@
// Called in the master process to connect a slave process to the IPC system.
//
// |connection_id| should be a unique connection identifier, which will also
- // be given to the slave (in |ConnectToMaster()|, below). |platform_handle|
- // should be the master's handle to an OS "pipe" between master and slave.
- // This will create a second OS "pipe" between the master and slave and return
- // the master's handle.
+ // be given to the slave (in |ConnectToMaster()|, below). |slave_info| is
+ // context for the caller (it is treated as an opaque value by this class).
+ // |platform_handle| should be the master's handle to an OS "pipe" between
+ // master and slave. This will then bootstrap a |Channel| between master and
+ // slave together with an initial message pipe (returning a dispatcher to the
+ // master's side).
//
- // TODO(vtl): This isn't the right API. It should set up a channel and an
- // initial message pipe.
- embedder::ScopedPlatformHandle ConnectToSlave(
+ // |callback| will be run after the |Channel| is created, either using
+ // |callback_thread_task_runner| (if it is non-null) or on the I/O thread.
+ // |*channel_id| will be set to the ID for the channel (immediately); the
+ // channel may be destroyed using this ID, but only after the callback has
+ // been run.
+ //
+ // TODO(vtl): Add some more channel management functionality to this class.
+ // Maybe make this callback interface more sane.
+ scoped_refptr<system::MessagePipeDispatcher> ConnectToSlave(
const ConnectionIdentifier& connection_id,
embedder::SlaveInfo slave_info,
- embedder::ScopedPlatformHandle platform_handle);
+ embedder::ScopedPlatformHandle platform_handle,
+ const base::Closure& callback,
+ scoped_refptr<base::TaskRunner> callback_thread_task_runner,
+ ChannelId* channel_id);
// Called in a slave process to connect it to the master process and thus the
// IPC system. See |ConnectToSlave()|, above.
@@ -109,6 +127,21 @@
ChannelManager* channel_manager() const { return channel_manager_.get(); }
private:
+ // This tests |ConnectToSlaveInternal()|.
+ // TODO(vtl): (... and |ConnectToMasterInternal()|.)
+ FRIEND_TEST_ALL_PREFIXES(IPCSupportTest, MultiprocessMasterSlaveInternal);
+
+ // Helper for |ConnectToSlave()|. Connects (using the connection manager) to
+ // the slave using |platform_handle| (a handle to an OS "pipe" between master
+ // and slave) and creates a second OS "pipe" between the master and slave
+ // (returning the master's handle). |*slave_process_identifier| will be set to
+ // the process identifier assigned to the slave.
+ embedder::ScopedPlatformHandle ConnectToSlaveInternal(
+ const ConnectionIdentifier& connection_id,
+ embedder::SlaveInfo slave_info,
+ embedder::ScopedPlatformHandle platform_handle,
+ ProcessIdentifier* slave_process_identifier);
+
ConnectionManager* connection_manager() const {
return connection_manager_.get();
}
diff --git a/mojo/edk/system/ipc_support_unittest.cc b/mojo/edk/system/ipc_support_unittest.cc
index f66f312..5bc03aa 100644
--- a/mojo/edk/system/ipc_support_unittest.cc
+++ b/mojo/edk/system/ipc_support_unittest.cc
@@ -16,6 +16,7 @@
#include "mojo/edk/embedder/simple_platform_support.h"
#include "mojo/edk/embedder/slave_process_delegate.h"
#include "mojo/edk/system/connection_identifier.h"
+#include "mojo/edk/system/process_identifier.h"
#include "mojo/edk/test/multiprocess_test_helper.h"
#include "mojo/edk/test/test_utils.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -63,14 +64,19 @@
DISALLOW_COPY_AND_ASSIGN(TestSlaveProcessDelegate);
};
+} // namespace
+
+// Note: This test isn't in an anonymous namespace, since it needs to be
+// friended by |IPCSupport|.
#if defined(OS_ANDROID)
// Android multi-process tests are not executing the new process. This is flaky.
// TODO(vtl): I'm guessing this is true of this test too?
-#define MAYBE_MultiprocessMasterSlave DISABLED_MultiprocessMasterSlave
+#define MAYBE_MultiprocessMasterSlaveInternal \
+ DISABLED_MultiprocessMasterSlaveInternal
#else
-#define MAYBE_MultiprocessMasterSlave MultiprocessMasterSlave
+#define MAYBE_MultiprocessMasterSlaveInternal MultiprocessMasterSlaveInternal
#endif // defined(OS_ANDROID)
-TEST(IPCSupportTest, MAYBE_MultiprocessMasterSlave) {
+TEST(IPCSupportTest, MAYBE_MultiprocessMasterSlaveInternal) {
embedder::SimplePlatformSupport platform_support;
base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
TestMasterProcessDelegate master_process_delegate;
@@ -83,14 +89,18 @@
ConnectionIdentifier connection_id =
ipc_support.GenerateConnectionIdentifier();
mojo::test::MultiprocessTestHelper multiprocess_test_helper;
+ ProcessIdentifier slave_id = kInvalidProcessIdentifier;
embedder::ScopedPlatformHandle second_platform_handle =
- ipc_support.ConnectToSlave(
+ ipc_support.ConnectToSlaveInternal(
connection_id, nullptr,
- multiprocess_test_helper.server_platform_handle.Pass());
+ multiprocess_test_helper.server_platform_handle.Pass(), &slave_id);
ASSERT_TRUE(second_platform_handle.is_valid());
+ EXPECT_NE(slave_id, kInvalidProcessIdentifier);
+ EXPECT_NE(slave_id, kMasterProcessIdentifier);
multiprocess_test_helper.StartChildWithExtraSwitch(
- "MultiprocessMasterSlave", kConnectionIdFlag, connection_id.ToString());
+ "MultiprocessMasterSlaveInternal", kConnectionIdFlag,
+ connection_id.ToString());
// We write a '?'. The slave should write a '!' in response.
size_t n = 0;
@@ -113,7 +123,7 @@
base::Unretained(&ipc_support)));
}
-MOJO_MULTIPROCESS_TEST_CHILD_TEST(MultiprocessMasterSlave) {
+MOJO_MULTIPROCESS_TEST_CHILD_TEST(MultiprocessMasterSlaveInternal) {
embedder::ScopedPlatformHandle client_platform_handle =
mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
ASSERT_TRUE(client_platform_handle.is_valid());
@@ -160,6 +170,5 @@
// TODO(vtl): Also test the case of the master "dying" before the slave. (The
// slave should get OnMasterDisconnect(), which we currently don't test.)
-} // namespace
} // namespace system
} // namespace mojo
diff --git a/mojo/edk/system/master_connection_manager.h b/mojo/edk/system/master_connection_manager.h
index faf2081..49fd17b 100644
--- a/mojo/edk/system/master_connection_manager.h
+++ b/mojo/edk/system/master_connection_manager.h
@@ -30,9 +30,6 @@
namespace system {
-// The master process will always have this "process identifier".
-const ProcessIdentifier kMasterProcessIdentifier = 1;
-
// The |ConnectionManager| implementation for the master process.
//
// This class is thread-safe (except that no public methods may be called from
diff --git a/mojo/edk/system/process_identifier.h b/mojo/edk/system/process_identifier.h
index da611d8..73ed335 100644
--- a/mojo/edk/system/process_identifier.h
+++ b/mojo/edk/system/process_identifier.h
@@ -12,8 +12,13 @@
// Identifiers for processes (note that these are not OS process IDs):
using ProcessIdentifier = uint64_t;
+
+// Zero will never be a process identifier for any process.
const ProcessIdentifier kInvalidProcessIdentifier = 0;
+// The master process will always have this process identifier.
+const ProcessIdentifier kMasterProcessIdentifier = 1;
+
} // namespace system
} // namespace mojo
diff --git a/shell/child_process_host.cc b/shell/child_process_host.cc
index 863c7e8..ec51ec5 100644
--- a/shell/child_process_host.cc
+++ b/shell/child_process_host.cc
@@ -49,15 +49,15 @@
launch_data->child_path = context_->mojo_shell_child_path();
// TODO(vtl): Add something for |slave_info|.
- mojo::embedder::ScopedPlatformHandle platform_handle_for_channel;
- mojo::embedder::ConnectToSlave(
+ // TODO(vtl): The "unretained this" is wrong (see also below).
+ mojo::ScopedMessagePipeHandle handle(mojo::embedder::ConnectToSlave(
nullptr, launch_data->platform_channel_pair.PassServerHandle(),
- &platform_handle_for_channel, &launch_data->child_connection_id);
-
- mojo::ScopedMessagePipeHandle handle(mojo::embedder::CreateChannel(
- platform_handle_for_channel.Pass(),
- base::Bind(&ChildProcessHost::DidCreateChannel, base::Unretained(this)),
- base::MessageLoop::current()->message_loop_proxy()));
+ base::Bind(&ChildProcessHost::DidConnectToSlave, base::Unretained(this)),
+ base::MessageLoop::current()->message_loop_proxy(),
+ &launch_data->child_connection_id, &channel_info_));
+ // TODO(vtl): We should destroy the channel on destruction (using
+ // |channel_info_|, but only after the callback has been called.
+ CHECK(channel_info_);
controller_.Bind(mojo::InterfacePtrInfo<ChildController>(handle.Pass(), 0u));
controller_.set_error_handler(this);
@@ -109,13 +109,9 @@
child_process_ = child_process.Pass();
}
-// Callback for |mojo::embedder::CreateChannel()|.
-void ChildProcessHost::DidCreateChannel(
- mojo::embedder::ChannelInfo* channel_info) {
- DVLOG(2) << "ChildProcessHost::DidCreateChannel()";
-
- CHECK(channel_info);
- channel_info_ = channel_info;
+// Callback for |mojo::embedder::ConnectToSlave()|.
+void ChildProcessHost::DidConnectToSlave() {
+ DVLOG(2) << "ChildProcessHost::DidConnectToSlave()";
}
base::Process ChildProcessHost::DoLaunch(scoped_ptr<LaunchData> launch_data) {
diff --git a/shell/child_process_host.h b/shell/child_process_host.h
index 79fb292..67f243d 100644
--- a/shell/child_process_host.h
+++ b/shell/child_process_host.h
@@ -65,8 +65,8 @@
private:
struct LaunchData;
- // Callback for |mojo::embedder::CreateChannel()|.
- void DidCreateChannel(mojo::embedder::ChannelInfo* channel_info);
+ // Callback for |mojo::embedder::ConnectToSlave()|.
+ void DidConnectToSlave();
// Note: This is probably executed on a different thread (namely, using the
// blocking pool).