// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/services/network/url_loader_impl.h"

#include <memory>

#include "base/memory/scoped_vector.h"
#include "base/message_loop/message_loop.h"
#include "mojo/common/common_type_converters.h"
#include "mojo/common/url_type_converters.h"
#include "mojo/services/network/net_adapters.h"
#include "mojo/services/network/network_context.h"
#include "net/base/elements_upload_data_stream.h"
#include "net/base/io_buffer.h"
#include "net/base/load_flags.h"
#include "net/base/upload_bytes_element_reader.h"
#include "net/http/http_response_headers.h"
#include "net/url_request/redirect_info.h"
#include "net/url_request/url_request_context.h"

namespace mojo {
namespace {

// Generates an URLResponsePtr from the response state of a net::URLRequest.
URLResponsePtr MakeURLResponse(const net::URLRequest* url_request) {
  URLResponsePtr response(URLResponse::New());
  response->url = String::From(url_request->url());

  const net::HttpResponseHeaders* headers = url_request->response_headers();
  if (headers) {
    response->status_code = headers->response_code();
    response->status_line = headers->GetStatusLine();

    response->headers = Array<HttpHeaderPtr>::New(0);
    std::vector<String> header_lines;
    void* iter = nullptr;
    std::string name, value;
    while (headers->EnumerateHeaderLines(&iter, &name, &value)) {
      HttpHeaderPtr header = HttpHeader::New();
      header->name = name;
      header->value = value;
      response->headers.push_back(header.Pass());
    }
  }

  std::string mime_type;
  url_request->GetMimeType(&mime_type);
  response->mime_type = mime_type;

  std::string charset;
  url_request->GetCharset(&charset);
  response->charset = charset;

  return response.Pass();
}

// Reads the request body upload data from a DataPipe.
class UploadDataPipeElementReader : public net::UploadElementReader {
 public:
  UploadDataPipeElementReader(ScopedDataPipeConsumerHandle pipe)
      : pipe_(pipe.Pass()), num_bytes_(0) {}
  ~UploadDataPipeElementReader() override {}

  // UploadElementReader overrides:
  int Init(const net::CompletionCallback& callback) override {
    offset_ = 0;
    ReadDataRaw(pipe_.get(), nullptr, &num_bytes_, MOJO_READ_DATA_FLAG_QUERY);
    return net::OK;
  }
  uint64 GetContentLength() const override { return num_bytes_; }
  uint64 BytesRemaining() const override { return num_bytes_ - offset_; }
  bool IsInMemory() const override { return false; }
  int Read(net::IOBuffer* buf,
           int buf_length,
           const net::CompletionCallback& callback) override {
    uint32_t bytes_read = std::min(static_cast<uint32_t>(BytesRemaining()),
                                   static_cast<uint32_t>(buf_length));
    if (bytes_read > 0) {
      ReadDataRaw(pipe_.get(), buf->data(), &bytes_read,
                  MOJO_READ_DATA_FLAG_NONE);
    }

    offset_ += bytes_read;
    return bytes_read;
  }

 private:
  ScopedDataPipeConsumerHandle pipe_;
  uint32_t num_bytes_;
  uint32_t offset_;

  DISALLOW_COPY_AND_ASSIGN(UploadDataPipeElementReader);
};

}  // namespace

// Each body fetcher takes ownership of a net::URLRequest and stream its data
// to a data pipe. It is owned by an URLLoaderImpl and will notify its owner
// when either the data pipe is closed or the request is finished.
class URLLoaderImpl::BodyFetcher : public net::URLRequest::Delegate {
 public:
  BodyFetcher(URLLoaderImpl* loader,
              uint32_t id,
              scoped_ptr<net::URLRequest> url_request,
              ScopedDataPipeProducerHandle response_body_stream)
      : loader_(loader),
        id_(id),
        url_request_(url_request.Pass()),
        response_body_stream_(response_body_stream.Pass()),
        weak_ptr_factory_(this) {
    url_request_->set_delegate(this);
    ListenForPeerClosed();
  }

  void Start() {
    ReadMore();
  }

  uint32_t id() { return id_; }

  URLLoaderStatusPtr QueryStatus() {
    URLLoaderStatusPtr status(URLLoaderStatus::New());
    if (url_request_) {
      status->is_loading = url_request_->is_pending();
      if (!url_request_->status().is_success())
        status->error = MakeNetworkError(url_request_->status().error());
    } else {
      status->is_loading = false;
    }
    return status.Pass();
  }

 private:
  // net::URLRequest::Delegate methods:
  void OnReceivedRedirect(net::URLRequest* url_request,
                          const net::RedirectInfo& redirect_info,
                          bool* defer_redirect) override {
    // This should already have been called.
    DCHECK(false);
  }

  void OnResponseStarted(net::URLRequest* url_request) override {
    // This should already have been called.
    DCHECK(false);
  }

  void OnReadCompleted(net::URLRequest* url_request, int bytes_read) override {
    DCHECK(url_request == url_request_.get());

    if (url_request->status().is_success()) {
      DidRead(static_cast<uint32_t>(bytes_read), false);
    } else {
      handle_watcher_.Stop();
      pending_write_ = nullptr;  // This closes the data pipe.
      DeleteIfNeeded();
      return;
    }
  }

  void OnResponseBodyStreamReady(MojoResult result) {
    // TODO(darin): Handle a bad |result| value.

    // Continue watching the handle in case the peer is closed.
    ListenForPeerClosed();
    ReadMore();
  }

  void OnResponseBodyStreamClosed(MojoResult result) {
    url_request_.reset();
    response_body_stream_.reset();
    pending_write_ = nullptr;
    DeleteIfNeeded();
  }

  void ReadMore() {
    DCHECK(!pending_write_.get());

    uint32_t num_bytes;
    MojoResult result = NetToMojoPendingBuffer::BeginWrite(
        &response_body_stream_, &pending_write_, &num_bytes);

    if (result == MOJO_RESULT_SHOULD_WAIT) {
      // The pipe is full. We need to wait for it to have more space.
      handle_watcher_.Start(
          response_body_stream_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
          MOJO_DEADLINE_INDEFINITE,
          base::Bind(&URLLoaderImpl::BodyFetcher::OnResponseBodyStreamReady,
                     base::Unretained(this)));
      return;
    } else if (result != MOJO_RESULT_OK) {
      // The response body stream is in a bad state. Bail.
      // TODO(darin): How should this be communicated to our client?
      handle_watcher_.Stop();
      response_body_stream_.reset();
      DeleteIfNeeded();
      return;
    }
    CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);

    scoped_refptr<net::IOBuffer> buf(
        new NetToMojoIOBuffer(pending_write_.get()));

    int bytes_read;
    url_request_->Read(buf.get(), static_cast<int>(num_bytes), &bytes_read);
    if (url_request_->status().is_io_pending()) {
      // Wait for OnReadCompleted.
    } else if (url_request_->status().is_success() && bytes_read > 0) {
      DidRead(static_cast<uint32_t>(bytes_read), true);
    } else {
      handle_watcher_.Stop();
      pending_write_->Complete(0);
      pending_write_ = nullptr;  // This closes the data pipe.
      DeleteIfNeeded();
      return;
    }
  }

  void DidRead(uint32_t num_bytes, bool completed_synchronously) {
    DCHECK(url_request_->status().is_success());

    response_body_stream_ = pending_write_->Complete(num_bytes);
    pending_write_ = nullptr;

    if (completed_synchronously) {
      base::MessageLoop::current()->PostTask(
          FROM_HERE, base::Bind(&URLLoaderImpl::BodyFetcher::ReadMore,
                                weak_ptr_factory_.GetWeakPtr()));
    } else {
      ReadMore();
    }
  }

  void DeleteIfNeeded() {
    bool has_data_pipe =
        pending_write_.get() || response_body_stream_.is_valid();
    if (!has_data_pipe) {
      loader_->OnBodyFetcherDone(this);
      // The callback deleted this object.
    }
  }

  void ListenForPeerClosed() {
    handle_watcher_.Start(
        response_body_stream_.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
        MOJO_DEADLINE_INDEFINITE,
        base::Bind(&URLLoaderImpl::BodyFetcher::OnResponseBodyStreamClosed,
                   base::Unretained(this)));
  }

  URLLoaderImpl* loader_;
  uint32_t id_;
  scoped_ptr<net::URLRequest> url_request_;
  ScopedDataPipeProducerHandle response_body_stream_;
  scoped_refptr<NetToMojoPendingBuffer> pending_write_;
  common::HandleWatcher handle_watcher_;
  base::WeakPtrFactory<URLLoaderImpl::BodyFetcher> weak_ptr_factory_;
};

URLLoaderImpl::URLLoaderImpl(NetworkContext* context,
                             InterfaceRequest<URLLoader> request,
                             std::vector<URLLoaderInterceptorPtr> interceptors)
    : context_(context),
      interceptors_(std::move(interceptors)),
      response_body_buffer_size_(0),
      auto_follow_redirects_(true),
      current_fetcher_id_(0),
      binding_(this, request.Pass()) {
  for (auto& interceptor : interceptors_) {
    interceptor.set_error_handler(this);
  }
  interceptor_index_ = interceptors_.size() - 1;
  binding_.set_error_handler(this);
  context_->RegisterURLLoader(this);
}

URLLoaderImpl::~URLLoaderImpl() {
  context_->DeregisterURLLoader(this);
}

void URLLoaderImpl::Cleanup() {
  // The associated network context is going away and we have to destroy
  // net::URLRequest hold by this loader.
  delete this;
}

void URLLoaderImpl::Start(URLRequestPtr request,
                          const Callback<void(URLResponsePtr)>& callback) {
  if (url_request_) {
    SendError(net::ERR_UNEXPECTED, callback);
    return;
  }

  callback_ = callback;
  StartInternal(request.Pass());
}

void URLLoaderImpl::FollowRedirect(
    const Callback<void(URLResponsePtr)>& callback) {
  if (!redirect_info_) {
    DLOG(ERROR) << "Spurious call to FollowRedirect";
    SendError(net::ERR_UNEXPECTED, callback);
    return;
  }
  DCHECK(url_request_);
  DCHECK(url_request_->is_redirecting());
  DCHECK(!auto_follow_redirects_);

  callback_ = callback;
  FollowRedirectInternal();
}

void URLLoaderImpl::QueryStatus(
    const Callback<void(URLLoaderStatusPtr)>& callback) {
  URLLoaderStatusPtr status(URLLoaderStatus::New());
  if (url_request_) {
    // A url request is owned by this class, the status is deduced from it.
    status->is_loading = url_request_->is_pending();
    if (!url_request_->status().is_success())
      status->error = MakeNetworkError(url_request_->status().error());
  } else {
    if (!body_fetchers_.empty()) {
      // At least one body fetcher is active, the status is deduced from it.
      status = body_fetchers_.back()->QueryStatus();
    } else if (last_status_) {
      // At least one body fetcher has been started and finished, the status is
      // deduced from its status when it finished.
      status = last_status_.Clone();
    } else {
      // No network request has been made yet.
      status->is_loading = false;
    }
  }
  // TODO(darin): Populate more status fields.
  callback.Run(status.Pass());
}

void URLLoaderImpl::OnConnectionError() {
  binding_.Close();
  if (body_fetchers_.empty())
    delete this;
}

void URLLoaderImpl::OnReceivedRedirect(net::URLRequest* url_request,
                                       const net::RedirectInfo& redirect_info,
                                       bool* defer_redirect) {
  DCHECK(url_request == url_request_.get());
  DCHECK(url_request->status().is_success());
  DCHECK(!redirect_info_);

  if (auto_follow_redirects_)
    return;

  // Send the redirect response to the client, allowing them to inspect it and
  // optionally follow the redirect.
  *defer_redirect = true;

  redirect_info_.reset(new net::RedirectInfo(redirect_info));
  URLResponsePtr response = MakeURLResponse(url_request);
  response->redirect_method = redirect_info.new_method;
  response->redirect_url = String::From(redirect_info.new_url);
  response->redirect_referrer = redirect_info.new_referrer;

  SendResponse(response.Pass());
}

void URLLoaderImpl::OnResponseStarted(net::URLRequest* url_request) {
  DCHECK(url_request == url_request_.get());
  DCHECK(!redirect_info_);

  if (!url_request->status().is_success()) {
    SendError(url_request->status().error(), callback_);
    callback_ = Callback<void(URLResponsePtr)>();
    return;
  }

  // TODO(darin): Add support for optional MIME sniffing.

  DataPipe data_pipe;
  // TODO(darin): Honor given buffer size.

  URLResponsePtr response = MakeURLResponse(url_request);
  response->body = data_pipe.consumer_handle.Pass();

  // Build the body fetcher.
  std::unique_ptr<BodyFetcher> body_fetcher(
      new BodyFetcher(this, ++current_fetcher_id_, url_request_.Pass(),
                      data_pipe.producer_handle.Pass()));

  SendResponse(response.Pass());
  body_fetchers_.push_back(std::move(body_fetcher));
  body_fetchers_.back()->Start();
}

void URLLoaderImpl::OnReadCompleted(net::URLRequest* url_request,
                                    int bytes_read) {
  // This should never be called on this object.
  DCHECK(false);
}

void URLLoaderImpl::SendError(
    int error_code,
    const Callback<void(URLResponsePtr)>& callback) {
  URLResponsePtr response(URLResponse::New());
  if (url_request_)
    response->url = String::From(url_request_->url());
  response->error = MakeNetworkError(error_code);
  callback.Run(response.Pass());
}

void URLLoaderImpl::OnIntercept(
    URLLoaderInterceptorResponsePtr interceptor_response) {
  if (interceptor_response->request && interceptor_response->response) {
    SendError(net::ERR_INVALID_ARGUMENT, callback_);
    return;
  }

  if (!interceptor_response) {
    // Interceptor can follow redirect only if a redirect is in progress.
    if (!redirect_info_) {
      SendError(net::ERR_INVALID_ARGUMENT, callback_);
      return;
    }
    // Request from the user to the the loader are intercepted from the newest
    // to the oldest interceptor.
    interceptor_index_--;
    FollowRedirectInternal();
    return;
  }

  // The interceptor returned a request.
  if (interceptor_response->request) {
    url_request_.reset();
    redirect_info_.reset();
    // Request from the user to the the loader are intercepted from the newest
    // to the oldest interceptor.
    interceptor_index_--;
    StartInternal(interceptor_response->request.Pass());
    return;
  }

  // The interceptor returned a response.
  // Check that if the response is a redirect, it didn't change the redirect
  // info.
  // TODO(qsr) Handles this case. The problem is that if the interceptor changed
  // the redirect info and then later on the client send a follow redirect, this
  // class will just transmit the follow redirect to the underlying url_fetcher
  // that will just redirect to the initial URL. To fix this, this class will
  // need to detect this and recreate a new url fetcher with the correct
  // request.
  if (interceptor_response->response->redirect_method) {
    if (!redirect_info_ ||
        interceptor_response->response->redirect_method !=
            redirect_info_->new_method ||
        interceptor_response->response->redirect_url !=
            String::From(redirect_info_->new_url) ||
        interceptor_response->response->redirect_referrer !=
            redirect_info_->new_referrer) {
      LOG(WARNING) << "Interceptor returned a response for a redirect.";
      SendError(net::ERR_INVALID_ARGUMENT, callback_);
      return;
    }
  } else {
    url_request_.reset();
    redirect_info_.reset();
  }
  // Response from the loader to the user are intercepted from the oldest to the
  // newest interceptor.
  interceptor_index_++;
  SendResponse(interceptor_response->response.Pass());
}

void URLLoaderImpl::FollowRedirectInternal() {
  DCHECK(url_request_);
  DCHECK(url_request_->is_redirecting());
  DCHECK(redirect_info_);

  if (interceptor_index_ >= 0 &&
      interceptor_index_ < static_cast<int>(interceptors_.size())) {
    interceptors_[interceptor_index_]->InterceptFollowRedirect(base::Bind(
        &URLLoaderImpl::OnIntercept, base::Unretained(this)));
    return;
  }

  DCHECK(interceptor_index_ == -1);
  interceptor_index_ = 0;
  redirect_info_.reset();
  // TODO(darin): Verify that it makes sense to call FollowDeferredRedirect.
  url_request_->FollowDeferredRedirect();
}

void URLLoaderImpl::StartInternal(URLRequestPtr request) {
  DCHECK(!url_request_);
  DCHECK(!redirect_info_);

  if (interceptor_index_ >= 0 &&
      interceptor_index_ < static_cast<int>(interceptors_.size())) {
    interceptors_[interceptor_index_]->InterceptRequest(
        request.Pass(), base::Bind(&URLLoaderImpl::OnIntercept,
                                   base::Unretained(this)));
    return;
  }

  DCHECK(interceptor_index_ == -1);
  interceptor_index_ = 0;
  // Start the network request.
  url_request_ = context_->url_request_context()->CreateRequest(
      GURL(request->url), net::DEFAULT_PRIORITY, this);
  url_request_->set_method(request->method);
  // TODO(jam): need to specify this policy.
  url_request_->set_referrer_policy(
      net::URLRequest::CLEAR_REFERRER_ON_TRANSITION_FROM_SECURE_TO_INSECURE);
  if (request->headers) {
    net::HttpRequestHeaders headers;
    for (size_t i = 0; i < request->headers.size(); ++i) {
      base::StringPiece header =
          request->headers[i]->name.To<base::StringPiece>();
      base::StringPiece value =
          request->headers[i]->value.To<base::StringPiece>();
      if (header == net::HttpRequestHeaders::kReferer) {
        url_request_->SetReferrer(value.as_string());
      } else {
        headers.SetHeader(header, value);
      }
    }
    url_request_->SetExtraRequestHeaders(headers);
  }
  if (request->body) {
    ScopedVector<net::UploadElementReader> element_readers;
    for (size_t i = 0; i < request->body.size(); ++i) {
      element_readers.push_back(
          new UploadDataPipeElementReader(request->body[i].Pass()));
    }
    url_request_->set_upload(make_scoped_ptr<net::UploadDataStream>(
        new net::ElementsUploadDataStream(element_readers.Pass(), 0)));
  }
  if (request->bypass_cache)
    url_request_->SetLoadFlags(net::LOAD_BYPASS_CACHE);

  response_body_buffer_size_ = request->response_body_buffer_size;
  auto_follow_redirects_ = request->auto_follow_redirects;

  url_request_->Start();
}

void URLLoaderImpl::SendResponse(URLResponsePtr response) {
  if (interceptor_index_ >= 0 &&
      interceptor_index_ < static_cast<int>(interceptors_.size())) {
    interceptors_[interceptor_index_]->InterceptResponse(
        response.Pass(),
        base::Bind(&URLLoaderImpl::OnIntercept, base::Unretained(this)));
    return;
  }

  DCHECK(interceptor_index_ == static_cast<int>(interceptors_.size()));
  interceptor_index_ = interceptors_.size() - 1;
  Callback<void(URLResponsePtr)> callback;
  std::swap(callback_, callback);
  callback.Run(response.Pass());
}

void URLLoaderImpl::OnBodyFetcherDone(BodyFetcher* fetcher) {
  auto it = std::find_if(body_fetchers_.begin(), body_fetchers_.end(),
                         [fetcher](const std::unique_ptr<BodyFetcher>& f) {
                           return fetcher == f.get();
                         });
  DCHECK(it != body_fetchers_.end());
  // This is the current fetcher. Capture the status.
  if ((*it)->id() == current_fetcher_id_) {
    last_status_ = fetcher->QueryStatus();
    last_status_->is_loading = false;
  }
  body_fetchers_.erase(it);
  if (body_fetchers_.empty() and !binding_.is_bound())
    delete this;
}

}  // namespace mojo
