blob: 3d5011ce64a4a27c1f23b01a8b3d23636ace33bb [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/logging.h"
#include "services/media/framework/parts/reader_cache.h"
namespace mojo {
namespace media {
// static
std::shared_ptr<ReaderCache> ReaderCache::Create(
std::shared_ptr<Reader> upstream_reader) {
return std::shared_ptr<ReaderCache>(new ReaderCache(upstream_reader));
}
ReaderCache::ReaderCache(std::shared_ptr<Reader> upstream_reader) {
upstream_reader->Describe(
[this, upstream_reader](Result result, size_t size, bool can_seek) {
store_.Initialize(result, size, can_seek);
describe_is_complete_.Occur();
if (result == Result::kOk) {
intake_.Start(&store_, upstream_reader);
}
});
}
ReaderCache::~ReaderCache() {}
void ReaderCache::Describe(const DescribeCallback& callback) {
describe_is_complete_.When([this, callback]() { store_.Describe(callback); });
}
void ReaderCache::ReadAt(size_t position,
uint8_t* buffer,
size_t bytes_to_read,
const ReadAtCallback& callback) {
DCHECK(buffer);
DCHECK(bytes_to_read > 0);
read_at_request_.Start(position, buffer, bytes_to_read, callback);
describe_is_complete_.When(
[this]() { store_.SetReadAtRequest(&read_at_request_); });
}
ReaderCache::ReadAtRequest::ReadAtRequest() {
in_progress_ = false;
}
ReaderCache::ReadAtRequest::~ReadAtRequest() {}
void ReaderCache::ReadAtRequest::Start(size_t position,
uint8_t* buffer,
size_t bytes_to_read,
const ReadAtCallback& callback) {
DCHECK(!in_progress_) << "concurrent calls to ReadAt are not allowed";
in_progress_ = true;
position_ = position;
buffer_ = buffer;
original_bytes_to_read_ = bytes_to_read;
remaining_bytes_to_read_ = bytes_to_read;
callback_ = callback;
}
void ReaderCache::ReadAtRequest::CopyFrom(uint8_t* source, size_t byte_count) {
DCHECK(source);
DCHECK(byte_count <= remaining_bytes_to_read_);
std::memcpy(buffer_, source, byte_count);
position_ += byte_count;
buffer_ += byte_count;
remaining_bytes_to_read_ -= byte_count;
}
void ReaderCache::ReadAtRequest::Complete(Result result) {
DCHECK(original_bytes_to_read_ >= remaining_bytes_to_read_);
size_t bytes_read = original_bytes_to_read_ - remaining_bytes_to_read_;
// If we've read 0 bytes, something must be wrong.
DCHECK((bytes_read == 0) == (result != Result::kOk));
ReadAtCallback callback;
callback_.swap(callback);
in_progress_ = false;
callback(result, bytes_read);
}
ReaderCache::Store::Store() {}
ReaderCache::Store::~Store() {}
void ReaderCache::Store::Initialize(Result result, size_t size, bool can_seek) {
base::AutoLock lock(lock_);
result_ = result;
size_ = size;
can_seek_ = can_seek;
// Create one hole spanning the entire asset.
sparse_byte_buffer_.Initialize(size_);
intake_hole_ = sparse_byte_buffer_.FindHoleContaining(0);
read_hole_ = sparse_byte_buffer_.null_hole();
read_region_ = sparse_byte_buffer_.null_region();
}
void ReaderCache::Store::Describe(const DescribeCallback& callback) {
Result result;
{
base::AutoLock lock(lock_);
result = result_;
}
callback(result, size_, can_seek_);
}
void ReaderCache::Store::SetReadAtRequest(ReadAtRequest* request) {
base::AutoLock lock(lock_);
DCHECK(read_request_ == nullptr);
DCHECK(request->position() < size_);
DCHECK(request->remaining_bytes_to_read() > 0);
read_request_ = request;
read_request_position_ = request->position();
read_request_remaining_bytes_ = request->remaining_bytes_to_read();
if (read_request_position_ + read_request_remaining_bytes_ > size_) {
read_request_remaining_bytes_ = size_ - read_request_position_;
}
ServeRequest();
}
size_t ReaderCache::Store::GetIntakePositionAndSize(size_t* size_out) {
DCHECK(size_out);
base::AutoLock lock(lock_);
size_t size = kDefaultReadSize;
if (read_hole_ != sparse_byte_buffer_.null_hole()) {
// To serve the read request, we need to intake starting at the beginning
// of read_hole_;
DCHECK(read_request_);
intake_hole_ = read_hole_;
read_hole_ = sparse_byte_buffer_.null_hole();
size = read_request_remaining_bytes_;
} else if (intake_hole_ == sparse_byte_buffer_.null_hole()) {
*size_out = 0;
return kUnknownSize;
}
if (size > intake_hole_.size()) {
size = intake_hole_.size();
}
*size_out = size;
return intake_hole_.position();
}
void ReaderCache::Store::PutIntakeBuffer(size_t position,
std::vector<uint8_t>&& buffer) {
base::AutoLock lock(lock_);
DCHECK(intake_hole_ != sparse_byte_buffer_.null_hole());
DCHECK(position == intake_hole_.position());
DCHECK(buffer.size() != 0);
DCHECK(buffer.size() <= intake_hole_.size());
if (read_hole_ != sparse_byte_buffer_.null_hole() &&
read_hole_.position() >= position &&
read_hole_.position() < position + buffer.size()) {
// read_hole_ was set after the termination of GetIntakePositionAndSize
// and before this point. We're in the process of delivering the requested
// data, so we don't need read_hole_ to be set anymore.
read_hole_ = sparse_byte_buffer_.null_hole();
}
intake_hole_ = sparse_byte_buffer_.Fill(intake_hole_, std::move(buffer));
if (read_request_ != nullptr) {
ServeRequest();
}
}
void ReaderCache::Store::ReportIntakeError(Result result) {
DCHECK(result != Result::kOk);
base::AutoLock lock(lock_);
result_ = result;
if (read_request_ != nullptr) {
ServeRequest();
}
}
void ReaderCache::Store::ServeRequest() {
lock_.AssertAcquired();
while (result_ == Result::kOk && read_request_remaining_bytes_ != 0u) {
read_region_ = sparse_byte_buffer_.FindRegionContaining(
read_request_position_, read_region_);
if (read_region_ == sparse_byte_buffer_.null_region()) {
// There's no region in the store for this position. Arrange for intake
// to fill this need.
read_hole_ = sparse_byte_buffer_.FindOrCreateHole(read_request_position_,
intake_hole_);
return;
}
// Perform the copy.
DCHECK(read_region_.position() <= read_request_position_);
DCHECK(read_region_.position() + read_region_.size() >
read_request_position_);
size_t bytes_to_copy = (read_region_.position() + read_region_.size()) -
read_request_position_;
if (bytes_to_copy > read_request_remaining_bytes_) {
bytes_to_copy = read_request_remaining_bytes_;
}
DCHECK(bytes_to_copy > 0);
read_request_->CopyFrom(read_region_.data() + (read_request_position_ -
read_region_.position()),
bytes_to_copy);
read_request_position_ += bytes_to_copy;
read_request_remaining_bytes_ -= bytes_to_copy;
}
// Done with this request. Complete it.
ReadAtRequest* read_request = read_request_;
read_request_ = nullptr;
base::AutoUnlock unlock(lock_);
read_request->Complete(result_);
}
ReaderCache::Intake::Intake() {}
ReaderCache::Intake::~Intake() {}
void ReaderCache::Intake::Start(Store* store,
std::shared_ptr<Reader> upstream_reader) {
DCHECK(store != nullptr);
DCHECK(upstream_reader);
store_ = store;
upstream_reader_ = upstream_reader;
Continue();
}
void ReaderCache::Intake::Continue() {
size_t size;
size_t position = store_->GetIntakePositionAndSize(&size);
if (position == kUnknownSize) {
return;
}
DCHECK(size > 0);
DCHECK(buffer_.size() == 0);
buffer_.resize(size);
upstream_reader_->ReadAt(position, buffer_.data(), size,
[this, position](Result result, size_t bytes_read) {
if (result != Result::kOk) {
LOG(ERROR) << "ReadAt failed";
store_->ReportIntakeError(result);
return;
}
DCHECK(bytes_read != 0);
store_->PutIntakeBuffer(position,
std::move(buffer_));
DCHECK(buffer_.size() == 0);
Continue();
});
}
} // namespace media
} // namespace mojo