Make mojo::embedder::ConnectToSlave() a bit more "correct".

Now it actually establishes a |Channel| and an initial message pipe.
(The implementation is in mojo::system::IPCSupport.) The old
IPCSupport::ConnectToSlave() was renamed ConnectToSlaveInternal() and
made private.

ConnectToMaster() needs to have the "same" thing done to it, at which
point I'll be able to test the two together.

(Also to do: Make the management of the channels created by
|ConnectToSlave()| more sane.)

R=yzshen@chromium.org

Review URL: https://codereview.chromium.org/1166303002.
diff --git a/mojo/edk/embedder/embedder.cc b/mojo/edk/embedder/embedder.cc
index 4c7aa25..374fc83 100644
--- a/mojo/edk/embedder/embedder.cc
+++ b/mojo/edk/embedder/embedder.cc
@@ -171,19 +171,33 @@
   DCHECK(ok);
 }
 
-void ConnectToSlave(SlaveInfo slave_info,
-                    ScopedPlatformHandle platform_handle,
-                    ScopedPlatformHandle* platform_connection_handle,
-                    std::string* platform_connection_id) {
-  DCHECK(platform_connection_handle);
+ScopedMessagePipeHandle ConnectToSlave(
+    SlaveInfo slave_info,
+    ScopedPlatformHandle platform_handle,
+    const DidConnectToSlaveCallback& callback,
+    scoped_refptr<base::TaskRunner> callback_thread_task_runner,
+    std::string* platform_connection_id,
+    ChannelInfo** channel_info) {
   DCHECK(platform_connection_id);
+  DCHECK(channel_info);
   DCHECK(internal::g_ipc_support);
 
   system::ConnectionIdentifier connection_id =
       internal::g_ipc_support->GenerateConnectionIdentifier();
   *platform_connection_id = connection_id.ToString();
-  *platform_connection_handle = internal::g_ipc_support->ConnectToSlave(
-      connection_id, slave_info, platform_handle.Pass());
+  system::ChannelId channel_id = system::kInvalidChannelId;
+  scoped_refptr<system::MessagePipeDispatcher> dispatcher =
+      internal::g_ipc_support->ConnectToSlave(
+          connection_id, slave_info, platform_handle.Pass(), callback,
+          callback_thread_task_runner.Pass(), &channel_id);
+  *channel_info = new ChannelInfo(channel_id);
+
+  ScopedMessagePipeHandle rv(
+      MessagePipeHandle(internal::g_core->AddDispatcher(dispatcher)));
+  CHECK(rv.is_valid());
+  // TODO(vtl): The |.Pass()| below is only needed due to an MSVS bug; remove it
+  // once that's fixed.
+  return rv.Pass();
 }
 
 void ConnectToMaster(const std::string& platform_connection_id,
diff --git a/mojo/edk/embedder/embedder.h b/mojo/edk/embedder/embedder.h
index f07e4c9..dfd318a 100644
--- a/mojo/edk/embedder/embedder.h
+++ b/mojo/edk/embedder/embedder.h
@@ -117,26 +117,27 @@
 // This should typically be called *before* the slave process is even created.
 // It requires an OS "pipe" to be established between the master and slave
 // processes, with |platform_handle| being a handle to the end that remains on
-// the master. This will create a second OS "pipe" (returned in
-// |*platform_connection_handle|), and an ID string (returned in
-// |*platform_connection_id|) that must be passed to the slave, e.g., on the
-// command line.
+// the master.
 //
-// The slave should call |InitIPCSupport()| with |ProcessType::SLAVE| and the
-// handle to the other end of the first "pipe" above. Then it should call
-// |ConnectToMaster()| with the ID string |*platform_connection_id|.
+// This will establish a channel and an initial message pipe (to which it
+// returns a handle), an ID string (returned in |*platform_connection_id|) that
+// must be passed to the slave (e.g., on the command line), and a
+// |ChannelInfo*| (in |*channel_info|) which should eventually be given to
+// |DestroyChannel()|/|DestroyChannelOnIOThread()|, but only after |callback|
+// has been run.
 //
-// |slave_info| is caller-dependent slave information, which should typically
-// remain alive until the master process delegate's |OnSlaveDisconnect()| is
-// called. (It may, however, be null.)
+// |callback| will be run either using |callback_thread_task_runner| (if
+// non-null) or on the I/O thread, once the |ChannelInfo*| is valid.
 //
-// TODO(vtl): This is not the right API. It should really establish a channel
-// and provide an initial message pipe.
-MOJO_SYSTEM_IMPL_EXPORT void ConnectToSlave(
-    SlaveInfo slave_info,
-    ScopedPlatformHandle platform_handle,
-    ScopedPlatformHandle* platform_connection_handle,
-    std::string* platform_connection_id);
+// TODO(vtl): The API is a little crazy with respect to the |ChannelInfo*|.
+using DidConnectToSlaveCallback = base::Closure;
+MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
+ConnectToSlave(SlaveInfo slave_info,
+               ScopedPlatformHandle platform_handle,
+               const DidConnectToSlaveCallback& callback,
+               scoped_refptr<base::TaskRunner> callback_thread_task_runner,
+               std::string* platform_connection_id,
+               ChannelInfo** channel_info);
 
 // Called in a slave process to connect it to the IPC system. (This should only
 // be called in a process initialized (using |InitIPCSupport()|) with process
diff --git a/mojo/edk/system/ipc_support.cc b/mojo/edk/system/ipc_support.cc
index f677bed..ee0b2ac 100644
--- a/mojo/edk/system/ipc_support.cc
+++ b/mojo/edk/system/ipc_support.cc
@@ -9,6 +9,7 @@
 #include "mojo/edk/embedder/slave_process_delegate.h"
 #include "mojo/edk/system/channel_manager.h"
 #include "mojo/edk/system/master_connection_manager.h"
+#include "mojo/edk/system/message_pipe_dispatcher.h"
 #include "mojo/edk/system/slave_connection_manager.h"
 
 namespace mojo {
@@ -85,24 +86,25 @@
   return connection_manager()->GenerateConnectionIdentifier();
 }
 
-embedder::ScopedPlatformHandle IPCSupport::ConnectToSlave(
+scoped_refptr<system::MessagePipeDispatcher> IPCSupport::ConnectToSlave(
     const ConnectionIdentifier& connection_id,
     embedder::SlaveInfo slave_info,
-    embedder::ScopedPlatformHandle platform_handle) {
-  DCHECK_EQ(process_type_, embedder::ProcessType::MASTER);
+    embedder::ScopedPlatformHandle platform_handle,
+    const base::Closure& callback,
+    scoped_refptr<base::TaskRunner> callback_thread_task_runner,
+    ChannelId* channel_id) {
+  DCHECK(channel_id);
 
-  system::ProcessIdentifier slave_id =
-      static_cast<system::MasterConnectionManager*>(connection_manager())
-          ->AddSlaveAndBootstrap(slave_info, platform_handle.Pass(),
-                                 connection_id);
+  // TODO(vtl): Use std::is_same instead when we are allowed to (C++11 library).
+  static_assert(sizeof(ChannelId) == sizeof(ProcessIdentifier),
+                "ChannelId and ProcessIdentifier types don't match");
 
-  system::ProcessIdentifier peer_id = system::kInvalidProcessIdentifier;
-  embedder::ScopedPlatformHandle platform_connection_handle;
-  CHECK(connection_manager()->Connect(connection_id, &peer_id,
-                                      &platform_connection_handle));
-  DCHECK_EQ(peer_id, slave_id);
-  DCHECK(platform_connection_handle.is_valid());
-  return platform_connection_handle;
+  embedder::ScopedPlatformHandle platform_connection_handle =
+      ConnectToSlaveInternal(connection_id, slave_info, platform_handle.Pass(),
+                             channel_id);
+  return channel_manager()->CreateChannel(
+      *channel_id, platform_connection_handle.Pass(), callback,
+      callback_thread_task_runner);
 }
 
 embedder::ScopedPlatformHandle IPCSupport::ConnectToMaster(
@@ -118,5 +120,27 @@
   return platform_connection_handle;
 }
 
+embedder::ScopedPlatformHandle IPCSupport::ConnectToSlaveInternal(
+    const ConnectionIdentifier& connection_id,
+    embedder::SlaveInfo slave_info,
+    embedder::ScopedPlatformHandle platform_handle,
+    ProcessIdentifier* slave_process_identifier) {
+  DCHECK(slave_process_identifier);
+  DCHECK_EQ(process_type_, embedder::ProcessType::MASTER);
+
+  *slave_process_identifier =
+      static_cast<system::MasterConnectionManager*>(connection_manager())
+          ->AddSlaveAndBootstrap(slave_info, platform_handle.Pass(),
+                                 connection_id);
+
+  system::ProcessIdentifier peer_id = system::kInvalidProcessIdentifier;
+  embedder::ScopedPlatformHandle platform_connection_handle;
+  CHECK(connection_manager()->Connect(connection_id, &peer_id,
+                                      &platform_connection_handle));
+  DCHECK_EQ(peer_id, *slave_process_identifier);
+  DCHECK(platform_connection_handle.is_valid());
+  return platform_connection_handle;
+}
+
 }  // namespace system
 }  // namespace mojo
diff --git a/mojo/edk/system/ipc_support.h b/mojo/edk/system/ipc_support.h
index e3a7bdb..8d58f9c 100644
--- a/mojo/edk/system/ipc_support.h
+++ b/mojo/edk/system/ipc_support.h
@@ -5,6 +5,8 @@
 #ifndef MOJO_EDK_SYSTEM_IPC_SUPPORT_H_
 #define MOJO_EDK_SYSTEM_IPC_SUPPORT_H_
 
+#include "base/callback_forward.h"
+#include "base/gtest_prod_util.h"
 #include "base/macros.h"
 #include "base/memory/ref_counted.h"
 #include "base/memory/scoped_ptr.h"
@@ -12,7 +14,9 @@
 #include "mojo/edk/embedder/process_type.h"
 #include "mojo/edk/embedder/scoped_platform_handle.h"
 #include "mojo/edk/embedder/slave_info.h"
+#include "mojo/edk/system/channel_id.h"
 #include "mojo/edk/system/connection_identifier.h"
+#include "mojo/edk/system/process_identifier.h"
 #include "mojo/edk/system/system_impl_export.h"
 
 namespace mojo {
@@ -26,6 +30,9 @@
 
 class ChannelManager;
 class ConnectionManager;
+class MessagePipeDispatcher;
+
+FORWARD_DECLARE_TEST(IPCSupportTest, MultiprocessMasterSlaveInternal);
 
 // |IPCSupport| encapsulates all the objects that are needed to support IPC for
 // a single "process" (whether that be a master or a slave).
@@ -74,17 +81,28 @@
   // Called in the master process to connect a slave process to the IPC system.
   //
   // |connection_id| should be a unique connection identifier, which will also
-  // be given to the slave (in |ConnectToMaster()|, below). |platform_handle|
-  // should be the master's handle to an OS "pipe" between master and slave.
-  // This will create a second OS "pipe" between the master and slave and return
-  // the master's handle.
+  // be given to the slave (in |ConnectToMaster()|, below). |slave_info| is
+  // context for the caller (it is treated as an opaque value by this class).
+  // |platform_handle| should be the master's handle to an OS "pipe" between
+  // master and slave. This will then bootstrap a |Channel| between master and
+  // slave together with an initial message pipe (returning a dispatcher to the
+  // master's side).
   //
-  // TODO(vtl): This isn't the right API. It should set up a channel and an
-  // initial message pipe.
-  embedder::ScopedPlatformHandle ConnectToSlave(
+  // |callback| will be run after the |Channel| is created, either using
+  // |callback_thread_task_runner| (if it is non-null) or on the I/O thread.
+  // |*channel_id| will be set to the ID for the channel (immediately); the
+  // channel may be destroyed using this ID, but only after the callback has
+  // been run.
+  //
+  // TODO(vtl): Add some more channel management functionality to this class.
+  // Maybe make this callback interface more sane.
+  scoped_refptr<system::MessagePipeDispatcher> ConnectToSlave(
       const ConnectionIdentifier& connection_id,
       embedder::SlaveInfo slave_info,
-      embedder::ScopedPlatformHandle platform_handle);
+      embedder::ScopedPlatformHandle platform_handle,
+      const base::Closure& callback,
+      scoped_refptr<base::TaskRunner> callback_thread_task_runner,
+      ChannelId* channel_id);
 
   // Called in a slave process to connect it to the master process and thus the
   // IPC system. See |ConnectToSlave()|, above.
@@ -109,6 +127,21 @@
   ChannelManager* channel_manager() const { return channel_manager_.get(); }
 
  private:
+  // This tests |ConnectToSlaveInternal()|.
+  // TODO(vtl): (... and |ConnectToMasterInternal()|.)
+  FRIEND_TEST_ALL_PREFIXES(IPCSupportTest, MultiprocessMasterSlaveInternal);
+
+  // Helper for |ConnectToSlave()|. Connects (using the connection manager) to
+  // the slave using |platform_handle| (a handle to an OS "pipe" between master
+  // and slave) and creates a second OS "pipe" between the master and slave
+  // (returning the master's handle). |*slave_process_identifier| will be set to
+  // the process identifier assigned to the slave.
+  embedder::ScopedPlatformHandle ConnectToSlaveInternal(
+      const ConnectionIdentifier& connection_id,
+      embedder::SlaveInfo slave_info,
+      embedder::ScopedPlatformHandle platform_handle,
+      ProcessIdentifier* slave_process_identifier);
+
   ConnectionManager* connection_manager() const {
     return connection_manager_.get();
   }
diff --git a/mojo/edk/system/ipc_support_unittest.cc b/mojo/edk/system/ipc_support_unittest.cc
index f66f312..5bc03aa 100644
--- a/mojo/edk/system/ipc_support_unittest.cc
+++ b/mojo/edk/system/ipc_support_unittest.cc
@@ -16,6 +16,7 @@
 #include "mojo/edk/embedder/simple_platform_support.h"
 #include "mojo/edk/embedder/slave_process_delegate.h"
 #include "mojo/edk/system/connection_identifier.h"
+#include "mojo/edk/system/process_identifier.h"
 #include "mojo/edk/test/multiprocess_test_helper.h"
 #include "mojo/edk/test/test_utils.h"
 #include "testing/gtest/include/gtest/gtest.h"
@@ -63,14 +64,19 @@
   DISALLOW_COPY_AND_ASSIGN(TestSlaveProcessDelegate);
 };
 
+}  // namespace
+
+// Note: This test isn't in an anonymous namespace, since it needs to be
+// friended by |IPCSupport|.
 #if defined(OS_ANDROID)
 // Android multi-process tests are not executing the new process. This is flaky.
 // TODO(vtl): I'm guessing this is true of this test too?
-#define MAYBE_MultiprocessMasterSlave DISABLED_MultiprocessMasterSlave
+#define MAYBE_MultiprocessMasterSlaveInternal \
+  DISABLED_MultiprocessMasterSlaveInternal
 #else
-#define MAYBE_MultiprocessMasterSlave MultiprocessMasterSlave
+#define MAYBE_MultiprocessMasterSlaveInternal MultiprocessMasterSlaveInternal
 #endif  // defined(OS_ANDROID)
-TEST(IPCSupportTest, MAYBE_MultiprocessMasterSlave) {
+TEST(IPCSupportTest, MAYBE_MultiprocessMasterSlaveInternal) {
   embedder::SimplePlatformSupport platform_support;
   base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
   TestMasterProcessDelegate master_process_delegate;
@@ -83,14 +89,18 @@
   ConnectionIdentifier connection_id =
       ipc_support.GenerateConnectionIdentifier();
   mojo::test::MultiprocessTestHelper multiprocess_test_helper;
+  ProcessIdentifier slave_id = kInvalidProcessIdentifier;
   embedder::ScopedPlatformHandle second_platform_handle =
-      ipc_support.ConnectToSlave(
+      ipc_support.ConnectToSlaveInternal(
           connection_id, nullptr,
-          multiprocess_test_helper.server_platform_handle.Pass());
+          multiprocess_test_helper.server_platform_handle.Pass(), &slave_id);
   ASSERT_TRUE(second_platform_handle.is_valid());
+  EXPECT_NE(slave_id, kInvalidProcessIdentifier);
+  EXPECT_NE(slave_id, kMasterProcessIdentifier);
 
   multiprocess_test_helper.StartChildWithExtraSwitch(
-      "MultiprocessMasterSlave", kConnectionIdFlag, connection_id.ToString());
+      "MultiprocessMasterSlaveInternal", kConnectionIdFlag,
+      connection_id.ToString());
 
   // We write a '?'. The slave should write a '!' in response.
   size_t n = 0;
@@ -113,7 +123,7 @@
                                             base::Unretained(&ipc_support)));
 }
 
-MOJO_MULTIPROCESS_TEST_CHILD_TEST(MultiprocessMasterSlave) {
+MOJO_MULTIPROCESS_TEST_CHILD_TEST(MultiprocessMasterSlaveInternal) {
   embedder::ScopedPlatformHandle client_platform_handle =
       mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
   ASSERT_TRUE(client_platform_handle.is_valid());
@@ -160,6 +170,5 @@
 // TODO(vtl): Also test the case of the master "dying" before the slave. (The
 // slave should get OnMasterDisconnect(), which we currently don't test.)
 
-}  // namespace
 }  // namespace system
 }  // namespace mojo
diff --git a/mojo/edk/system/master_connection_manager.h b/mojo/edk/system/master_connection_manager.h
index faf2081..49fd17b 100644
--- a/mojo/edk/system/master_connection_manager.h
+++ b/mojo/edk/system/master_connection_manager.h
@@ -30,9 +30,6 @@
 
 namespace system {
 
-// The master process will always have this "process identifier".
-const ProcessIdentifier kMasterProcessIdentifier = 1;
-
 // The |ConnectionManager| implementation for the master process.
 //
 // This class is thread-safe (except that no public methods may be called from
diff --git a/mojo/edk/system/process_identifier.h b/mojo/edk/system/process_identifier.h
index da611d8..73ed335 100644
--- a/mojo/edk/system/process_identifier.h
+++ b/mojo/edk/system/process_identifier.h
@@ -12,8 +12,13 @@
 
 // Identifiers for processes (note that these are not OS process IDs):
 using ProcessIdentifier = uint64_t;
+
+// Zero will never be a process identifier for any process.
 const ProcessIdentifier kInvalidProcessIdentifier = 0;
 
+// The master process will always have this process identifier.
+const ProcessIdentifier kMasterProcessIdentifier = 1;
+
 }  // namespace system
 }  // namespace mojo
 
diff --git a/shell/child_process_host.cc b/shell/child_process_host.cc
index 863c7e8..ec51ec5 100644
--- a/shell/child_process_host.cc
+++ b/shell/child_process_host.cc
@@ -49,15 +49,15 @@
   launch_data->child_path = context_->mojo_shell_child_path();
 
   // TODO(vtl): Add something for |slave_info|.
-  mojo::embedder::ScopedPlatformHandle platform_handle_for_channel;
-  mojo::embedder::ConnectToSlave(
+  // TODO(vtl): The "unretained this" is wrong (see also below).
+  mojo::ScopedMessagePipeHandle handle(mojo::embedder::ConnectToSlave(
       nullptr, launch_data->platform_channel_pair.PassServerHandle(),
-      &platform_handle_for_channel, &launch_data->child_connection_id);
-
-  mojo::ScopedMessagePipeHandle handle(mojo::embedder::CreateChannel(
-      platform_handle_for_channel.Pass(),
-      base::Bind(&ChildProcessHost::DidCreateChannel, base::Unretained(this)),
-      base::MessageLoop::current()->message_loop_proxy()));
+      base::Bind(&ChildProcessHost::DidConnectToSlave, base::Unretained(this)),
+      base::MessageLoop::current()->message_loop_proxy(),
+      &launch_data->child_connection_id, &channel_info_));
+  // TODO(vtl): We should destroy the channel on destruction (using
+  // |channel_info_|, but only after the callback has been called.
+  CHECK(channel_info_);
 
   controller_.Bind(mojo::InterfacePtrInfo<ChildController>(handle.Pass(), 0u));
   controller_.set_error_handler(this);
@@ -109,13 +109,9 @@
   child_process_ = child_process.Pass();
 }
 
-// Callback for |mojo::embedder::CreateChannel()|.
-void ChildProcessHost::DidCreateChannel(
-    mojo::embedder::ChannelInfo* channel_info) {
-  DVLOG(2) << "ChildProcessHost::DidCreateChannel()";
-
-  CHECK(channel_info);
-  channel_info_ = channel_info;
+// Callback for |mojo::embedder::ConnectToSlave()|.
+void ChildProcessHost::DidConnectToSlave() {
+  DVLOG(2) << "ChildProcessHost::DidConnectToSlave()";
 }
 
 base::Process ChildProcessHost::DoLaunch(scoped_ptr<LaunchData> launch_data) {
diff --git a/shell/child_process_host.h b/shell/child_process_host.h
index 79fb292..67f243d 100644
--- a/shell/child_process_host.h
+++ b/shell/child_process_host.h
@@ -65,8 +65,8 @@
  private:
   struct LaunchData;
 
-  // Callback for |mojo::embedder::CreateChannel()|.
-  void DidCreateChannel(mojo::embedder::ChannelInfo* channel_info);
+  // Callback for |mojo::embedder::ConnectToSlave()|.
+  void DidConnectToSlave();
 
   // Note: This is probably executed on a different thread (namely, using the
   // blocking pool).