| // Copyright 2015 The Chromium Authors. All rights reserved. | 
 | // Use of this source code is governed by a BSD-style license that can be | 
 | // found in the LICENSE file. | 
 |  | 
 | #include "mojo/edk/system/master_connection_manager.h" | 
 |  | 
 | #include "base/bind.h" | 
 | #include "base/bind_helpers.h" | 
 | #include "base/location.h" | 
 | #include "base/logging.h" | 
 | #include "base/message_loop/message_loop.h" | 
 | #include "base/synchronization/waitable_event.h" | 
 | #include "mojo/edk/embedder/master_process_delegate.h" | 
 | #include "mojo/edk/embedder/platform_channel_pair.h" | 
 | #include "mojo/edk/embedder/platform_handle_vector.h" | 
 | #include "mojo/edk/system/message_in_transit.h" | 
 | #include "mojo/edk/system/raw_channel.h" | 
 | #include "mojo/edk/system/transport_data.h" | 
 |  | 
 | namespace mojo { | 
 | namespace system { | 
 |  | 
 | const ProcessIdentifier kFirstSlaveProcessIdentifier = 2; | 
 |  | 
 | static_assert(kMasterProcessIdentifier != kInvalidProcessIdentifier, | 
 |               "Bad master process identifier"); | 
 | static_assert(kFirstSlaveProcessIdentifier != kInvalidProcessIdentifier, | 
 |               "Bad first slave process identifier"); | 
 | static_assert(kMasterProcessIdentifier != kFirstSlaveProcessIdentifier, | 
 |               "Master and first slave process identifiers are the same"); | 
 |  | 
 | // MasterConnectionManager::Helper --------------------------------------------- | 
 |  | 
 | // |MasterConnectionManager::Helper| is not thread-safe, and must only be used | 
 | // on its |owner_|'s private thread. | 
 | class MasterConnectionManager::Helper : public RawChannel::Delegate { | 
 |  public: | 
 |   Helper(MasterConnectionManager* owner, | 
 |          ProcessIdentifier process_identifier, | 
 |          scoped_ptr<embedder::SlaveInfo> slave_info, | 
 |          embedder::ScopedPlatformHandle platform_handle); | 
 |   ~Helper() override; | 
 |  | 
 |   void Init(); | 
 |   scoped_ptr<embedder::SlaveInfo> Shutdown(); | 
 |  | 
 |  private: | 
 |   // |RawChannel::Delegate| methods: | 
 |   void OnReadMessage( | 
 |       const MessageInTransit::View& message_view, | 
 |       embedder::ScopedPlatformHandleVectorPtr platform_handles) override; | 
 |   void OnError(Error error) override; | 
 |  | 
 |   // Handles an error that's fatal to this object. Note that this probably | 
 |   // results in |Shutdown()| being called (in the nested context) and then this | 
 |   // object being destroyed. | 
 |   void FatalError(); | 
 |  | 
 |   MasterConnectionManager* const owner_; | 
 |   const ProcessIdentifier process_identifier_; | 
 |   scoped_ptr<embedder::SlaveInfo> slave_info_; | 
 |   scoped_ptr<RawChannel> raw_channel_; | 
 |  | 
 |   DISALLOW_COPY_AND_ASSIGN(Helper); | 
 | }; | 
 |  | 
 | MasterConnectionManager::Helper::Helper( | 
 |     MasterConnectionManager* owner, | 
 |     ProcessIdentifier process_identifier, | 
 |     scoped_ptr<embedder::SlaveInfo> slave_info, | 
 |     embedder::ScopedPlatformHandle platform_handle) | 
 |     : owner_(owner), | 
 |       process_identifier_(process_identifier), | 
 |       slave_info_(slave_info.Pass()), | 
 |       raw_channel_(RawChannel::Create(platform_handle.Pass())) { | 
 | } | 
 |  | 
 | MasterConnectionManager::Helper::~Helper() { | 
 |   DCHECK(!slave_info_); | 
 | } | 
 |  | 
 | void MasterConnectionManager::Helper::Init() { | 
 |   raw_channel_->Init(this); | 
 | } | 
 |  | 
 | scoped_ptr<embedder::SlaveInfo> MasterConnectionManager::Helper::Shutdown() { | 
 |   raw_channel_->Shutdown(); | 
 |   raw_channel_.reset(); | 
 |   return slave_info_.Pass(); | 
 | } | 
 |  | 
 | void MasterConnectionManager::Helper::OnReadMessage( | 
 |     const MessageInTransit::View& message_view, | 
 |     embedder::ScopedPlatformHandleVectorPtr platform_handles) { | 
 |   if (message_view.type() != MessageInTransit::kTypeConnectionManager) { | 
 |     LOG(ERROR) << "Invalid message type " << message_view.type(); | 
 |     FatalError();  // WARNING: This destroys us. | 
 |     return; | 
 |   } | 
 |  | 
 |   // Currently, all the messages simply have a |ConnectionIdentifier| as data. | 
 |   if (message_view.num_bytes() != sizeof(ConnectionIdentifier)) { | 
 |     LOG(ERROR) << "Invalid message size " << message_view.num_bytes(); | 
 |     FatalError();  // WARNING: This destroys us. | 
 |     return; | 
 |   } | 
 |  | 
 |   // And none of them should have any platform handles attached. | 
 |   if (message_view.transport_data_buffer()) { | 
 |     LOG(ERROR) << "Invalid message with transport data"; | 
 |     FatalError();  // WARNING: This destroys us. | 
 |     return; | 
 |   } | 
 |  | 
 |   const ConnectionIdentifier* connection_id = | 
 |       reinterpret_cast<const ConnectionIdentifier*>(message_view.bytes()); | 
 |   bool result; | 
 |   ProcessIdentifier peer_process_identifier = kInvalidProcessIdentifier; | 
 |   embedder::ScopedPlatformHandle platform_handle; | 
 |   uint32_t num_bytes = 0; | 
 |   const void* bytes = nullptr; | 
 |   switch (message_view.subtype()) { | 
 |     case MessageInTransit::kSubtypeConnectionManagerAllowConnect: | 
 |       result = owner_->AllowConnectImpl(process_identifier_, *connection_id); | 
 |       break; | 
 |     case MessageInTransit::kSubtypeConnectionManagerCancelConnect: | 
 |       result = owner_->CancelConnectImpl(process_identifier_, *connection_id); | 
 |       break; | 
 |     case MessageInTransit::kSubtypeConnectionManagerConnect: | 
 |       result = owner_->ConnectImpl(process_identifier_, *connection_id, | 
 |                                    &peer_process_identifier, &platform_handle); | 
 |       // Success acks for "connect" have the peer process identifier as data | 
 |       // (and maybe also a platform handle). | 
 |       if (result) { | 
 |         num_bytes = static_cast<uint32_t>(sizeof(peer_process_identifier)); | 
 |         bytes = &peer_process_identifier; | 
 |       } | 
 |       break; | 
 |     default: | 
 |       LOG(ERROR) << "Invalid message subtype " << message_view.subtype(); | 
 |       FatalError();  // WARNING: This destroys us. | 
 |       return; | 
 |   } | 
 |  | 
 |   scoped_ptr<MessageInTransit> response(new MessageInTransit( | 
 |       MessageInTransit::kTypeConnectionManagerAck, | 
 |       result ? MessageInTransit::kSubtypeConnectionManagerAckSuccess | 
 |              : MessageInTransit::kSubtypeConnectionManagerAckFailure, | 
 |       num_bytes, bytes)); | 
 |  | 
 |   if (platform_handle.is_valid()) { | 
 |     // Only success acks for "connect" *may* have a platform handle attached. | 
 |     DCHECK(result); | 
 |     DCHECK_EQ(message_view.subtype(), | 
 |               MessageInTransit::kSubtypeConnectionManagerConnect); | 
 |  | 
 |     embedder::ScopedPlatformHandleVectorPtr platform_handles( | 
 |         new embedder::PlatformHandleVector()); | 
 |     platform_handles->push_back(platform_handle.release()); | 
 |     response->SetTransportData( | 
 |         make_scoped_ptr(new TransportData(platform_handles.Pass()))); | 
 |   } | 
 |  | 
 |   if (!raw_channel_->WriteMessage(response.Pass())) { | 
 |     LOG(ERROR) << "WriteMessage failed"; | 
 |     FatalError();  // WARNING: This destroys us. | 
 |     return; | 
 |   } | 
 | } | 
 |  | 
 | void MasterConnectionManager::Helper::OnError(Error /*error*/) { | 
 |   // Every error (read or write) is fatal (for that particular connection). Read | 
 |   // errors are fatal since no more commands will be received from that | 
 |   // connection. Write errors are fatal since it is no longer possible to send | 
 |   // responses. | 
 |   FatalError();  // WARNING: This destroys us. | 
 | } | 
 |  | 
 | void MasterConnectionManager::Helper::FatalError() { | 
 |   owner_->OnError(process_identifier_);  // WARNING: This destroys us. | 
 | } | 
 |  | 
 | // MasterConnectionManager::PendingConnectionInfo ------------------------------ | 
 |  | 
 | struct MasterConnectionManager::PendingConnectionInfo { | 
 |   // States: | 
 |   //   - This is created upon a first "allow connect" (with |first| set | 
 |   //     immediately). We then wait for a second "allow connect". | 
 |   //   - After the second "allow connect" (and |second| is set), we wait for | 
 |   //     "connects" from both |first| and |second|. | 
 |   //   - We may then receive "connect" from either |first| or |second|, at which | 
 |   //     which point it remains to wait for "connect" from the other. | 
 |   // I.e., the valid state transitions are: | 
 |   //   AWAITING_SECOND_ALLOW_CONNECT -> AWAITING_CONNECTS_FROM_BOTH | 
 |   //       -> {AWAITING_CONNECT_FROM_FIRST,AWAITING_CONNECT_FROM_SECOND} | 
 |   enum State { | 
 |     AWAITING_SECOND_ALLOW_CONNECT, | 
 |     AWAITING_CONNECTS_FROM_BOTH, | 
 |     AWAITING_CONNECT_FROM_FIRST, | 
 |     AWAITING_CONNECT_FROM_SECOND | 
 |   }; | 
 |  | 
 |   explicit PendingConnectionInfo(ProcessIdentifier first) | 
 |       : state(AWAITING_SECOND_ALLOW_CONNECT), | 
 |         first(first), | 
 |         second(kInvalidProcessIdentifier) { | 
 |     DCHECK_NE(first, kInvalidProcessIdentifier); | 
 |   } | 
 |   ~PendingConnectionInfo() {} | 
 |  | 
 |   State state; | 
 |  | 
 |   ProcessIdentifier first; | 
 |   ProcessIdentifier second; | 
 |  | 
 |   // Valid in AWAITING_CONNECT_FROM_{FIRST, SECOND} states. | 
 |   embedder::ScopedPlatformHandle remaining_handle; | 
 | }; | 
 |  | 
 | // MasterConnectionManager ----------------------------------------------------- | 
 |  | 
 | MasterConnectionManager::MasterConnectionManager() | 
 |     : master_process_delegate_(), | 
 |       private_thread_("MasterConnectionManagerPrivateThread"), | 
 |       next_process_identifier_(kFirstSlaveProcessIdentifier) { | 
 | } | 
 |  | 
 | MasterConnectionManager::~MasterConnectionManager() { | 
 |   DCHECK(!delegate_thread_task_runner_); | 
 |   DCHECK(!master_process_delegate_); | 
 |   DCHECK(!private_thread_.message_loop()); | 
 |   DCHECK(helpers_.empty()); | 
 |   DCHECK(pending_connections_.empty()); | 
 | } | 
 |  | 
 | void MasterConnectionManager::Init( | 
 |     scoped_refptr<base::TaskRunner> delegate_thread_task_runner, | 
 |     embedder::MasterProcessDelegate* master_process_delegate) { | 
 |   DCHECK(delegate_thread_task_runner); | 
 |   DCHECK(master_process_delegate); | 
 |   DCHECK(!delegate_thread_task_runner_); | 
 |   DCHECK(!master_process_delegate_); | 
 |   DCHECK(!private_thread_.message_loop()); | 
 |  | 
 |   delegate_thread_task_runner_ = delegate_thread_task_runner; | 
 |   master_process_delegate_ = master_process_delegate; | 
 |   CHECK(private_thread_.StartWithOptions( | 
 |       base::Thread::Options(base::MessageLoop::TYPE_IO, 0))); | 
 | } | 
 |  | 
 | void MasterConnectionManager::Shutdown() { | 
 |   AssertNotOnPrivateThread(); | 
 |   DCHECK(master_process_delegate_); | 
 |   DCHECK(private_thread_.message_loop()); | 
 |  | 
 |   // The |Stop()| will actually finish all posted tasks. | 
 |   private_thread_.message_loop()->PostTask( | 
 |       FROM_HERE, base::Bind(&MasterConnectionManager::ShutdownOnPrivateThread, | 
 |                             base::Unretained(this))); | 
 |   private_thread_.Stop(); | 
 |   DCHECK(helpers_.empty()); | 
 |   DCHECK(pending_connections_.empty()); | 
 |   master_process_delegate_ = nullptr; | 
 |   delegate_thread_task_runner_ = nullptr; | 
 | } | 
 |  | 
 | void MasterConnectionManager::AddSlave( | 
 |     scoped_ptr<embedder::SlaveInfo> slave_info, | 
 |     embedder::ScopedPlatformHandle platform_handle) { | 
 |   // We don't really care if |slave_info| is non-null or not. | 
 |   DCHECK(platform_handle.is_valid()); | 
 |   AssertNotOnPrivateThread(); | 
 |  | 
 |   // We have to wait for the task to be executed, in case someone calls | 
 |   // |AddSlave()| followed immediately by |Shutdown()|. | 
 |   base::WaitableEvent event(false, false); | 
 |   private_thread_.message_loop()->PostTask( | 
 |       FROM_HERE, | 
 |       base::Bind(&MasterConnectionManager::AddSlaveOnPrivateThread, | 
 |                  base::Unretained(this), base::Passed(&slave_info), | 
 |                  base::Passed(&platform_handle), base::Unretained(&event))); | 
 |   event.Wait(); | 
 | } | 
 |  | 
 | bool MasterConnectionManager::AllowConnect( | 
 |     const ConnectionIdentifier& connection_id) { | 
 |   AssertNotOnPrivateThread(); | 
 |   return AllowConnectImpl(kMasterProcessIdentifier, connection_id); | 
 | } | 
 |  | 
 | bool MasterConnectionManager::CancelConnect( | 
 |     const ConnectionIdentifier& connection_id) { | 
 |   AssertNotOnPrivateThread(); | 
 |   return CancelConnectImpl(kMasterProcessIdentifier, connection_id); | 
 | } | 
 |  | 
 | bool MasterConnectionManager::Connect( | 
 |     const ConnectionIdentifier& connection_id, | 
 |     ProcessIdentifier* peer_process_identifier, | 
 |     embedder::ScopedPlatformHandle* platform_handle) { | 
 |   AssertNotOnPrivateThread(); | 
 |   return ConnectImpl(kMasterProcessIdentifier, connection_id, | 
 |                      peer_process_identifier, platform_handle); | 
 | } | 
 |  | 
 | bool MasterConnectionManager::AllowConnectImpl( | 
 |     ProcessIdentifier process_identifier, | 
 |     const ConnectionIdentifier& connection_id) { | 
 |   DCHECK_NE(process_identifier, kInvalidProcessIdentifier); | 
 |  | 
 |   base::AutoLock locker(lock_); | 
 |  | 
 |   auto it = pending_connections_.find(connection_id); | 
 |   if (it == pending_connections_.end()) { | 
 |     pending_connections_[connection_id] = | 
 |         new PendingConnectionInfo(process_identifier); | 
 |     // TODO(vtl): Track process identifier -> pending connections also (so these | 
 |     // can be removed efficiently if that process disconnects). | 
 |     DVLOG(1) << "New pending connection ID " << connection_id | 
 |              << ": AllowConnect() from first process identifier " | 
 |              << process_identifier; | 
 |     return true; | 
 |   } | 
 |  | 
 |   PendingConnectionInfo* info = it->second; | 
 |   if (info->state == PendingConnectionInfo::AWAITING_SECOND_ALLOW_CONNECT) { | 
 |     info->state = PendingConnectionInfo::AWAITING_CONNECTS_FROM_BOTH; | 
 |     info->second = process_identifier; | 
 |     DVLOG(1) << "Pending connection ID " << connection_id | 
 |              << ": AllowConnect() from second process identifier " | 
 |              << process_identifier; | 
 |     return true; | 
 |   } | 
 |  | 
 |   // Someone's behaving badly, but we don't know who (it might not be the | 
 |   // caller). | 
 |   LOG(ERROR) << "AllowConnect() from process " << process_identifier | 
 |              << " for connection ID " << connection_id << " already in state " | 
 |              << info->state; | 
 |   pending_connections_.erase(it); | 
 |   delete info; | 
 |   return false; | 
 | } | 
 |  | 
 | bool MasterConnectionManager::CancelConnectImpl( | 
 |     ProcessIdentifier process_identifier, | 
 |     const ConnectionIdentifier& connection_id) { | 
 |   DCHECK_NE(process_identifier, kInvalidProcessIdentifier); | 
 |  | 
 |   base::AutoLock locker(lock_); | 
 |  | 
 |   auto it = pending_connections_.find(connection_id); | 
 |   if (it == pending_connections_.end()) { | 
 |     // Not necessarily the caller's fault, and not necessarily an error. | 
 |     DVLOG(1) << "CancelConnect() from process " << process_identifier | 
 |              << " for connection ID " << connection_id | 
 |              << " which is not (or no longer) pending"; | 
 |     return true; | 
 |   } | 
 |  | 
 |   PendingConnectionInfo* info = it->second; | 
 |   if (process_identifier != info->first && process_identifier != info->second) { | 
 |     LOG(ERROR) << "CancelConnect() from process " << process_identifier | 
 |                << " for connection ID " << connection_id | 
 |                << " which is neither connectee"; | 
 |     return false; | 
 |   } | 
 |  | 
 |   // Just erase it. The other side may also try to cancel, in which case it'll | 
 |   // "fail" in the first if statement above (we assume that connection IDs never | 
 |   // collide, so there's no need to carefully track both sides). | 
 |   pending_connections_.erase(it); | 
 |   delete info; | 
 |   return true; | 
 | } | 
 |  | 
 | bool MasterConnectionManager::ConnectImpl( | 
 |     ProcessIdentifier process_identifier, | 
 |     const ConnectionIdentifier& connection_id, | 
 |     ProcessIdentifier* peer_process_identifier, | 
 |     embedder::ScopedPlatformHandle* platform_handle) { | 
 |   DCHECK_NE(process_identifier, kInvalidProcessIdentifier); | 
 |   DCHECK(peer_process_identifier); | 
 |   DCHECK(platform_handle); | 
 |   DCHECK(!platform_handle->is_valid());  // Not technically wrong, but unlikely. | 
 |  | 
 |   base::AutoLock locker(lock_); | 
 |  | 
 |   auto it = pending_connections_.find(connection_id); | 
 |   if (it == pending_connections_.end()) { | 
 |     // Not necessarily the caller's fault. | 
 |     LOG(ERROR) << "Connect() from process " << process_identifier | 
 |                << " for connection ID " << connection_id | 
 |                << " which is not pending"; | 
 |     return false; | 
 |   } | 
 |  | 
 |   PendingConnectionInfo* info = it->second; | 
 |   if (info->state == PendingConnectionInfo::AWAITING_CONNECTS_FROM_BOTH) { | 
 |     DCHECK(!info->remaining_handle.is_valid()); | 
 |  | 
 |     if (process_identifier == info->first) { | 
 |       info->state = PendingConnectionInfo::AWAITING_CONNECT_FROM_SECOND; | 
 |       *peer_process_identifier = info->second; | 
 |     } else if (process_identifier == info->second) { | 
 |       info->state = PendingConnectionInfo::AWAITING_CONNECT_FROM_FIRST; | 
 |       *peer_process_identifier = info->first; | 
 |     } else { | 
 |       LOG(ERROR) << "Connect() from process " << process_identifier | 
 |                  << " for connection ID " << connection_id | 
 |                  << " which is neither connectee"; | 
 |       return false; | 
 |     } | 
 |  | 
 |     if (info->first == info->second) { | 
 |       platform_handle->reset(); | 
 |       DCHECK(!info->remaining_handle.is_valid()); | 
 |     } else { | 
 |       embedder::PlatformChannelPair platform_channel_pair; | 
 |       *platform_handle = platform_channel_pair.PassServerHandle(); | 
 |       DCHECK(platform_handle->is_valid()); | 
 |       info->remaining_handle = platform_channel_pair.PassClientHandle(); | 
 |       DCHECK(info->remaining_handle.is_valid()); | 
 |     } | 
 |     DVLOG(1) << "Connection ID " << connection_id | 
 |              << ": first Connect() from process identifier " | 
 |              << process_identifier; | 
 |     return true; | 
 |   } | 
 |  | 
 |   ProcessIdentifier remaining_connectee; | 
 |   ProcessIdentifier peer; | 
 |   if (info->state == PendingConnectionInfo::AWAITING_CONNECT_FROM_FIRST) { | 
 |     remaining_connectee = info->first; | 
 |     peer = info->second; | 
 |   } else if (info->state == | 
 |              PendingConnectionInfo::AWAITING_CONNECT_FROM_SECOND) { | 
 |     remaining_connectee = info->second; | 
 |     peer = info->first; | 
 |   } else { | 
 |     // Someone's behaving badly, but we don't know who (it might not be the | 
 |     // caller). | 
 |     LOG(ERROR) << "Connect() from process " << process_identifier | 
 |                << " for connection ID " << connection_id << " in state " | 
 |                << info->state; | 
 |     pending_connections_.erase(it); | 
 |     delete info; | 
 |     return false; | 
 |   } | 
 |  | 
 |   if (process_identifier != remaining_connectee) { | 
 |     LOG(ERROR) << "Connect() from process " << process_identifier | 
 |                << " for connection ID " << connection_id | 
 |                << " which is not the remaining connectee"; | 
 |     pending_connections_.erase(it); | 
 |     delete info; | 
 |     return false; | 
 |   } | 
 |  | 
 |   *peer_process_identifier = peer; | 
 |   *platform_handle = info->remaining_handle.Pass(); | 
 |   DCHECK((info->first == info->second) ^ platform_handle->is_valid()); | 
 |   pending_connections_.erase(it); | 
 |   delete info; | 
 |   DVLOG(1) << "Connection ID " << connection_id | 
 |            << ": second Connect() from process identifier " | 
 |            << process_identifier; | 
 |   return true; | 
 | } | 
 |  | 
 | void MasterConnectionManager::ShutdownOnPrivateThread() { | 
 |   AssertOnPrivateThread(); | 
 |  | 
 |   if (!pending_connections_.empty()) { | 
 |     DVLOG(1) << "Shutting down with connections pending"; | 
 |     for (auto& p : pending_connections_) | 
 |       delete p.second; | 
 |     pending_connections_.clear(); | 
 |   } | 
 |  | 
 |   if (!helpers_.empty()) { | 
 |     DVLOG(1) << "Shutting down with slaves still connected"; | 
 |     for (auto& p : helpers_) { | 
 |       scoped_ptr<embedder::SlaveInfo> slave_info = p.second->Shutdown(); | 
 |       delete p.second; | 
 |       CallOnSlaveDisconnect(slave_info.Pass()); | 
 |     } | 
 |     helpers_.clear(); | 
 |   } | 
 | } | 
 |  | 
 | void MasterConnectionManager::AddSlaveOnPrivateThread( | 
 |     scoped_ptr<embedder::SlaveInfo> slave_info, | 
 |     embedder::ScopedPlatformHandle platform_handle, | 
 |     base::WaitableEvent* event) { | 
 |   DCHECK(platform_handle.is_valid()); | 
 |   DCHECK(event); | 
 |   AssertOnPrivateThread(); | 
 |  | 
 |   CHECK_NE(next_process_identifier_, kMasterProcessIdentifier); | 
 |   ProcessIdentifier process_identifier = next_process_identifier_; | 
 |   next_process_identifier_++; | 
 |  | 
 |   scoped_ptr<Helper> helper(new Helper( | 
 |       this, process_identifier, slave_info.Pass(), platform_handle.Pass())); | 
 |   helper->Init(); | 
 |  | 
 |   DCHECK(helpers_.find(process_identifier) == helpers_.end()); | 
 |   helpers_[process_identifier] = helper.release(); | 
 |  | 
 |   DVLOG(1) << "Added process identifier " << process_identifier; | 
 |   event->Signal(); | 
 | } | 
 |  | 
 | void MasterConnectionManager::OnError(ProcessIdentifier process_identifier) { | 
 |   DCHECK_NE(process_identifier, kInvalidProcessIdentifier); | 
 |   AssertOnPrivateThread(); | 
 |  | 
 |   auto it = helpers_.find(process_identifier); | 
 |   DCHECK(it != helpers_.end()); | 
 |   Helper* helper = it->second; | 
 |   scoped_ptr<embedder::SlaveInfo> slave_info = helper->Shutdown(); | 
 |   helpers_.erase(it); | 
 |   delete helper; | 
 |  | 
 |   { | 
 |     base::AutoLock locker(lock_); | 
 |  | 
 |     // TODO(vtl): This isn't very efficient. | 
 |     for (auto it = pending_connections_.begin(); | 
 |          it != pending_connections_.end();) { | 
 |       if (it->second->first == process_identifier || | 
 |           it->second->second == process_identifier) { | 
 |         auto it_to_erase = it; | 
 |         ++it; | 
 |         delete it_to_erase->second; | 
 |         pending_connections_.erase(it_to_erase); | 
 |       } else { | 
 |         ++it; | 
 |       } | 
 |     } | 
 |   } | 
 |  | 
 |   CallOnSlaveDisconnect(slave_info.Pass()); | 
 | } | 
 |  | 
 | void MasterConnectionManager::CallOnSlaveDisconnect( | 
 |     scoped_ptr<embedder::SlaveInfo> slave_info) { | 
 |   AssertOnPrivateThread(); | 
 |   DCHECK(master_process_delegate_); | 
 |   delegate_thread_task_runner_->PostTask( | 
 |       FROM_HERE, base::Bind(&embedder::MasterProcessDelegate::OnSlaveDisconnect, | 
 |                             base::Unretained(master_process_delegate_), | 
 |                             base::Passed(&slave_info))); | 
 | } | 
 |  | 
 | void MasterConnectionManager::AssertNotOnPrivateThread() const { | 
 |   // This should only be called after |Init()| and before |Shutdown()|. (If not, | 
 |   // the subsequent |DCHECK_NE()| is invalid, since the current thread may not | 
 |   // have a message loop.) | 
 |   DCHECK(private_thread_.message_loop()); | 
 |   DCHECK_NE(base::MessageLoop::current(), private_thread_.message_loop()); | 
 | } | 
 |  | 
 | void MasterConnectionManager::AssertOnPrivateThread() const { | 
 |   // This should only be called after |Init()| and before |Shutdown()|. | 
 |   DCHECK(private_thread_.message_loop()); | 
 |   DCHECK_EQ(base::MessageLoop::current(), private_thread_.message_loop()); | 
 | } | 
 |  | 
 | }  // namespace system | 
 | }  // namespace mojo |