blob: 8fb5244dda35511d2e09e98628f580a7147b02a7 [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <limits>
#include <string>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "services/media/framework_mojo/mojo_reader.h"
#include "services/media/framework_mojo/mojo_type_conversions.h"
namespace mojo {
namespace media {
MojoReader::MojoReader(InterfaceHandle<SeekingReader> seeking_reader)
: seeking_reader_(SeekingReaderPtr::Create(seeking_reader.Pass())) {
task_runner_ = base::MessageLoop::current()->task_runner();
DCHECK(task_runner_);
read_in_progress_ = false;
seeking_reader_->Describe(
[this](MediaResult result, uint64_t size, bool can_seek) {
result_ = Convert(result);
if (result_ == Result::kOk) {
size_ = size;
can_seek_ = can_seek;
}
ready_.Occur();
});
}
MojoReader::~MojoReader() {}
void MojoReader::Describe(const DescribeCallback& callback) {
ready_.When([this, callback]() { callback(result_, size_, can_seek_); });
}
void MojoReader::ReadAt(size_t position,
uint8_t* buffer,
size_t bytes_to_read,
const ReadAtCallback& callback) {
DCHECK(buffer);
DCHECK(bytes_to_read);
DCHECK(!read_in_progress_)
<< "ReadAt called while previous call still in progress";
read_in_progress_ = true;
read_at_position_ = position;
read_at_buffer_ = buffer;
read_at_bytes_to_read_ = bytes_to_read;
read_at_callback_ = callback;
// ReadAt may be called on non-mojo threads, so we use the runner.
task_runner_->PostTask(FROM_HERE, base::Bind(&MojoReader::ContinueReadAt,
base::Unretained(this)));
}
void MojoReader::ContinueReadAt() {
ready_.When([this]() {
if (result_ != Result::kOk) {
CompleteReadAt(result_);
return;
}
DCHECK(read_at_position_ < size_);
if (read_at_position_ + read_at_bytes_to_read_ > size_) {
read_at_bytes_to_read_ = size_ - read_at_position_;
}
read_at_bytes_remaining_ = read_at_bytes_to_read_;
if (read_at_position_ == consumer_handle_position_) {
ReadResponseBody();
return;
}
consumer_handle_.reset();
consumer_handle_position_ = kUnknownSize;
if (!can_seek_ && read_at_position_ != 0) {
CompleteReadAt(Result::kInvalidArgument);
return;
}
seeking_reader_->ReadAt(
read_at_position_,
[this](MediaResult result,
ScopedDataPipeConsumerHandle consumer_handle) {
result_ = Convert(result);
if (result_ != Result::kOk) {
CompleteReadAt(result_);
return;
}
consumer_handle_ = consumer_handle.Pass();
consumer_handle_position_ = read_at_position_;
ReadResponseBody();
});
});
}
void MojoReader::ReadResponseBody() {
DCHECK(read_at_bytes_remaining_ < std::numeric_limits<uint32_t>::max());
uint32_t byte_count = static_cast<uint32_t>(read_at_bytes_remaining_);
MojoResult result = ReadDataRaw(consumer_handle_.get(), read_at_buffer_,
&byte_count, MOJO_READ_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
byte_count = 0;
} else if (result != MOJO_RESULT_OK) {
LOG(ERROR) << "ReadDataRaw failed " << result;
FailReadAt(result);
return;
}
read_at_buffer_ += byte_count;
read_at_bytes_remaining_ -= byte_count;
consumer_handle_position_ += byte_count;
if (read_at_bytes_remaining_ == 0) {
CompleteReadAt(Result::kOk, read_at_bytes_to_read_);
return;
}
Environment::GetDefaultAsyncWaiter()->AsyncWait(
consumer_handle_.get().value(), MOJO_HANDLE_SIGNAL_READABLE,
MOJO_DEADLINE_INDEFINITE, MojoReader::ReadResponseBodyStatic, this);
}
void MojoReader::CompleteReadAt(Result result, size_t bytes_read) {
ReadAtCallback read_at_callback;
read_at_callback_.swap(read_at_callback);
read_in_progress_ = false;
read_at_callback(result, bytes_read);
}
void MojoReader::FailReadAt(MojoResult result) {
result_ = ConvertResult(result);
consumer_handle_.reset();
consumer_handle_position_ = kUnknownSize;
CompleteReadAt(result_);
}
// static
void MojoReader::ReadResponseBodyStatic(void* reader_void_ptr,
MojoResult result) {
MojoReader* reader = reinterpret_cast<MojoReader*>(reader_void_ptr);
if (result != MOJO_RESULT_OK) {
LOG(ERROR) << "AsyncWait failed " << result;
reader->FailReadAt(result);
return;
}
reader->ReadResponseBody();
}
} // namespace media
} // namespace mojo