// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/common/handle_watcher.h"

#include <map>
#include <vector>

#include "base/atomic_sequence_num.h"
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/singleton.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/thread.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "mojo/common/message_pump_mojo.h"
#include "mojo/common/message_pump_mojo_handler.h"
#include "mojo/common/time_helper.h"

namespace mojo {
namespace common {

// Identifies a single watch. Values are produced by
// WatcherThreadManager::StartWatching() and handed back to StopWatching().
typedef int WatcherID;

namespace {

// Name given to the background thread that WatcherThreadManager creates.
const char kWatcherThreadName[] = "handle-watcher-thread";

// Converts a relative |deadline| (interpreted as microseconds from now) into
// an absolute base::TimeTicks. MOJO_DEADLINE_INDEFINITE is mapped to a
// default-constructed base::TimeTicks.
base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
  if (deadline == MOJO_DEADLINE_INDEFINITE)
    return base::TimeTicks();

  return internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
}

// Tracks the data for a single call to Start().
struct WatchData {
  // Starts out with id 0, no signals and a null message loop; the remaining
  // members are default-constructed.
  WatchData()
      : id(0),
        handle_signals(MOJO_HANDLE_SIGNAL_NONE),
        message_loop(NULL) {}

  // Id assigned by WatcherThreadManager::StartWatching().
  WatcherID id;
  // The handle being watched.
  Handle handle;
  // Signals on |handle| that the watch is waiting for.
  MojoHandleSignals handle_signals;
  // Absolute deadline passed on to MessagePumpMojo::AddHandler().
  base::TimeTicks deadline;
  // Run with the result of the watch; posted to |message_loop|.
  base::Callback<void(MojoResult)> callback;
  // Message loop of the thread that requested the watch.
  scoped_refptr<base::MessageLoopProxy> message_loop;
};

// WatcherBackend --------------------------------------------------------------

// WatcherBackend is responsible for managing the requests and interacting with
// MessagePumpMojo. All access (outside of creation/destruction) is done on the
// thread WatcherThreadManager creates.
class WatcherBackend : public MessagePumpMojoHandler {
 public:
  WatcherBackend();
  ~WatcherBackend() override;

  void StartWatching(const WatchData& data);

  // Cancels a previously scheduled request to start a watch.
  void StopWatching(WatcherID watcher_id);

 private:
  typedef std::map<Handle, WatchData> HandleToWatchDataMap;

  // Invoked when a handle needs to be removed and notified.
  void RemoveAndNotify(const Handle& handle, MojoResult result);

  // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
  // and sets |handle| to the Handle. Returns false if not a known id.
  bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;

  // MessagePumpMojoHandler overrides:
  void OnHandleReady(const Handle& handle) override;
  void OnHandleError(const Handle& handle, MojoResult result) override;

  // Maps from assigned id to WatchData.
  HandleToWatchDataMap handle_to_data_;

  DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
};

// Nothing to set up; |handle_to_data_| starts out empty.
WatcherBackend::WatcherBackend() {
}

// Performs no explicit teardown; handlers still registered with
// MessagePumpMojo are not removed here.
WatcherBackend::~WatcherBackend() {
}

void WatcherBackend::StartWatching(const WatchData& data) {
  RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);

  DCHECK_EQ(0u, handle_to_data_.count(data.handle));

  handle_to_data_[data.handle] = data;
  MessagePumpMojo::current()->AddHandler(this, data.handle,
                                         data.handle_signals,
                                         data.deadline);
}

// Silently drops the watch identified by |watcher_id|; its callback is not
// run.
void WatcherBackend::StopWatching(WatcherID watcher_id) {
  // The request arrives via a thread hop, so by the time it is processed there
  // may no longer be a handle registered for |watcher_id|. That is not an
  // error.
  Handle watched;
  if (!GetMojoHandleByWatcherID(watcher_id, &watched))
    return;

  handle_to_data_.erase(watched);
  MessagePumpMojo::current()->RemoveHandler(watched);
}

void WatcherBackend::RemoveAndNotify(const Handle& handle,
                                     MojoResult result) {
  if (handle_to_data_.count(handle) == 0)
    return;

  const WatchData data(handle_to_data_[handle]);
  handle_to_data_.erase(handle);
  MessagePumpMojo::current()->RemoveHandler(handle);

  data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
}

// Linear search of |handle_to_data_| for the watch whose id equals
// |watcher_id|. On a match, writes the watched Handle to |handle| and returns
// true; otherwise leaves |handle| untouched and returns false.
bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
                                              Handle* handle) const {
  for (const HandleToWatchDataMap::value_type& entry : handle_to_data_) {
    const WatchData& watch = entry.second;
    if (watch.id != watcher_id)
      continue;
    *handle = watch.handle;
    return true;
  }
  return false;
}

// MessagePumpMojoHandler override. Drops the watch for |handle| and notifies
// its callback with MOJO_RESULT_OK.
void WatcherBackend::OnHandleReady(const Handle& handle) {
  RemoveAndNotify(handle, MOJO_RESULT_OK);
}

// MessagePumpMojoHandler override. Drops the watch for |handle| and notifies
// its callback with the error |result| unchanged.
void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
  RemoveAndNotify(handle, result);
}

// WatcherThreadManager --------------------------------------------------------

// WatcherThreadManager manages the background thread that listens for handles
// to be ready. All requests are handled by WatcherBackend.
}  // namespace

// Front end for the background watcher thread. Callers on any thread enqueue
// start/stop requests; the requests are drained and applied to |backend_| on
// |thread_|.
class WatcherThreadManager {
 public:
  // Stops the background thread.
  ~WatcherThreadManager();

  // Returns the shared instance.
  static WatcherThreadManager* GetInstance();

  // Starts watching the requested handle. Returns a unique ID that is used to
  // stop watching the handle. When the handle is ready |callback| is notified
  // on the thread StartWatching() was invoked on.
  // This may be invoked on any thread.
  WatcherID StartWatching(const Handle& handle,
                          MojoHandleSignals handle_signals,
                          base::TimeTicks deadline,
                          const base::Callback<void(MojoResult)>& callback);

  // Stops watching a handle. Unless the matching start request is still
  // queued, this blocks the calling thread until the background thread has
  // processed the stop request.
  // This may be invoked on any thread.
  void StopWatching(WatcherID watcher_id);

 private:
  enum RequestType {
    REQUEST_START,
    REQUEST_STOP,
  };

  // A single queued request. See description of |requests_| for details.
  struct RequestData {
    RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}

    RequestType type;
    // Only meaningful when |type| is REQUEST_START.
    WatchData start_data;
    // Only meaningful when |type| is REQUEST_STOP: id of the watch to remove.
    WatcherID stop_id;
    // Only meaningful when |type| is REQUEST_STOP: signaled by the background
    // thread once the stop has been processed. Not owned.
    base::WaitableEvent* stop_event;
  };

  // Requires <vector>.
  using Requests = std::vector<RequestData>;

  friend struct DefaultSingletonTraits<WatcherThreadManager>;

  // Starts |thread_|. Private: instances are created through Singleton<>.
  WatcherThreadManager();

  // Schedules a request on the background thread. See |requests_| for details.
  void AddRequest(const RequestData& data);

  // Processes requests added to |requests_|. This is invoked on the backend
  // thread.
  void ProcessRequestsOnBackendThread();

  // Background thread on which |backend_| does its work.
  base::Thread thread_;

  // Source of the ids returned by StartWatching().
  base::AtomicSequenceNumber watcher_id_generator_;

  // Applies the queued requests; only used from |thread_|.
  WatcherBackend backend_;

  // Protects |requests_|.
  base::Lock lock_;

  // Start/Stop result in adding a RequestData to |requests_| (protected by
  // |lock_|). When the background thread wakes up it processes the requests.
  Requests requests_;

  DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
};

// Stops the background thread explicitly, before any member is destroyed.
WatcherThreadManager::~WatcherThreadManager() {
  thread_.Stop();
}

// Returns the shared instance, obtained through Singleton<>.
WatcherThreadManager* WatcherThreadManager::GetInstance() {
  return Singleton<WatcherThreadManager>::get();
}

// Queues a request to start watching |handle| and returns the id assigned to
// the new watch. The calling thread's MessageLoopProxy is recorded so that
// |callback| can later be posted back to it.
WatcherID WatcherThreadManager::StartWatching(
    const Handle& handle,
    MojoHandleSignals handle_signals,
    base::TimeTicks deadline,
    const base::Callback<void(MojoResult)>& callback) {
  const WatcherID new_id = watcher_id_generator_.GetNext();

  RequestData request;
  request.type = REQUEST_START;

  WatchData& watch = request.start_data;
  watch.id = new_id;
  watch.handle = handle;
  watch.callback = callback;
  watch.handle_signals = handle_signals;
  watch.deadline = deadline;
  watch.message_loop = base::MessageLoopProxy::current();
  // The caller must be running a message loop, otherwise there is nowhere to
  // deliver the notification.
  DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
            watch.message_loop.get());

  AddRequest(request);
  return new_id;
}

// Cancels the watch identified by |watcher_id|. If the start request has not
// been picked up by the background thread yet it is simply dropped from the
// queue; otherwise a stop request is queued and the caller blocks until the
// background thread has processed it.
void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
  // Fast path: StartWatching() followed by StopWatching() before |thread_|
  // woke up.
  {
    base::AutoLock auto_lock(lock_);
    Requests::iterator pending = requests_.begin();
    while (pending != requests_.end() &&
           !(pending->type == REQUEST_START &&
             pending->start_data.id == watcher_id)) {
      ++pending;
    }
    if (pending != requests_.end()) {
      // Watcher ids are not reused, so the first match is the only match.
      requests_.erase(pending);
      return;
    }
  }

  base::ThreadRestrictions::ScopedAllowWait allow_wait;
  base::WaitableEvent event(true, false);

  RequestData stop_request;
  stop_request.type = REQUEST_STOP;
  stop_request.stop_id = watcher_id;
  stop_request.stop_event = &event;
  AddRequest(stop_request);

  // |event| lives on this stack frame and the handle must really be removed
  // before returning, so block until the background thread signals.
  event.Wait();
}

// Appends |data| to |requests_|. A processing task is posted to the background
// thread only when the queue was empty beforehand; a non-empty queue means an
// earlier call already posted the task that will drain it.
void WatcherThreadManager::AddRequest(const RequestData& data) {
  bool queue_was_empty = false;
  {
    base::AutoLock auto_lock(lock_);
    queue_was_empty = requests_.empty();
    requests_.push_back(data);
  }
  if (!queue_was_empty)
    return;

  // We own |thread_|, so it's safe to use Unretained() here.
  thread_.message_loop()->PostTask(
      FROM_HERE,
      base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
                 base::Unretained(this)));
}

void WatcherThreadManager::ProcessRequestsOnBackendThread() {
  DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());

  Requests requests;
  {
    base::AutoLock auto_lock(lock_);
    requests_.swap(requests);
  }
  for (size_t i = 0; i < requests.size(); ++i) {
    if (requests[i].type == REQUEST_START) {
      backend_.StartWatching(requests[i].start_data);
    } else {
      backend_.StopWatching(requests[i].stop_id);
      requests[i].stop_event->Signal();
    }
  }
}

// Starts |thread_| with MessagePumpMojo::Create as its message pump factory,
// so that |backend_| can reach the pump through MessagePumpMojo::current() on
// that thread.
WatcherThreadManager::WatcherThreadManager()
    : thread_(kWatcherThreadName) {
  base::Thread::Options options;
  options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
  thread_.StartWithOptions(options);
}

// HandleWatcher::StateBase and subclasses -------------------------------------

// The base class of HandleWatcher's state. Owns the user's callback and
// monitors the current thread's MessageLoop to know when to force the callback
// to run (with an error) even though the pipe hasn't been signaled yet.
class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
 public:
  // Stores |watcher| and |callback| and starts observing destruction of the
  // current thread's MessageLoop.
  StateBase(HandleWatcher* watcher,
            const base::Callback<void(MojoResult)>& callback)
      : watcher_(watcher),
        callback_(callback),
        got_ready_(false) {
    base::MessageLoop::current()->AddDestructionObserver(this);
  }

  // Stops observing the current thread's MessageLoop.
  ~StateBase() override {
    base::MessageLoop::current()->RemoveDestructionObserver(this);
  }

 protected:
  // True once NotifyHandleReady() has been called.
  bool got_ready() const { return got_ready_; }

  // Called by subclasses when the watch has completed (successfully or with an
  // error). Records that fact, then runs the callback with |result|. |this| is
  // destroyed in the process.
  void NotifyHandleReady(MojoResult result) {
    got_ready_ = true;
    NotifyAndDestroy(result);
  }

 private:
  // Runs the user's callback with |result| after tearing down this state. The
  // callback is copied to the stack first because Stop() deletes |this|, and
  // with it |callback_|.
  void NotifyAndDestroy(MojoResult result) {
    base::Callback<void(MojoResult)> user_callback(callback_);
    watcher_->Stop();  // Destroys |this|.

    user_callback.Run(result);
  }

  // base::MessageLoop::DestructionObserver override:
  void WillDestroyCurrentMessageLoop() override {
    // The current thread is exiting. Simulate a watch error.
    NotifyAndDestroy(MOJO_RESULT_ABORTED);
  }

  // The HandleWatcher whose Stop() destroys this state.
  HandleWatcher* watcher_;
  // The user's callback.
  base::Callback<void(MojoResult)> callback_;

  // Have we been notified that the handle is ready?
  bool got_ready_;

  DISALLOW_COPY_AND_ASSIGN(StateBase);
};

// If the thread on which HandleWatcher is used runs MessagePumpMojo,
// SameThreadWatchingState is used to directly watch the handle on the same
// thread.
class HandleWatcher::SameThreadWatchingState : public StateBase,
                                               public MessagePumpMojoHandler {
 public:
  // Registers |this| as the handler for |handle| with the current thread's
  // MessagePumpMojo. Must be constructed on a thread running MessagePumpMojo.
  SameThreadWatchingState(HandleWatcher* watcher,
                          const Handle& handle,
                          MojoHandleSignals handle_signals,
                          MojoDeadline deadline,
                          const base::Callback<void(MojoResult)>& callback)
      : StateBase(watcher, callback),
        handle_(handle) {
    DCHECK(MessagePumpMojo::IsCurrent());

    MessagePumpMojo::current()->AddHandler(
        this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
  }

  ~SameThreadWatchingState() override {
    // Once the handle has been reported ready the handler was already removed
    // by StopWatchingAndNotifyReady(), so there is nothing left to undo.
    if (got_ready())
      return;
    MessagePumpMojo::current()->RemoveHandler(handle_);
  }

 private:
  // Shared tail of both handler callbacks: unregister from the pump first,
  // then report |result|. NotifyHandleReady() destroys |this|.
  void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
    DCHECK_EQ(handle.value(), handle_.value());
    MessagePumpMojo::current()->RemoveHandler(handle_);
    NotifyHandleReady(result);
  }

  // MessagePumpMojoHandler overrides:
  void OnHandleReady(const Handle& handle) override {
    StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
  }

  void OnHandleError(const Handle& handle, MojoResult result) override {
    StopWatchingAndNotifyReady(handle, result);
  }

  // The handle registered with MessagePumpMojo.
  Handle handle_;

  DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
};

// If the thread on which HandleWatcher is used runs a message pump different
// from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
// handle on the handle watcher thread.
class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
 public:
  // Asks WatcherThreadManager to watch |handle|. The notification is bound to
  // a weak pointer to |this|.
  SecondaryThreadWatchingState(HandleWatcher* watcher,
                               const Handle& handle,
                               MojoHandleSignals handle_signals,
                               MojoDeadline deadline,
                               const base::Callback<void(MojoResult)>& callback)
      : StateBase(watcher, callback),
        weak_factory_(this) {
    base::Callback<void(MojoResult)> on_ready =
        base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
                   weak_factory_.GetWeakPtr());
    watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
        handle, handle_signals, MojoDeadlineToTimeTicks(deadline), on_ready);
  }

  ~SecondaryThreadWatchingState() override {
    // If we've been notified the handle is ready (|got_ready()| is true) then
    // the watch has been implicitly removed by
    // WatcherThreadManager/MessagePumpMojo and we don't have to call
    // StopWatching(). To do so would needlessly entail posting a task and
    // blocking until the background thread services it.
    if (got_ready())
      return;
    WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
  }

 private:
  // Id returned by WatcherThreadManager::StartWatching(); set in the
  // constructor body.
  WatcherID watcher_id_;

  // Used to weakly bind |this| to the WatcherThreadManager.
  base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;

  DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
};

// HandleWatcher ---------------------------------------------------------------

// Creates a watcher that is not watching anything; Start() begins a watch.
HandleWatcher::HandleWatcher() {
}

// No explicit teardown here. NOTE(review): any outstanding watch is presumably
// released when |state_| is destroyed as a member; its declaration is not
// visible in this file -- confirm against the header.
HandleWatcher::~HandleWatcher() {
}

// Begins watching |handle| for |handle_signals|, replacing any watch already
// in progress. The watch is serviced on the calling thread when that thread
// runs MessagePumpMojo, and on the shared watcher thread otherwise.
void HandleWatcher::Start(const Handle& handle,
                          MojoHandleSignals handle_signals,
                          MojoDeadline deadline,
                          const base::Callback<void(MojoResult)>& callback) {
  DCHECK(handle.is_valid());
  DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);

  // The previous state has to be torn down before its replacement is built.
  state_.reset();

  if (!MessagePumpMojo::IsCurrent()) {
    state_.reset(new SecondaryThreadWatchingState(
        this, handle, handle_signals, deadline, callback));
    return;
  }

  state_.reset(new SameThreadWatchingState(
      this, handle, handle_signals, deadline, callback));
}

// Stops watching by discarding the current state object, if any. The state's
// destructor unregisters the watch when it has not completed yet.
void HandleWatcher::Stop() {
  state_.reset();
}

}  // namespace common
}  // namespace mojo
