blob: 6b47c89387154217dfb8fcd3235c530603eb60df [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/services/network/web_socket_read_queue.h"
#include <memory>
#include "base/bind.h"
#include "base/logging.h"
namespace mojo {
struct WebSocketReadQueue::Operation {
uint32_t num_bytes_;
base::Callback<void(const char*)> callback_;
};
WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle)
: handle_(handle), is_busy_(false), weak_factory_(this) {
}
WebSocketReadQueue::~WebSocketReadQueue() {
}
void WebSocketReadQueue::Read(uint32_t num_bytes,
base::Callback<void(const char*)> callback) {
Operation* op = new Operation;
op->num_bytes_ = num_bytes;
op->callback_ = callback;
queue_.push_back(op);
if (is_busy_)
return;
is_busy_ = true;
TryToRead();
}
void WebSocketReadQueue::TryToRead() {
DCHECK(is_busy_);
DCHECK(!queue_.empty());
do {
Operation* op = queue_[0];
// TODO(vtl): We could avoid allocating the buffer in the case that we'll
// have to wait by querying first. Or we could preallocate a buffer, etc.
std::unique_ptr<char[]> buffer(new char[op->num_bytes_]);
uint32_t bytes_read = op->num_bytes_;
MojoResult result = ReadDataRaw(handle_, buffer.get(), &bytes_read,
MOJO_READ_DATA_FLAG_ALL_OR_NONE);
// TODO(vtl): In the "out of range" case (there's data, but not enough
// data), this will cause it to spin (via the message loop). There's not
// much to be done about that until
// https://github.com/domokit/mojo/issues/442 is fixed (unless we want to do
// our own buffering).
if (result == MOJO_RESULT_SHOULD_WAIT ||
result == MOJO_RESULT_OUT_OF_RANGE) {
Wait();
return;
}
// Ensure |op| is deleted, whether or not |this| goes away.
std::unique_ptr<Operation> op_deleter(op);
queue_.weak_erase(queue_.begin());
// http://crbug.com/490193 This should run callback as well. May need to
// change the callback signature.
if (result != MOJO_RESULT_OK)
return;
uint32_t num_bytes = op_deleter->num_bytes_;
DCHECK_LE(num_bytes, bytes_read);
DataPipeConsumerHandle handle = handle_;
base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr());
// This call may delete |this|. In that case, |self| will be invalidated.
// It may re-enter Read() too. Because |is_busy_| is true during the whole
// process, TryToRead() won't be re-entered.
op->callback_.Run(buffer.get());
if (!self)
return;
} while (!queue_.empty());
is_busy_ = false;
}
void WebSocketReadQueue::Wait() {
DCHECK(is_busy_);
handle_watcher_.Start(
handle_,
MOJO_HANDLE_SIGNAL_READABLE,
MOJO_DEADLINE_INDEFINITE,
base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this)));
}
void WebSocketReadQueue::OnHandleReady(MojoResult result) {
DCHECK(is_busy_);
TryToRead();
}
} // namespace mojo