// blob: abf23f45d233306b01d51e23008941eab62d6575 [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/environment/logging.h"
#include "mojo/public/cpp/utility/run_loop.h"
#include "mojo/services/media/common/cpp/circular_buffer_media_pipe_adapter.h"
#include "mojo/services/media/common/interfaces/media_common.mojom.h"
#include "mojo/services/media/common/interfaces/media_transport.mojom.h"
namespace mojo {
namespace media {
// Out-of-line definition for the static constexpr member declared in the
// class.  Before C++17 this is what gives kMaxRegions storage if it is ever
// odr-used (e.g. bound to a const reference).
constexpr size_t CircularBufferMediaPipeAdapter::MappedPacket::kMaxRegions;
// Performs no work of its own; members are default-initialized.
CircularBufferMediaPipeAdapter::MappedPacket::MappedPacket() = default;
// Destructor.  Asserts (debug builds only) that the packet is no longer
// holding an un-sent, un-cancelled media packet.
CircularBufferMediaPipeAdapter::MappedPacket::~MappedPacket() {
  // If packet_ is non-null, it means that someone created a MappedPacket using
  // CreateMediaPacket, but they never sent it, and never canceled it.  The
  // buffer space reserved for it by CreateMediaPacket (which advanced the
  // adapter's write pointer) has therefore not been handed back.
  //
  // TODO(johngro): Should we maintain a reference to our pipe adapter and just
  // auto cancel this packet if the user forgets?  This could be Very Bad if
  // they have created and forgotten about multiple packets.
  MOJO_DCHECK(packet_.is_null());
}
// Bookkeeping record for one packet that has been sent to the consumer and is
// still in flight.
//
// post_consume_rd: value the adapter's read pointer takes once the consumer
//                  reports this packet as complete (see HandleSendPacket).
// seq_num:         adapter-assigned sequence number, used to check that
//                  completions come back in the order packets were sent.
// cbk:             user callback to run when the completion arrives.
CircularBufferMediaPipeAdapter::PacketState::PacketState(
    uint64_t post_consume_rd,
    uint32_t seq_num,
    const MediaConsumer::SendPacketCallback& cbk)
  : post_consume_rd_(post_consume_rd),
    seq_num_(seq_num),
    cbk_(cbk) {}
// Nothing to release explicitly; members clean up after themselves.
CircularBufferMediaPipeAdapter::PacketState::~PacketState() = default;
// Takes ownership of |pipe| and wires up the three callbacks the adapter
// uses: flush completion, connection error, and the deferred "signalled"
// notification that is posted to the current RunLoop.
//
// Must be constructed on a thread that has a RunLoop (checked in debug
// builds).
CircularBufferMediaPipeAdapter::CircularBufferMediaPipeAdapter(
    MediaConsumerPtr pipe)
    : pipe_(pipe.Pass()),
      thiz_(new CircularBufferMediaPipeAdapter*(this)) {
  MOJO_DCHECK(pipe_);
  MOJO_DCHECK(RunLoop::current());

  // Flush completions reported by the consumer are handled by HandleFlush().
  pipe_flush_cbk_ = MediaConsumer::FlushCallback([this]() { HandleFlush(); });

  // Losing the connection to the consumer puts us into the fault state.
  pipe_.set_connection_error_handler(
      [this]() { Fault(MediaResult::CONNECTION_LOST); });

  // The task posted to the run loop must not touch |this| directly, since it
  // may run after the adapter is gone.  It instead holds its own reference to
  // the shared cell in thiz_, which the destructor nulls out.
  std::shared_ptr<CircularBufferMediaPipeAdapter*> self_cell(thiz_);
  signalled_callback_ = Closure([self_cell]() {
    CircularBufferMediaPipeAdapter* const self = *self_cell;
    if (self) {
      self->HandleSignalCallback();
    }
  });
}
// Destructor.  Disarms any signalled_callback_ task still sitting in the run
// loop, then releases the pipe and the shared buffer.
CircularBufferMediaPipeAdapter::~CircularBufferMediaPipeAdapter() {
  // The posted closure holds its own reference to this cell and checks it for
  // null before calling back into the adapter (see the constructor).
  *thiz_ = nullptr;
  Cleanup();
}
void CircularBufferMediaPipeAdapter::Init(uint64_t size) {
// Double Init? Fault unless the user is asking us to set up a buffer of the
// same size.
if (buffer_handle_.is_valid()) {
if (buffer_size_ != size) {
Fault(MediaResult::BAD_STATE);
}
return;
}
// Make the buffer
MojoResult res = CreateSharedBuffer(nullptr, size, &buffer_handle_);
if (res != MOJO_RESULT_OK) {
MOJO_LOG(ERROR) << "Failed to allocate buffer of size " << size
<< " in " << __PRETTY_FUNCTION__
<< " (error " << res << ")";
Fault(MediaResult::INSUFFICIENT_RESOURCES);
return;
}
buffer_size_ = size;
// Map the buffer
//
// TODO(johngro) : We really only need write access to this buffer.
// Ideally, we could request that using flags, but there does not seem to be
// a way to do this right now.
res = MapBuffer(buffer_handle_.get(),
0,
buffer_size_,
&buffer_,
MOJO_MAP_BUFFER_FLAG_NONE);
if (res != MOJO_RESULT_OK) {
MOJO_LOG(ERROR) << "Failed to map buffer in " << __PRETTY_FUNCTION__
<< " (error " << res << ")";
Fault(MediaResult::UNKNOWN_ERROR);
return;
}
// Duplicate the buffer and send it to the other side of the pipe.
//
// TODO(johngro) : It would be nice if we could restrict this handle to be
// read-only.
ScopedSharedBufferHandle duplicated_handle;
res = DuplicateBuffer(buffer_handle_.get(), nullptr, &duplicated_handle);
if (res != MOJO_RESULT_OK) {
MOJO_LOG(ERROR) << "Failed to duplicate handle in " << __PRETTY_FUNCTION__
<< " (error " << res << ")";
Fault(MediaResult::UNKNOWN_ERROR);
return;
}
pipe_->SetBuffer(duplicated_handle.Pass(), []() {});
}
// Installs (or, when |cbk| is null, removes) the user's signal callback.
// Installing a non-null callback re-evaluates the signalled state so that a
// callback gets scheduled if the adapter is currently signalled.
void CircularBufferMediaPipeAdapter::SetSignalCallback(SignalCbk cbk) {
  signal_cbk_ = cbk;

  if (signal_cbk_ != nullptr) {
    UpdateSignalled();
  }
}
// Updates the high/low water marks used by UpdateSignalled() and re-evaluates
// the signalled state.  A request whose high mark is below its low mark is
// rejected: it asserts in debug builds and is silently ignored otherwise.
void CircularBufferMediaPipeAdapter::SetWatermarks(uint64_t hi_water_mark,
                                                   uint64_t lo_water_mark) {
  MOJO_DCHECK(hi_water_mark >= lo_water_mark);
  if (lo_water_mark > hi_water_mark) {
    // Arguments make no sense; leave the current marks untouched.
    return;
  }

  lo_water_mark_ = lo_water_mark;
  hi_water_mark_ = hi_water_mark;

  // The thresholds moved, so our signalled state may have changed with them.
  UpdateSignalled();
}
// Reserves |size| bytes of the circular buffer for a new packet and fills out
// |packet| so the caller can write the payload directly into the mapped
// buffer.
//
// size:    number of payload bytes to reserve.  At most (buffer size - 1)
//          bytes can ever be outstanding, so that wr_ == rd_ always means
//          "empty".
// no_wrap: when the tail of the buffer is too small for the payload and data
//          is pending, skip the tail and start the payload at offset 0 so
//          that it occupies a single region.
// packet:  receives the media packet plus the data pointer(s)/length(s) of
//          the reserved region(s).
//
// Returns:
//   OK                      on success (the write pointer has been advanced).
//   INVALID_ARGUMENT        |packet| is null.
//   BAD_STATE               the adapter is faulted or has no mapped buffer.
//   BUSY                    the adapter is busy.
//   INSUFFICIENT_RESOURCES  there is not enough free space for the payload.
MediaResult CircularBufferMediaPipeAdapter::CreateMediaPacket(
    uint64_t size,
    bool no_wrap,
    MappedPacket* packet) {
  // Args ok?
  MOJO_DCHECK(packet);
  if (nullptr == packet) {
    return MediaResult::INVALID_ARGUMENT;
  }

  // If we are faulted, or busy, we cannot proceed.
  if (Faulted()) { return MediaResult::BAD_STATE; }
  if (Busy()) { return MediaResult::BUSY; }

  // Without a mapped buffer there is nothing to allocate from.  Bail out
  // instead of computing offsets modulo a zero buffer size below.
  MOJO_DCHECK(buffer_size_);
  if ((nullptr == buffer_) || (0 == buffer_size_)) {
    return MediaResult::BAD_STATE;
  }

  // Are we attempting to allocate something larger than the buffer can
  // possibly hold?
  if (size > (buffer_size_ - 1)) {
    return MediaResult::INSUFFICIENT_RESOURCES;
  }

  MOJO_DCHECK(wr_ < buffer_size_);
  MOJO_DCHECK(rd_ < buffer_size_);

  // Where should this allocation begin?
  //
  // If no-wrap was requested, and the end of the buffer comes before the read
  // pointer, and the distance between the write pointer and the end of the
  // buffer is too small to hold the requested size, then we need to pad out
  // to the start of the circular buffer.  Otherwise, the allocation can start
  // where the write pointer currently is.
  //
  // NOTE(review): when the buffer is empty (wr_ == rd_) no padding is applied,
  // so a no_wrap request near the end of the buffer can still come back as
  // two regions.  Left as-is; confirm the intended behavior.
  //
  // TODO(johngro): someday, take alignment restrictions into account.
  uint64_t alloc_start = wr_;
  if (no_wrap && (wr_ > rd_) && ((buffer_size_ - wr_) < size)) {
    alloc_start = 0;
  }

  // Bytes between the write pointer and the end of the buffer which are
  // skipped (and therefore consumed) when the allocation is moved to offset 0.
  const uint64_t padding = (alloc_start == wr_) ? 0 : (buffer_size_ - wr_);

  // Do we have the room?  Everything between rd_ and wr_ is already spoken
  // for, and one byte must always stay free so that a full buffer cannot be
  // mistaken for an empty one.  The padding and the payload both have to fit
  // in what remains; otherwise the new packet would overlap pending data.
  const uint64_t pending = GetPending();
  MOJO_DCHECK(pending < buffer_size_);
  const uint64_t available = (buffer_size_ - 1) - pending;
  if ((padding > available) || (size > (available - padding))) {
    return MediaResult::INSUFFICIENT_RESOURCES;
  }

  // Where does this allocation end, in non-modulo space?
  const uint64_t alloc_end = alloc_start + size;

  // Looks like we have the room.  Allocate and fill out our packet.
  const MediaPacketPtr& p = (packet->packet_ = MediaPacket::New());
  const MediaPacketRegionPtr& r1 = (p->payload = MediaPacketRegion::New());
  const MediaPacketRegionPtr* r2 = nullptr;

  r1->offset = alloc_start;
  if (alloc_end > buffer_size_) {
    // The payload wraps: the first region runs to the end of the buffer and a
    // second region picks up at offset 0.
    p->extra_payload = Array<MediaPacketRegionPtr>::New(1);
    p->extra_payload[0] = MediaPacketRegion::New();
    r1->length = buffer_size_ - alloc_start;
    MOJO_DCHECK(size > r1->length);
    r2 = &(p->extra_payload[0]);
    (*r2)->offset = 0;
    (*r2)->length = size - r1->length;
  } else {
    p->extra_payload = Array<MediaPacketRegionPtr>::New(0);
    r1->length = size;
  }

  // Fill out the bookkeeping in our internal MappedPacket structure.
  // cancel_wr_ remembers where the write pointer was so CancelMediaPacket can
  // put it back; flush_generation_ lets Send/Cancel detect an intervening
  // flush.
  packet->data_[0] = reinterpret_cast<uint8_t*>(buffer_) + r1->offset;
  packet->length_[0] = r1->length;
  packet->cancel_wr_ = wr_;
  packet->flush_generation_ = flush_generation_;
  if (nullptr != r2) {
    packet->data_[1] = reinterpret_cast<uint8_t*>(buffer_) + (*r2)->offset;
    packet->length_[1] = (*r2)->length;
  } else {
    packet->data_[1] = nullptr;
    packet->length_[1] = 0;
  }

  // Update our circular buffer bookkeeping, and we are done.
  wr_ = alloc_end % buffer_size_;

  // Now that we have moved the write pointer, our signalled state may have
  // changed.  Need to re-evaluate.
  UpdateSignalled();
  return MediaResult::OK;
}
// Sends a packet previously produced by CreateMediaPacket to the consumer.
//
// packet: the mapped packet to send; it is Reset() before returning unless
//         the arguments were rejected.
// cbk:    run (from HandleSendPacket) when the consumer reports the packet
//         as complete.
//
// Returns INVALID_ARGUMENT for a null/empty packet, FLUSHED if the adapter
// faulted or flushed since the packet was created (|cbk| will not be called
// in that case), and OK once the packet has been queued on the pipe.
MediaResult CircularBufferMediaPipeAdapter::SendMediaPacket(
    MappedPacket* packet,
    const MediaConsumer::SendPacketCallback& cbk) {
  MOJO_DCHECK(packet && !packet->packet_.is_null());
  if ((nullptr == packet) || packet->packet_.is_null()) {
    return MediaResult::INVALID_ARGUMENT;
  }

  const MediaPacketPtr& media_packet = packet->packet_;
  MOJO_DCHECK(!media_packet->extra_payload.is_null());
  MOJO_DCHECK(media_packet->extra_payload.size() <= 1u);

  // The read pointer moves to the end of the packet's last region once the
  // consumer is done with it.
  uint64_t post_consume_rd;
  if (media_packet->extra_payload.size()) {
    const MediaPacketRegionPtr& tail = media_packet->extra_payload[0];
    post_consume_rd = tail->offset + tail->length;
  } else {
    post_consume_rd =
        media_packet->payload->offset + media_packet->payload->length;
  }

  // A broken pipe means we belong in the fatal error state; make sure we are
  // in it before going any further.
  if (!pipe_.is_bound()) {
    Fault(MediaResult::CONNECTION_LOST);
  }

  // Between creation and now we may have faulted, or flushed one or more
  // times.  Either way the payload is effectively gone: reset the packet and
  // report FLUSHED so the user knows not to wait for their callback.
  const bool stale =
      Faulted() || (packet->flush_generation_ != flush_generation_);
  if (stale) {
    packet->Reset();
    return MediaResult::FLUSHED;
  }

  // We cannot be busy here.
  //
  // MappedPackets cannot be created while we are busy getting state, so a
  // valid MappedPacket cannot coexist with that condition.
  //
  // Likewise packets cannot be created during a flush, and each packet
  // captures the flush generation at creation time.  Starting a flush bumps
  // the generation, so a packet predating an in-progress flush would already
  // have been rejected by the staleness check above.
  MOJO_DCHECK(!Busy());

  // A packet ending exactly at the end of the buffer leaves the read pointer
  // at offset 0.
  MOJO_DCHECK(post_consume_rd <= buffer_size_);
  if (post_consume_rd == buffer_size_) {
    post_consume_rd = 0;
  }

  // Record the packet as in flight, then hand it to the consumer.  The
  // sequence number ties the completion back to this queue entry.
  const uint32_t seq_num = seq_num_gen_++;
  in_flight_queue_.emplace_back(post_consume_rd, seq_num, cbk);
  pipe_->SendPacket(
      packet->packet_.Pass(),
      [this, seq_num](MediaConsumer::SendResult result) {
        HandleSendPacket(seq_num, result);
      });

  packet->Reset();
  return MediaResult::OK;
}
// Abandons a packet created by CreateMediaPacket without sending it, moving
// the write pointer back to where it was when the packet was created.
//
// Returns INVALID_ARGUMENT for a null/empty packet, FLUSHED if the adapter
// faulted or flushed since the packet was created, and OK otherwise.  The
// packet is Reset() in the latter two cases.
MediaResult CircularBufferMediaPipeAdapter::CancelMediaPacket(
    MappedPacket* packet) {
  MOJO_DCHECK(packet && !packet->packet_.is_null());
  if ((nullptr == packet) || packet->packet_.is_null()) {
    return MediaResult::INVALID_ARGUMENT;
  }

  // Same staleness rule as SendMediaPacket: a fault or an intervening flush
  // has already invalidated the reservation, so there is nothing to undo.
  const bool stale =
      Faulted() || (packet->flush_generation_ != flush_generation_);
  if (stale) {
    packet->Reset();
    return MediaResult::FLUSHED;
  }

  MOJO_DCHECK(!Busy());

  // Read the saved write pointer before resetting the packet.
  wr_ = packet->cancel_wr_;
  packet->Reset();

  // Space was just handed back; re-evaluate our signalled state.
  UpdateSignalled();
  return MediaResult::OK;
}
// Asks the consumer to flush.  Until the consumer acknowledges the flush
// (see HandleFlush) flush_in_progress_ stays set, and any MappedPacket created
// before this call is rejected as FLUSHED because the flush generation has
// been bumped.
//
// Returns INTERNAL_ERROR if the adapter is faulted, BUSY if it is busy, and
// OK once the flush request has been issued.
MediaResult CircularBufferMediaPipeAdapter::Flush() {
  if (Faulted()) { return MediaResult::INTERNAL_ERROR; }
  if (Busy()) { return MediaResult::BUSY; }

  // TODO(johngro) : if our bookkeeping indicates that we are already
  // flushed, do we skip this or do we play dumb and do the flush anyway?
  // For now, we play dumb.

  // Do NOT drop the in-flight queue here.  Packets which have already been
  // sent still get their SendPacket completions routed to HandleSendPacket,
  // which expects to find (and sequence-check) their entries and run the
  // user's callback; it already skips the rd_/wr_ bookkeeping while
  // flush_in_progress_ is set.  Clearing the queue here would make the first
  // such completion fault the adapter and would silently discard the user
  // callbacks.
  rd_ = wr_ = 0;
  flush_in_progress_ = true;
  flush_generation_++;
  pipe_->Flush(pipe_flush_cbk_);
  return MediaResult::OK;
}
void CircularBufferMediaPipeAdapter::HandleSendPacket(
uint32_t seq_num,
MediaConsumer::SendResult result) {
MediaConsumer::SendPacketCallback cbk;
do {
// There should be at least one element in the in-flight queue, and the
// front of the queue's sequence number should match the sequence number of
// the payload being returned to us.
if (!in_flight_queue_.size() ||
(in_flight_queue_.front().seq_num_ != seq_num)) {
Fault(MediaResult::UNKNOWN_ERROR);
break;
}
uint64_t new_rd = in_flight_queue_.front().post_consume_rd_;
cbk = in_flight_queue_.front().cbk_;
in_flight_queue_.pop_front();
// Only update our internal bookkeeping if we are not in the process of
// flushing.
if (!flush_in_progress_) {
// Now that this buffer has been consumed, the read pointer we are moving
// to must exist somewhere between the current read pointer, and the
// current write pointer (inclusively). If we fail this check, we must
// have some pretty serious internal bookkeeping trouble.
uint64_t new_rd_non_modulo = new_rd + ((new_rd < rd_) ? buffer_size_ : 0);
uint64_t wr_non_modulo = wr_ + ((wr_ < rd_) ? buffer_size_ : 0);
if (!((new_rd_non_modulo >= rd_) &&
(new_rd_non_modulo <= wr_non_modulo))) {
Fault(MediaResult::UNKNOWN_ERROR);
break;
}
// Everything checks out. Advance our read pointer, re-evaluate our
// signalled vs. non-signalled state.
rd_ = new_rd;
UpdateSignalled();
}
} while (false);
// If we managed to find a user registered callback for this packet, go ahead
// and execute it now.
if (!cbk.is_null()) {
cbk.Run(result);
}
}
// Completion handler for the flush request issued by Flush().  Resets the
// circular buffer bookkeeping, leaves the flushing state and re-evaluates the
// signalled state.  A flush completion which arrives while no flush is in
// progress faults the adapter with PROTOCOL_ERROR.
void CircularBufferMediaPipeAdapter::HandleFlush() {
  // If we are in a perma-fault state, ignore this callback.
  if (Faulted()) { return; }

  if (!flush_in_progress_) {
    // If we don't think that there should be a flush in progress at this point,
    // then something is seriously wrong.  The other end of the pipe should not
    // be sending us flush complete callbacks if there is no flush in progress.
    Fault(MediaResult::PROTOCOL_ERROR);
    return;
  }

  // We are no longer flushing.  Update our bookkeeping and re-evaluate our
  // signalled vs. non-signalled state.
  //
  // The buffer is empty again, so no in-flight bookkeeping may survive the
  // flush.  Use clear() rather than pop_front(): the queue is normally
  // already empty at this point, and pop_front() on an empty container is
  // undefined behavior (and would only drop a single entry otherwise).
  rd_ = wr_ = 0;
  in_flight_queue_.clear();
  flush_in_progress_ = false;
  UpdateSignalled();
}
// Runs from the task posted by UpdateSignalled().  Delivers the user's signal
// callback with the adapter's current state, unless the adapter is no longer
// signalled, the final fault notification has already been made, or no
// callback is registered.
void CircularBufferMediaPipeAdapter::HandleSignalCallback() {
  // We only get here via a scheduled task, so the flag had better be set.
  // Clear it so another task can be scheduled later.
  MOJO_DCHECK(cbk_scheduled_);
  cbk_scheduled_ = false;

  // Squash the callback if the final fault callback has already gone out, or
  // if we stopped being signalled while the task was waiting to run.
  const bool squash = fault_cbk_made_ || !signalled_;
  if (squash) {
    return;
  }

  // Nothing to dispatch to if the user no longer has a callback registered.
  const bool have_cbk = (signal_cbk_ != nullptr);
  if (!have_cbk) {
    return;
  }

  // If we are about to report our final fatal state, set the flag first to
  // ensure we make no further callbacks.
  if (Faulted()) {
    fault_cbk_made_ = true;
  }

  // Perform the callback.
  signal_cbk_(internal_state_);
}
// Recomputes signalled_ and, if we are signalled, makes sure a task which
// will deliver the signal callback is posted to the current RunLoop.
//
//  - Faulted: always signalled.
//  - Busy:    never signalled.
//  - Otherwise the water marks apply with hysteresis: we become signalled
//    when pending data drops below the low mark, and stop being signalled
//    when it reaches the high mark.  In between, the state is left unchanged.
void CircularBufferMediaPipeAdapter::UpdateSignalled() {
  if (Faulted()) {
    // The unrecoverable fault state is always signalled.
    signalled_ = true;
  } else if (Busy()) {
    // Busy means not signalled.
    signalled_ = false;
  } else {
    const uint64_t pending = GetPending();
    if (signalled_) {
      // Currently signalled: stay that way until pending data reaches the
      // high water mark.
      if (pending >= hi_water_mark_) {
        signalled_ = false;
      }
    } else {
      // Currently not signalled: become signalled once pending data drops
      // below the low water mark.
      if (pending < lo_water_mark_) {
        signalled_ = true;
      }
    }
  }

  // Only schedule a callback if we are signalled, none is scheduled already,
  // and the final fault callback has not been made.
  if (!signalled_ || cbk_scheduled_ || fault_cbk_made_) {
    return;
  }

  RunLoop* loop = RunLoop::current();
  MOJO_DCHECK(loop);
  loop->PostDelayedTask(signalled_callback_, 0);
  cbk_scheduled_ = true;
}
// Puts the adapter into the fault state.  The first call (while
// internal_state_ is still OK) logs the reason, records it and releases the
// adapter's resources via Cleanup(); later calls leave the recorded state
// alone.  Every call re-evaluates the signalled state.
void CircularBufferMediaPipeAdapter::Fault(MediaResult reason) {
  const bool first_fault = (internal_state_ == MediaResult::OK);

  if (first_fault) {
    MOJO_LOG(ERROR) << "circular buffer media pipe adapter entering "
                       "unrecoverable fault state (reason = "
                    << reason << ")";
    internal_state_ = reason;
    Cleanup();
  }

  UpdateSignalled();
}
// Releases the pipe and the shared buffer and resets the circular buffer
// bookkeeping.  Called from both Fault() and the destructor.
//
// This deliberately does not touch signal_cbk_ and does not overwrite a
// failure reason which has already been recorded in internal_state_.
// Fault() stores the reason and then calls Cleanup() before re-evaluating the
// signalled state; the final fault notification made by HandleSignalCallback()
// needs both the user's callback and the original reason to still be there.
void CircularBufferMediaPipeAdapter::Cleanup() {
  // Close the connection to the service.
  if (pipe_.is_bound()) {
    pipe_.reset();
  }

  // If our buffer had been mapped, un-map it.  Then release it and reset our
  // internal state.
  if (nullptr != buffer_) {
    MojoResult res;
    res = UnmapBuffer(buffer_);
    MOJO_CHECK(res == MOJO_RESULT_OK);
  }

  buffer_ = nullptr;
  buffer_handle_.reset();
  rd_ = wr_ = 0;
  in_flight_queue_.clear();

  // Only mark ourselves as shutting down if no fault reason has been recorded
  // (i.e. we are being cleaned up while still healthy).
  if (MediaResult::OK == internal_state_) {
    internal_state_ = MediaResult::SHUTTING_DOWN;
  }
}
// Returns the number of bytes between the read and write pointers, i.e. the
// amount of buffer space currently spoken for.  Returns 0 when no buffer is
// mapped.
uint64_t CircularBufferMediaPipeAdapter::GetPending() const {
  // We are only in sync with the other end of the pipe while the shared
  // buffer is mapped into our address space; otherwise nothing is pending.
  if (buffer_ == nullptr) {
    return 0;
  }

  // Distance from rd_ to wr_, unwrapped if the write pointer has gone around
  // the end of the buffer.
  uint64_t pending = wr_ - rd_;
  if (wr_ < rd_) {
    pending += buffer_size_;
  }
  return pending;
}
// Returns the usable capacity of the circular buffer, which is one byte less
// than the size of the underlying shared buffer, or 0 when no buffer is
// mapped.
uint64_t CircularBufferMediaPipeAdapter::GetBufferSize() const {
  if (buffer_ != nullptr) {
    MOJO_DCHECK(buffer_size_);
    return buffer_size_ - 1;
  }
  return 0;
}
} // namespace media
} // namespace mojo