Add a ChannelEndpointClient abstraction.

This is to allow ChannelEndpoint to call things other than MessagePipe
(and make ChannelEndpoint know only about "clients" and not message
pipes). Clients can receive messages (from ChannelEndpoint, thus from
Channel), and be told that they'll no longer receive messages.

This is needed to allow Channel to handle certain "transitions" (when
passing message pipes between processes) specially, without dumping code
into MessagePipe.

R=yzshen@chromium.org

Review URL: https://codereview.chromium.org/738453003
diff --git a/mojo/edk/mojo_edk.gyp b/mojo/edk/mojo_edk.gyp
index 05bde04..0959aac 100644
--- a/mojo/edk/mojo_edk.gyp
+++ b/mojo/edk/mojo_edk.gyp
@@ -54,6 +54,7 @@
         'system/channel.h',
         'system/channel_endpoint.cc',
         'system/channel_endpoint.h',
+        'system/channel_endpoint_client.h',
         'system/channel_endpoint_id.cc',
         'system/channel_endpoint_id.h',
         'system/channel_info.cc',
diff --git a/mojo/edk/system/BUILD.gn b/mojo/edk/system/BUILD.gn
index d0bfda7..d09d3d4 100644
--- a/mojo/edk/system/BUILD.gn
+++ b/mojo/edk/system/BUILD.gn
@@ -17,6 +17,7 @@
     "channel.h",
     "channel_endpoint.cc",
     "channel_endpoint.h",
+    "channel_endpoint_client.h",
     "channel_endpoint_id.cc",
     "channel_endpoint_id.h",
     "channel_info.cc",
diff --git a/mojo/edk/system/channel.h b/mojo/edk/system/channel.h
index 3460a30..e0ecc3b 100644
--- a/mojo/edk/system/channel.h
+++ b/mojo/edk/system/channel.h
@@ -201,10 +201,10 @@
   // TODO(vtl): This is a layering violation, since |Channel| shouldn't know
   // about |MessagePipe|. However, we can't just hang on to |ChannelEndpoint|s
   // (even if they have a reference to the |MessagePipe|) since their lifetimes
-  // are tied to the "remote" side. When |ChannelEndpoint::OnDisconnect()|
-  // (eventually) results in |ChannelEndpoint::DetachFromMessagePipe()| being
-  // called. We really need to hang on to the "local" side of the message pipe,
-  // to which dispatchers will be "attached".
+  // are tied to the "remote" side. When |ChannelEndpoint::DetachFromChannel()|
+  // (eventually) results in |ChannelEndpoint::DetachFromClient()| being called.
+  // We really need to hang on to the "local" side of the message pipe, to which
+  // dispatchers will be "attached".
   IdToMessagePipeMap incoming_message_pipes_;
   // TODO(vtl): We need to keep track of remote IDs (so that we don't collide
   // if/when we wrap).
diff --git a/mojo/edk/system/channel_endpoint.cc b/mojo/edk/system/channel_endpoint.cc
index d9dbb66..e9d69fc 100644
--- a/mojo/edk/system/channel_endpoint.cc
+++ b/mojo/edk/system/channel_endpoint.cc
@@ -6,18 +6,17 @@
 
 #include "base/logging.h"
 #include "mojo/edk/system/channel.h"
-#include "mojo/edk/system/message_pipe.h"
+#include "mojo/edk/system/channel_endpoint_client.h"
 #include "mojo/edk/system/transport_data.h"
 
 namespace mojo {
 namespace system {
 
-ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe,
-                                 unsigned port,
+ChannelEndpoint::ChannelEndpoint(ChannelEndpointClient* client,
+                                 unsigned client_port,
                                  MessageInTransitQueue* message_queue)
-    : message_pipe_(message_pipe), port_(port), channel_(nullptr) {
-  DCHECK(message_pipe_.get() || message_queue);
-  DCHECK(port_ == 0 || port_ == 1);
+    : client_(client), client_port_(client_port), channel_(nullptr) {
+  DCHECK(client_.get() || message_queue);
 
   if (message_queue)
     paused_message_queue_.Swap(message_queue);
@@ -44,11 +43,11 @@
   return WriteMessageNoLock(message.Pass());
 }
 
-void ChannelEndpoint::DetachFromMessagePipe() {
+void ChannelEndpoint::DetachFromClient() {
   {
     base::AutoLock locker(lock_);
-    DCHECK(message_pipe_.get());
-    message_pipe_ = nullptr;
+    DCHECK(client_.get());
+    client_ = nullptr;
 
     if (!channel_)
       return;
@@ -81,7 +80,7 @@
         << "Failed to write enqueue message to channel";
   }
 
-  if (!message_pipe_.get()) {
+  if (!client_.get()) {
     channel_->DetachEndpoint(this, local_id_, remote_id_);
     channel_ = nullptr;
     local_id_ = ChannelEndpointId();
@@ -93,12 +92,12 @@
     const MessageInTransit::View& message_view,
     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
   scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
-  scoped_refptr<MessagePipe> message_pipe;
-  unsigned port;
+  scoped_refptr<ChannelEndpointClient> client;
+  unsigned client_port;
   {
     base::AutoLock locker(lock_);
     DCHECK(channel_);
-    if (!message_pipe_.get()) {
+    if (!client_.get()) {
       // This isn't a failure per se. (It just means that, e.g., the other end
       // of the message point closed first.)
       return true;
@@ -113,30 +112,28 @@
     }
 
     // Take a ref, and call |EnqueueMessage()| outside the lock.
-    message_pipe = message_pipe_;
-    port = port_;
+    client = client_;
+    client_port = client_port_;
   }
 
-  MojoResult result = message_pipe->EnqueueMessage(
-      MessagePipe::GetPeerPort(port), message.Pass());
-  return (result == MOJO_RESULT_OK);
+  return client->OnReadMessage(client_port, message.Pass());
 }
 
 void ChannelEndpoint::DetachFromChannel() {
-  scoped_refptr<MessagePipe> message_pipe;
-  unsigned port = 0;
+  scoped_refptr<ChannelEndpointClient> client;
+  unsigned client_port = 0;
   {
     base::AutoLock locker(lock_);
 
-    if (message_pipe_.get()) {
-      // Take a ref, and call |Close()| outside the lock.
-      message_pipe = message_pipe_;
-      port = port_;
+    if (client_.get()) {
+      // Take a ref, and call |OnDetachFromChannel()| outside the lock.
+      client = client_;
+      client_port = client_port_;
     }
 
     // |channel_| may already be null if we already detached from the channel in
-    // |DetachFromMessagePipe()| by calling |Channel::DetachEndpoint()| (and
-    // there are racing detaches).
+    // |DetachFromClient()| by calling |Channel::DetachEndpoint()| (and there
+    // are racing detaches).
     if (channel_) {
       DCHECK(local_id_.is_valid());
       DCHECK(remote_id_.is_valid());
@@ -146,12 +143,12 @@
     }
   }
 
-  if (message_pipe.get())
-    message_pipe->Close(port);
+  if (client.get())
+    client->OnDetachFromChannel(client_port);
 }
 
 ChannelEndpoint::~ChannelEndpoint() {
-  DCHECK(!message_pipe_.get());
+  DCHECK(!client_.get());
   DCHECK(!channel_);
   DCHECK(!local_id_.is_valid());
   DCHECK(!remote_id_.is_valid());
diff --git a/mojo/edk/system/channel_endpoint.h b/mojo/edk/system/channel_endpoint.h
index 0823d94..ee8f8c3 100644
--- a/mojo/edk/system/channel_endpoint.h
+++ b/mojo/edk/system/channel_endpoint.h
@@ -11,6 +11,7 @@
 #include "base/synchronization/lock.h"
 #include "mojo/edk/embedder/platform_handle_vector.h"
 #include "mojo/edk/system/channel_endpoint_id.h"
+#include "mojo/edk/system/message_in_transit.h"
 #include "mojo/edk/system/message_in_transit_queue.h"
 #include "mojo/edk/system/system_impl_export.h"
 
@@ -18,8 +19,8 @@
 namespace system {
 
 class Channel;
+class ChannelEndpointClient;
 class MessageInTransit;
-class MessagePipe;
 
 // TODO(vtl): The plan:
 //   - (Done.) Move |Channel::Endpoint| to |ChannelEndpoint|. Make it
@@ -112,26 +113,26 @@
 class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint
     : public base::RefCountedThreadSafe<ChannelEndpoint> {
  public:
-  // Constructor for a |ChannelEndpoint| attached to the given message pipe
-  // endpoint (specified by |message_pipe| and |port|). Optionally takes
-  // messages from |*message_queue| if |message_queue| is non-null.
+  // Constructor for a |ChannelEndpoint| with the given client (specified by
+  // |client| and |client_port|). Optionally takes messages from
+  // |*message_queue| if |message_queue| is non-null.
   //
-  // |message_pipe| may be null if this endpoint will never need to receive
-  // messages, in which case |message_queue| should not be null. In that case,
-  // this endpoint will simply send queued messages upon being attached to a
+  // |client| may be null if this endpoint will never need to receive messages,
+  // in which case |message_queue| should not be null. In that case, this
+  // endpoint will simply send queued messages upon being attached to a
   // |Channel| and immediately detach itself.
-  ChannelEndpoint(MessagePipe* message_pipe,
-                  unsigned port,
+  ChannelEndpoint(ChannelEndpointClient* client,
+                  unsigned client_port,
                   MessageInTransitQueue* message_queue = nullptr);
 
-  // Methods called by |MessagePipe| (via |ProxyMessagePipeEndpoint|):
+  // Methods called by |ChannelEndpointClient|:
 
   // TODO(vtl): This currently only works if we're "running". We'll move the
   // "paused message queue" here (will this be needed when we have
   // locally-allocated remote IDs?).
   bool EnqueueMessage(scoped_ptr<MessageInTransit> message);
 
-  void DetachFromMessagePipe();
+  void DetachFromClient();
 
   // Methods called by |Channel|:
 
@@ -142,7 +143,7 @@
                     ChannelEndpointId local_id,
                     ChannelEndpointId remote_id);
 
-  // Called by |Channel| when it receives a message for the message pipe.
+  // Called by |Channel| when it receives a message for the |ChannelEndpoint|.
   bool OnReadMessage(const MessageInTransit::View& message_view,
                      embedder::ScopedPlatformHandleVectorPtr platform_handles);
 
@@ -159,18 +160,17 @@
   // Protects the members below.
   base::Lock lock_;
 
-  // |message_pipe_| must be valid whenever it is non-null. Before
-  // |*message_pipe_| gives up its reference to this object, it must call
-  // |DetachFromMessagePipe()|.
+  // |client_| must be valid whenever it is non-null. Before |*client_| gives up
+  // its reference to this object, it must call |DetachFromClient()|.
   // NOTE: This is a |scoped_refptr<>|, rather than a raw pointer, since the
   // |Channel| needs to keep the |MessagePipe| alive for the "proxy-proxy" case.
   // Possibly we'll be able to eliminate that case when we have full
   // multiprocess support.
-  // WARNING: |MessagePipe| methods must not be called under |lock_|. Thus to
-  // make such a call, a reference must first be taken under |lock_| and the
-  // lock released.
-  scoped_refptr<MessagePipe> message_pipe_;
-  unsigned port_;
+  // WARNING: |ChannelEndpointClient| methods must not be called under |lock_|.
+  // Thus to make such a call, a reference must first be taken under |lock_| and
+  // the lock released.
+  scoped_refptr<ChannelEndpointClient> client_;
+  unsigned client_port_;
 
   // |channel_| must be valid whenever it is non-null. Before |*channel_| gives
   // up its reference to this object, it must call |DetachFromChannel()|.
diff --git a/mojo/edk/system/channel_endpoint_client.h b/mojo/edk/system/channel_endpoint_client.h
new file mode 100644
index 0000000..e14326f
--- /dev/null
+++ b/mojo/edk/system/channel_endpoint_client.h
@@ -0,0 +1,62 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_EDK_SYSTEM_CHANNEL_ENDPOINT_CLIENT_H_
+#define MOJO_EDK_SYSTEM_CHANNEL_ENDPOINT_CLIENT_H_
+
+#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "mojo/edk/system/system_impl_export.h"
+
+namespace mojo {
+namespace system {
+
+class MessageInTransit;
+
+// Interface for receivers of messages from |ChannelEndpoint| (hence from
+// |Channel|). |port| is simply the value passed to |ChannelEndpoint| on
+// construction, and provides a lightweight way for an object to be the client
+// of multiple |ChannelEndpoint|s. (|MessagePipe| implements this interface, in
+// which case |port| is the port number for the |ProxyMessagePipeEndpoint|
+// corresdponding to the |ChannelEndpoint|.)
+//
+// Implementations of this class should be thread-safe. |ChannelEndpointClient|
+// *precedes* |ChannelEndpoint| in the lock order, so |ChannelEndpoint| should
+// never call into this class with its lock held. (Instead, it should take a
+// reference under its lock, release its lock, and make any needed call(s).)
+//
+// Note: As a consequence of this, all the client methods may be called even
+// after |ChannelEndpoint::DetachFromClient()| has been called (so the
+// |ChannelEndpoint| has apparently relinquished its pointer to the
+// |ChannelEndpointClient|).
+class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpointClient
+    : public base::RefCountedThreadSafe<ChannelEndpointClient> {
+ public:
+  // Called by |ChannelEndpoint| in response to its |OnReadMessage()|, which is
+  // called by |Channel| when it receives a message for the |ChannelEndpoint|.
+  // (|port| is the value passed to |ChannelEndpoint|'s constructor as
+  // |client_port|.)
+  virtual bool OnReadMessage(unsigned port,
+                             scoped_ptr<MessageInTransit> message) = 0;
+
+  // Called by |ChannelEndpoint| when the |Channel| is relinquishing its pointer
+  // to the |ChannelEndpoint| (and vice versa). After this is called,
+  // |OnReadMessage()| will no longer be called.
+  virtual void OnDetachFromChannel(unsigned port) = 0;
+
+ protected:
+  ChannelEndpointClient() {}
+
+  virtual ~ChannelEndpointClient() {}
+  friend class base::RefCountedThreadSafe<ChannelEndpointClient>;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(ChannelEndpointClient);
+};
+
+}  // namespace system
+}  // namespace mojo
+
+#endif  // MOJO_EDK_SYSTEM_CHANNEL_ENDPOINT_CLIENT_H_
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
index 731fd90..af007a2 100644
--- a/mojo/edk/system/message_pipe.cc
+++ b/mojo/edk/system/message_pipe.cc
@@ -139,7 +139,7 @@
     std::vector<DispatcherTransport>* transports,
     MojoWriteMessageFlags flags) {
   DCHECK(port == 0 || port == 1);
-  return EnqueueMessageInternal(
+  return EnqueueMessage(
       GetPeerPort(port),
       make_scoped_ptr(new MessageInTransit(
           MessageInTransit::kTypeMessagePipeEndpoint,
@@ -257,9 +257,18 @@
   return channel_endpoint;
 }
 
-MojoResult MessagePipe::EnqueueMessage(unsigned port,
-                                       scoped_ptr<MessageInTransit> message) {
-  return EnqueueMessageInternal(port, message.Pass(), nullptr);
+bool MessagePipe::OnReadMessage(unsigned port,
+                                scoped_ptr<MessageInTransit> message) {
+  // This is called when the |ChannelEndpoint| for the
+  // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
+  // We need to pass this message on to its peer port (typically a
+  // |LocalMessagePipeEndpoint|).
+  return EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr) ==
+         MOJO_RESULT_OK;
+}
+
+void MessagePipe::OnDetachFromChannel(unsigned port) {
+  Close(port);
 }
 
 MessagePipe::MessagePipe() {
@@ -273,7 +282,7 @@
   DCHECK(!endpoints_[1]);
 }
 
-MojoResult MessagePipe::EnqueueMessageInternal(
+MojoResult MessagePipe::EnqueueMessage(
     unsigned port,
     scoped_ptr<MessageInTransit> message,
     std::vector<DispatcherTransport>* transports) {
diff --git a/mojo/edk/system/message_pipe.h b/mojo/edk/system/message_pipe.h
index b39e517..90af7cb 100644
--- a/mojo/edk/system/message_pipe.h
+++ b/mojo/edk/system/message_pipe.h
@@ -15,6 +15,7 @@
 #include "base/memory/scoped_ptr.h"
 #include "base/synchronization/lock.h"
 #include "mojo/edk/embedder/platform_handle_vector.h"
+#include "mojo/edk/system/channel_endpoint_client.h"
 #include "mojo/edk/system/dispatcher.h"
 #include "mojo/edk/system/handle_signals_state.h"
 #include "mojo/edk/system/memory.h"
@@ -34,8 +35,7 @@
 // |MessagePipe| is the secondary object implementing a message pipe (see the
 // explanatory comment in core.cc). It is typically owned by the dispatcher(s)
 // corresponding to the local endpoints. This class is thread-safe.
-class MOJO_SYSTEM_IMPL_EXPORT MessagePipe
-    : public base::RefCountedThreadSafe<MessagePipe> {
+class MOJO_SYSTEM_IMPL_EXPORT MessagePipe : public ChannelEndpointClient {
  public:
   // Creates a |MessagePipe| with two new |LocalMessagePipeEndpoint|s.
   static MessagePipe* CreateLocalLocal();
@@ -110,34 +110,30 @@
   // |EndSerialize()|).
   scoped_refptr<ChannelEndpoint> ConvertLocalToProxy(unsigned port);
 
-  // This is used by |Channel| to enqueue messages (typically to a
-  // |LocalMessagePipeEndpoint|). Unlike |WriteMessage()|, |port| is the
-  // *destination* port.
-  MojoResult EnqueueMessage(unsigned port,
-                            scoped_ptr<MessageInTransit> message);
+  // |ChannelEndpointClient| methods:
+  bool OnReadMessage(unsigned port,
+                     scoped_ptr<MessageInTransit> message) override;
+  void OnDetachFromChannel(unsigned port) override;
 
  private:
   MessagePipe();
+  virtual ~MessagePipe();
 
-  friend class base::RefCountedThreadSafe<MessagePipe>;
-  ~MessagePipe();
-
-  // This is used internally by |WriteMessage()| and by |EnqueueMessage()|.
+  // This is used internally by |WriteMessage()| and by |OnReadMessage()|.
   // |transports| may be non-null only if it's nonempty and |message| has no
   // dispatchers attached.
-  MojoResult EnqueueMessageInternal(
-      unsigned port,
-      scoped_ptr<MessageInTransit> message,
-      std::vector<DispatcherTransport>* transports);
+  MojoResult EnqueueMessage(unsigned port,
+                            scoped_ptr<MessageInTransit> message,
+                            std::vector<DispatcherTransport>* transports);
 
-  // Helper for |EnqueueMessageInternal()|. Must be called with |lock_| held.
+  // Helper for |EnqueueMessage()|. Must be called with |lock_| held.
   MojoResult AttachTransportsNoLock(
       unsigned port,
       MessageInTransit* message,
       std::vector<DispatcherTransport>* transports);
 
-  // Used by |EnqueueMessageInternal()| to handle control messages that are
-  // actually meant for us.
+  // Used by |EnqueueMessage()| to handle control messages that are actually
+  // meant for us.
   MojoResult HandleControlMessage(unsigned port,
                                   scoped_ptr<MessageInTransit> message);
 
diff --git a/mojo/edk/system/proxy_message_pipe_endpoint.cc b/mojo/edk/system/proxy_message_pipe_endpoint.cc
index 85437bb..4a954d9 100644
--- a/mojo/edk/system/proxy_message_pipe_endpoint.cc
+++ b/mojo/edk/system/proxy_message_pipe_endpoint.cc
@@ -48,7 +48,7 @@
 
 void ProxyMessagePipeEndpoint::DetachIfNecessary() {
   if (channel_endpoint_.get()) {
-    channel_endpoint_->DetachFromMessagePipe();
+    channel_endpoint_->DetachFromClient();
     channel_endpoint_ = nullptr;
   }
 }