blob: 00dac4ed277c98b29a2745748d8e0fce186bd753 [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/environment/logging.h"
#include "mojo/public/cpp/utility/run_loop.h"
#include "mojo/services/media/common/cpp/circular_buffer_media_pipe_adapter.h"
#include "mojo/services/media/common/interfaces/media_common.mojom.h"
#include "mojo/services/media/common/interfaces/media_pipe.mojom.h"
namespace mojo {
namespace media {
// Out-of-class definition of the static constexpr member kMaxRegions. Before
// C++17 a static constexpr data member that is odr-used (for example bound to
// a const reference) needs a namespace-scope definition like this one.
constexpr size_t CircularBufferMediaPipeAdapter::MappedPacket::kMaxRegions;
// Default constructor: nothing is initialized explicitly here.
CircularBufferMediaPipeAdapter::MappedPacket::MappedPacket() = default;
// Destructor. In DCHECK-enabled builds, asserts that no MediaPacket is still
// attached to this MappedPacket when it goes away.
CircularBufferMediaPipeAdapter::MappedPacket::~MappedPacket() {
// If packet_ is non-null, it means that someone created a MappedPacket using
// CreateMediaPacket, but they never sent it (SendMediaPacket) and never
// canceled it (CancelMediaPacket). Both of those paths call Reset() on the
// MappedPacket before returning.
//
// TODO(johngro): Should we maintain a reference to our pipe adapter and just
// auto cancel this packet if the user forgets? This could be Very Bad if
// they have created and forgotten about multiple packets.
MOJO_DCHECK(packet_.is_null());
}
// Bookkeeping record for one packet that has been handed to the pipe.
//
// post_consume_rd: value the adapter's read pointer takes once this packet
//                  has been consumed (see HandleSendPacket).
// seq_num:         sequence number assigned in SendMediaPacket; used to match
//                  the completion against the front of the in-flight queue.
// cbk:             user callback to run when the packet's completion arrives.
CircularBufferMediaPipeAdapter::PacketState::PacketState(
uint64_t post_consume_rd,
uint32_t seq_num,
const MediaPipe::SendPacketCallback& cbk)
: post_consume_rd_(post_consume_rd),
seq_num_(seq_num),
cbk_(cbk) {}
// Destructor: no work beyond destroying the members.
CircularBufferMediaPipeAdapter::PacketState::~PacketState() = default;
// Takes ownership of |pipe|, builds the callbacks that are reused for the
// lifetime of the adapter, and kicks off the initial GetState request.
//
// Must be constructed on a thread that has a current RunLoop. All three
// callbacks capture |this|.
CircularBufferMediaPipeAdapter::CircularBufferMediaPipeAdapter(
    MediaPipePtr pipe)
    : pipe_(pipe.Pass()) {
  MOJO_DCHECK(pipe_);
  MOJO_DCHECK(RunLoop::current());

  // Task posted to the run loop whenever a signal callback is due.
  handle_signal_cbk_ = Closure([this]() { HandleSignalCallback(); });

  // Completion handler for MediaPipe::Flush.
  pipe_flush_cbk_ = MediaPipe::FlushCallback(
      [this](MediaResult flush_result) { HandleFlush(flush_result); });

  // Completion handler for MediaPipe::GetState.
  pipe_get_state_cbk_ = MediaPipe::GetStateCallback(
      [this](MediaPipeStatePtr pipe_state) {
        HandleGetState(pipe_state.Pass());
      });

  // Ask the pipe for its state; the reply carries the shared buffer that
  // payloads will be written into.
  //
  // TODO(johngro): if the pipe is broken, go into a fatal error state
  MOJO_DCHECK(get_state_in_progress_);
  pipe_->GetState(pipe_get_state_cbk_);
}
// Releases the mapped buffer and bookkeeping while holding the signal lock.
CircularBufferMediaPipeAdapter::~CircularBufferMediaPipeAdapter() {
  std::lock_guard<std::mutex> guard(signal_lock_);
  CleanupLocked();
}
// Installs (or, when |cbk| is null, removes) the user's signal callback.
//
// The callback pointer is swapped under signal_cbk_lock_. That lock is
// released before signal_lock_ is taken, so the two are never held together
// in this function.
void CircularBufferMediaPipeAdapter::SetSignalCallback(SignalCbk cbk) {
  bool have_callback = false;
  {
    std::lock_guard<std::mutex> cbk_guard(signal_cbk_lock_);
    signal_cbk_ = cbk;
    have_callback = (signal_cbk_ != nullptr);
  }

  // Clearing the callback needs no further work.
  if (!have_callback) {
    return;
  }

  // A callback was installed: re-evaluate the signalled state so that one
  // gets scheduled if we are signalled right now.
  std::lock_guard<std::mutex> guard(signal_lock_);
  UpdateSignalledLocked();
}
// Updates the high/low water marks used by UpdateSignalledLocked.
//
// Requires hi_water_mark >= lo_water_mark. A violation trips a DCHECK in
// debug builds and is otherwise ignored, leaving the marks unchanged.
void CircularBufferMediaPipeAdapter::SetWatermarks(uint64_t hi_water_mark,
                                                   uint64_t lo_water_mark) {
  MOJO_DCHECK(hi_water_mark >= lo_water_mark);
  const bool marks_inverted = (hi_water_mark < lo_water_mark);
  if (marks_inverted) {
    return;
  }

  std::lock_guard<std::mutex> guard(signal_lock_);
  lo_water_mark_ = lo_water_mark;
  hi_water_mark_ = hi_water_mark;

  // The thresholds moved, so the signalled state may have changed too.
  UpdateSignalledLocked();
}
// Reserves |size| bytes of the shared circular buffer and describes the
// reservation in |packet|.
//
// size:    number of payload bytes to reserve. At most buffer_size_ - 1.
// no_wrap: when true, the payload is never split across the end of the
//          buffer; if the tail is too short the reservation is placed at
//          offset 0 and the tail is skipped (padding).
// packet:  receives the new MediaPacket plus the mapped pointer/length of each
//          region (one region, or two when the payload wraps).
//
// Returns:
//   INVALID_ARGUMENT        |packet| is null.
//   BAD_STATE               the adapter is faulted.
//   BUSY                    BusyLocked() reports the adapter is busy.
//   INSUFFICIENT_RESOURCES  the request can never fit, or does not fit right
//                           now without running into pending data.
//   OK                      the reservation was made and wr_ was advanced.
MediaResult CircularBufferMediaPipeAdapter::CreateMediaPacket(
    uint64_t size,
    bool no_wrap,
    MappedPacket* packet) {
  // Args ok?
  MOJO_DCHECK(packet);
  if (nullptr == packet) {
    return MediaResult::INVALID_ARGUMENT;
  }

  std::lock_guard<std::mutex> lock(signal_lock_);

  // If we are faulted, or busy, we cannot proceed.
  if (FaultedLocked()) { return MediaResult::BAD_STATE; }
  if (BusyLocked()) { return MediaResult::BUSY; }

  // Are we attempting to allocate something larger than the buffer can
  // possibly hold? One byte is always kept free so that wr_ == rd_
  // unambiguously means "empty".
  MOJO_DCHECK(buffer_size_);
  if (size > (buffer_size_ - 1)) {
    return MediaResult::INSUFFICIENT_RESOURCES;
  }

  MOJO_DCHECK(wr_ < buffer_size_);
  MOJO_DCHECK(rd_ < buffer_size_);

  // Where should this allocation begin?
  //
  // If no-wrap was requested, and walking forward from the write pointer we
  // reach the end of the buffer before we reach the read pointer (wr_ >= rd_,
  // which includes the empty case wr_ == rd_), and the distance between the
  // write pointer and the end of the buffer is too small to hold the requested
  // size, then we need to pad out to the start of the circular buffer.
  // Otherwise, the allocation can start where the write pointer currently is.
  //
  // TODO(johngro): someday, take alignment restrictions into account.
  const bool pad_to_start =
      no_wrap && (wr_ >= rd_) && ((buffer_size_ - wr_) < size);
  const uint64_t alloc_start = pad_to_start ? 0 : wr_;

  // Where should this allocation end, in non-modulo space?
  const uint64_t alloc_end = alloc_start + size;

  // Would the new write pointer reach or pass the read pointer? If so, we do
  // not have the space for this packet's payload.
  bool fits;
  if (rd_ > wr_) {
    // Free space is the single span [wr_, rd_). The allocation starts at wr_
    // and must end strictly before rd_.
    fits = (alloc_end < rd_);
  } else if (rd_ == wr_) {
    // Nothing is pending; any size up to buffer_size_ - 1 (checked above)
    // fits, wrapped or not.
    fits = true;
  } else {
    // Pending data occupies [rd_, wr_). Free space is [wr_, buffer_size_)
    // followed by [0, rd_). An allocation that stays inside the tail always
    // fits; one that reaches back to the start of the buffer (by wrapping or
    // by padding) must end strictly before rd_.
    const bool reaches_start = pad_to_start || (alloc_end >= buffer_size_);
    fits = !reaches_start || ((alloc_end % buffer_size_) < rd_);
  }
  if (!fits) {
    return MediaResult::INSUFFICIENT_RESOURCES;
  }

  // Looks like we have the room. Allocate and fill out our packet.
  const MediaPacketPtr& p = (packet->packet_ = MediaPacket::New());
  const MediaPacketRegionPtr& r1 = (p->payload = MediaPacketRegion::New());
  const MediaPacketRegionPtr* r2 = nullptr;

  r1->offset = alloc_start;
  if (alloc_end > buffer_size_) {
    // The payload wraps: region 1 runs to the end of the buffer, region 2
    // holds the remainder starting at offset 0.
    p->extra_payload = Array<MediaPacketRegionPtr>::New(1);
    p->extra_payload[0] = MediaPacketRegion::New();
    r1->length = buffer_size_ - alloc_start;
    MOJO_DCHECK(size > r1->length);
    r2 = &(p->extra_payload[0]);
    (*r2)->offset = 0;
    (*r2)->length = size - r1->length;
  } else {
    p->extra_payload = Array<MediaPacketRegionPtr>::New(0);
    r1->length = size;
  }

  // Fill out the bookkeeping in our internal MappedPacket structure.
  // cancel_wr_ remembers the write pointer to restore if the packet is
  // cancelled; flush_generation_ lets Send/Cancel detect an intervening flush.
  packet->data_[0] = reinterpret_cast<uint8_t*>(buffer_) + r1->offset;
  packet->length_[0] = r1->length;
  packet->cancel_wr_ = wr_;
  packet->flush_generation_ = flush_generation_;
  if (nullptr != r2) {
    packet->data_[1] = reinterpret_cast<uint8_t*>(buffer_) + (*r2)->offset;
    packet->length_[1] = (*r2)->length;
  } else {
    packet->data_[1] = nullptr;
    packet->length_[1] = 0;
  }

  // Update our circular buffer bookkeeping, and we are done.
  wr_ = alloc_end % buffer_size_;

  // Now that we have moved the write pointer, our signalled state may have
  // changed. Need to re-evaluate.
  UpdateSignalledLocked();
  return MediaResult::OK;
}
// Hands a packet previously produced by CreateMediaPacket to the pipe.
//
// packet: must be non-null and still hold its MediaPacket. It is Reset()
//         before this function returns on every path except INVALID_ARGUMENT.
// cbk:    user callback stored alongside the packet in the in-flight queue
//         and run from HandleSendPacket.
//
// Returns INVALID_ARGUMENT for a bad |packet|, FLUSHED when the adapter has
// faulted or flushed since the packet was created (|cbk| is not stored in that
// case), and OK otherwise.
MediaResult CircularBufferMediaPipeAdapter::SendMediaPacket(
    MappedPacket* packet,
    const MediaPipe::SendPacketCallback& cbk) {
  MOJO_DCHECK(packet && !packet->packet_.is_null());
  if (!packet || packet->packet_.is_null()) {
    return MediaResult::INVALID_ARGUMENT;
  }

  const MediaPacketPtr& media_packet = packet->packet_;
  MOJO_DCHECK(!media_packet->extra_payload.is_null());
  MOJO_DCHECK(media_packet->extra_payload.size() <= 1u);

  // The read pointer after this packet is consumed is the end of its last
  // region: the extra region when the payload wrapped, else the primary one.
  uint64_t post_consume_rd;
  if (media_packet->extra_payload.size()) {
    const auto& tail_region = media_packet->extra_payload[0];
    post_consume_rd = tail_region->offset + tail_region->length;
  } else {
    post_consume_rd =
        media_packet->payload->offset + media_packet->payload->length;
  }

  std::lock_guard<std::mutex> guard(signal_lock_);

  // Between creation and now we may have faulted, or flushed one or more
  // times. Either way the payload is effectively gone: reset the packet and
  // report FLUSHED so the caller knows its callback will never run.
  const bool packet_is_stale =
      FaultedLocked() || (packet->flush_generation_ != flush_generation_);
  if (packet_is_stale) {
    packet->Reset();
    return MediaResult::FLUSHED;
  }

  // We cannot be busy here. Packets cannot be created while the initial
  // GetState is outstanding, and a flush bumps flush_generation_, which would
  // have been caught by the staleness check above.
  MOJO_DCHECK(!BusyLocked());

  // An end position exactly at the end of the buffer wraps to offset 0.
  MOJO_DCHECK(post_consume_rd <= buffer_size_);
  if (post_consume_rd == buffer_size_) {
    post_consume_rd = 0;
  }

  const uint32_t seq_num = seq_num_gen_++;
  in_flight_queue_.emplace_back(post_consume_rd, seq_num, cbk);

  // TODO(johngro) : if the pipe is broken, go into a fatal error state
  pipe_->SendPacket(
      packet->packet_.Pass(),
      [this, seq_num](MediaResult result) {
        HandleSendPacket(seq_num, result);
      });
  packet->Reset();

  return MediaResult::OK;
}
// Abandons a packet produced by CreateMediaPacket without sending it, and
// rewinds the write pointer to where it was before the packet was created.
//
// Returns INVALID_ARGUMENT for a null or already-reset |packet|, FLUSHED when
// the adapter has faulted or flushed since the packet was created, and OK
// otherwise. The packet is Reset() on the FLUSHED and OK paths.
MediaResult CircularBufferMediaPipeAdapter::CancelMediaPacket(
    MappedPacket* packet) {
  MOJO_DCHECK(packet && !packet->packet_.is_null());
  if (!packet || packet->packet_.is_null()) {
    return MediaResult::INVALID_ARGUMENT;
  }

  std::lock_guard<std::mutex> guard(signal_lock_);

  // Same staleness test as SendMediaPacket: a fault or an intervening flush
  // means the reservation no longer exists.
  const bool packet_is_stale =
      FaultedLocked() || (packet->flush_generation_ != flush_generation_);
  if (packet_is_stale) {
    packet->Reset();
    return MediaResult::FLUSHED;
  }

  MOJO_DCHECK(!BusyLocked());

  // Give the space back, then re-evaluate whether we are signalled.
  wr_ = packet->cancel_wr_;
  packet->Reset();
  UpdateSignalledLocked();

  return MediaResult::OK;
}
// Starts a flush of the pipe.
//
// Returns INTERNAL_ERROR if the adapter is faulted, BUSY if BusyLocked()
// reports the adapter is busy, and OK once the flush request has been issued.
// Completion is reported through HandleFlush.
//
// NOTE(review): CreateMediaPacket reports the faulted state as BAD_STATE while
// this function reports INTERNAL_ERROR. Left unchanged because callers may
// depend on it; confirm which one the public header documents.
MediaResult CircularBufferMediaPipeAdapter::Flush() {
  std::lock_guard<std::mutex> lock(signal_lock_);

  if (FaultedLocked()) { return MediaResult::INTERNAL_ERROR; }
  if (BusyLocked()) { return MediaResult::BUSY; }

  // TODO(johngro) : if our bookkeeping indicates that we are already
  // flushed, do we skip this or do we play dumb and do the flush anyway?
  // For now, we play dumb.

  // Do NOT clear in_flight_queue_ here. Completions for packets that are
  // already in flight are matched against the front of that queue in
  // HandleSendPacket, which faults the adapter if the queue is empty or the
  // sequence numbers disagree. HandleSendPacket already skips read-pointer
  // updates while flush_in_progress_ is set, so the entries drain harmlessly
  // (and the user callbacks stored in them still run).
  rd_ = wr_ = 0;
  flush_in_progress_ = true;

  // Bumping the generation makes every MappedPacket created before this
  // point fail the staleness check in SendMediaPacket/CancelMediaPacket.
  flush_generation_++;

  // TODO(johngro): if the pipe is broken, go into a fatal error state
  pipe_->Flush(pipe_flush_cbk_);
  return MediaResult::OK;
}
// Completion handler for the GetState request issued by the constructor.
//
// Takes the shared payload buffer out of |state|, validates its size, and maps
// it. Any failure puts the adapter into the fault state. In every case the
// adapter stops waiting for GetState and rd_/wr_ are reset to 0.
void CircularBufferMediaPipeAdapter::HandleGetState(MediaPipeStatePtr state) {
  MOJO_DCHECK(state);                   // A state structure is required.
  MOJO_DCHECK(!buffer_);                // Nothing may be mapped yet.
  MOJO_DCHECK(get_state_in_progress_);  // We must be expecting this callback.

  std::lock_guard<std::mutex> guard(signal_lock_);

  // Whatever happens below, the GetState request is finished.
  get_state_in_progress_ = false;
  rd_ = wr_ = 0;

  // We should never already own a buffer handle or mapping at this point.
  const bool already_initialized =
      buffer_handle_.is_valid() || (nullptr != buffer_);
  if (already_initialized) {
    MOJO_LOG(ERROR) << "Double init during " << __PRETTY_FUNCTION__;
    FaultLocked(MediaResult::UNKNOWN_ERROR);
    return;
  }

  // The pipe must supply a shared buffer.
  if (!state->payload_buffer.is_valid()) {
    MOJO_LOG(ERROR) << "Null payload buffer in " << __PRETTY_FUNCTION__;
    FaultLocked(MediaResult::UNKNOWN_ERROR);
    return;
  }

  // Take ownership of the handle and remember the advertised length.
  buffer_handle_ = state->payload_buffer.Pass();
  buffer_size_ = state->payload_buffer_len;

  // Reject an empty buffer or one larger than the interface allows.
  const bool size_out_of_range =
      !buffer_size_ || (buffer_size_ > MediaPipeState::kMaxPayloadLen);
  if (size_out_of_range) {
    MOJO_LOG(ERROR) << "Bad buffer size in " << __PRETTY_FUNCTION__
                    << " (" << buffer_size_ << ")";
    FaultLocked(MediaResult::BAD_STATE);
    return;
  }

  // Map the whole buffer into our address space.
  //
  // TODO(johngro) : We really only need write access to this buffer.
  // Ideally, we could request that using flags, but there does not seem to be
  // a way to do this right now.
  const MojoResult map_status = MapBuffer(buffer_handle_.get(),
                                          0,
                                          buffer_size_,
                                          &buffer_,
                                          MOJO_MAP_BUFFER_FLAG_NONE);
  if (map_status != MOJO_RESULT_OK) {
    MOJO_LOG(ERROR) << "Failed to map buffer in " << __PRETTY_FUNCTION__
                    << " (error " << map_status << ")";
    FaultLocked(MediaResult::UNKNOWN_ERROR);
    return;
  }

  // Initialization finished; we may now be signalled.
  UpdateSignalledLocked();
}
void CircularBufferMediaPipeAdapter::HandleSendPacket(uint32_t seq_num,
MediaResult result) {
MediaPipe::SendPacketCallback cbk;
do {
std::lock_guard<std::mutex> lock(signal_lock_);
if (get_state_in_progress_) {
// If we are in the process of getting the initial state of the system,
// then something is seriously wrong. The other end of this interface is
// sending us Send callbacks while we are in the process of initializing
// (something which should be impossible)
FaultLocked(MediaResult::PROTOCOL_ERROR);
break;
}
// There should be at least one element in the in-flight queue, and the
// front of the queue's sequence number should match the sequence number of
// the payload being returned to us.
if (!in_flight_queue_.size() ||
(in_flight_queue_.front().seq_num_ != seq_num)) {
FaultLocked(MediaResult::UNKNOWN_ERROR);
break;
}
uint64_t new_rd = in_flight_queue_.front().post_consume_rd_;
cbk = in_flight_queue_.front().cbk_;
in_flight_queue_.pop_front();
// Only update our internal bookkeeping if we are not in the process of
// flushing.
if (!flush_in_progress_) {
// Now that this buffer has been consumed, the read pointer we are moving
// to must exist somewhere between the current read pointer, and the
// current write pointer (inclusively). If we fail this check, we must
// have some pretty serious internal bookkeeping trouble.
uint64_t new_rd_non_modulo = new_rd + ((new_rd < rd_) ? buffer_size_ : 0);
uint64_t wr_non_modulo = wr_ + ((wr_ < rd_) ? buffer_size_ : 0);
if (!((new_rd_non_modulo >= rd_) &&
(new_rd_non_modulo <= wr_non_modulo))) {
FaultLocked(MediaResult::UNKNOWN_ERROR);
break;
}
// Everything checks out. Advance our read pointer, re-evaluate our
// signalled vs. non-signalled state.
rd_ = new_rd;
UpdateSignalledLocked();
}
} while (false);
// If we managed to find a user registered callback for this packet, go ahead
// and execute it now.
if (!cbk.is_null()) {
cbk.Run(result);
}
}
// Completion handler for the Flush request issued by Flush().
//
// result: status reported by the pipe. Anything other than OK faults the
//         adapter with that status.
//
// Ignored entirely when the adapter is already faulted. A completion that
// arrives while no flush is in progress is treated as a protocol error.
void CircularBufferMediaPipeAdapter::HandleFlush(MediaResult result) {
  std::lock_guard<std::mutex> lock(signal_lock_);

  // If we are in a perma-fault state, ignore this callback.
  if (FaultedLocked()) { return; }

  if (!flush_in_progress_) {
    // If we don't think that there should be a flush in progress at this point,
    // then something is seriously wrong. The other end of the pipe should not
    // be sending us flush complete callbacks if there is no flush in progress.
    FaultLocked(MediaResult::PROTOCOL_ERROR);
    return;
  }

  if (result != MediaResult::OK) {
    // There is no reason the flush should ever fail (we block
    // flushing-in-progress from our side of the interface). If something does
    // go wrong, consider it a fault.
    FaultLocked(result);
    return;
  }

  // We are no longer flushing. Update our bookkeeping and re-evaluate our
  // signalled vs. non-signalled state.
  rd_ = wr_ = 0;

  // The flush request never had an entry of its own in in_flight_queue_ (only
  // SendMediaPacket adds entries), so there is nothing to pop for it, and
  // pop_front() on an empty queue is undefined behaviour. Drop whatever
  // entries remain instead; clear() is safe when the queue is already empty.
  in_flight_queue_.clear();

  flush_in_progress_ = false;
  UpdateSignalledLocked();
}
void CircularBufferMediaPipeAdapter::HandleSignalCallback() {
MediaResult state;
std::lock_guard<std::mutex> lock(signal_cbk_lock_);
{
std::lock_guard<std::mutex> lock(signal_lock_);
// Clear the scheduled flag (also, it better be set otherwise how did we get
// here?)
MOJO_DCHECK(cbk_scheduled_);
cbk_scheduled_ = false;
// If we have made our final fault callback, or we are no longer signalled,
// squash this callback.
if (fault_cbk_made_ || !signalled_) {
return;
}
// Stash the internal state as we leave the signal lock. Our callback to
// the user must reflect the internal state of the system at the point that
// we leave the signal lock (which must not be held during the callback
// itself).
state = internal_state_;
}
// Looks like we should be dispatching this callback (presuming that we still
// have one)
if (signal_cbk_ != nullptr) {
// Perform the callback and clear the callback pointer if the user does
// not want to continue to receive callbacks.
if (!signal_cbk_(state)) {
signal_cbk_ = nullptr;
}
// If we just reported our final, fatal state, set the flag to ensure we
// make no further callbacks.
if (MediaResult::OK != state) {
std::lock_guard<std::mutex> lock(signal_lock_);
fault_cbk_made_ = true;
}
}
}
// Recomputes signalled_ and posts HandleSignalCallback to the current run
// loop when a callback is due. Callers in this file hold signal_lock_.
//
// Rules, in priority order:
//   - faulted       -> signalled
//   - busy          -> not signalled
//   - otherwise     -> hysteresis: become signalled when pending data drops
//                      below the low water mark, stop being signalled when it
//                      reaches the high water mark, else keep the old value.
void CircularBufferMediaPipeAdapter::UpdateSignalledLocked() {
  // TODO(johngro): Assert that we are holding the signal lock.
  if (FaultedLocked()) {
    signalled_ = true;
  } else if (BusyLocked()) {
    signalled_ = false;
  } else {
    const uint64_t pending_bytes = GetPendingLocked();
    if (signalled_) {
      if (pending_bytes >= hi_water_mark_) {
        signalled_ = false;
      }
    } else {
      if (pending_bytes < lo_water_mark_) {
        signalled_ = true;
      }
    }
  }

  // A callback is due only when we are signalled, none is already queued, and
  // the final fault callback has not been made.
  const bool callback_due = signalled_ && !cbk_scheduled_ && !fault_cbk_made_;
  if (!callback_due) {
    return;
  }

  RunLoop* loop = RunLoop::current();
  MOJO_DCHECK(loop);
  loop->PostDelayedTask(handle_signal_cbk_, 0);
  cbk_scheduled_ = true;
}
// Moves the adapter into the fault state with |reason|. Only the first fault
// is recorded (and triggers cleanup); later calls keep the original reason.
// The signalled state is re-evaluated on every call.
void CircularBufferMediaPipeAdapter::FaultLocked(MediaResult reason) {
  // TODO(johngro): Assert that we are holding the signal lock.
  const bool first_fault = (internal_state_ == MediaResult::OK);
  if (first_fault) {
    MOJO_LOG(ERROR) << "cbuf media pipe entering unrecoverable fault state "
                       "(reason = " << reason << ")";
    internal_state_ = reason;
    CleanupLocked();
  }

  UpdateSignalledLocked();
}
void CircularBufferMediaPipeAdapter::CleanupLocked() {
// TODO(johngro): Assert that we are holding the signal lock.
// If our buffer had been mapped, un-map it. Then release it and reset our
// internal state.
if (nullptr != buffer_) {
MojoResult res;
res = UnmapBuffer(buffer_);
MOJO_CHECK(res == MOJO_RESULT_OK);
}
buffer_ = nullptr;
buffer_handle_.reset();
rd_ = wr_ = 0;
in_flight_queue_.clear();
}
// Returns the number of bytes between the read and write pointers, or 0 when
// no buffer is mapped.
uint64_t CircularBufferMediaPipeAdapter::GetPendingLocked() const {
  // TODO(johngro): Assert that we are holding the signal lock.

  // Without a mapped buffer we are not in sync with the other end of the
  // pipe, so nothing can be pending.
  if (buffer_ == nullptr) {
    return 0;
  }

  // When the write pointer has wrapped behind the read pointer, add one full
  // buffer length to bring the difference back into range.
  const uint64_t wrap_adjust = (wr_ < rd_) ? buffer_size_ : 0;
  return (wr_ - rd_) + wrap_adjust;
}
// Returns the usable capacity of the circular buffer: one byte less than the
// mapped size, or 0 when no buffer is mapped.
uint64_t CircularBufferMediaPipeAdapter::GetBufferSizeLocked() const {
  // TODO(johngro): Assert that we are holding the signal lock.
  if (buffer_ == nullptr) {
    return 0;
  }

  MOJO_DCHECK(buffer_size_);
  return buffer_size_ - 1;
}
} // namespace media
} // namespace mojo