blob: 980440cdfeb95867d748853f71e8896c6ed76b5a [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "services/media/framework/engine.h"
namespace mojo {
namespace media {
// Reports how many inputs this part's stage has. The part must refer to a
// stage.
uint32_t Engine::Part::input_count() {
  DCHECK(stage_ != nullptr);
  const uint32_t count = stage_->input_count();
  return count;
}
// Produces an Input referring to input |index| of this part's stage. |index|
// must be below the stage's input count.
Engine::Input Engine::Part::input(uint32_t index) {
  DCHECK(stage_ != nullptr && index < stage_->input_count());
  Input selected(stage_, index);
  return selected;
}
// Produces an Input referring to the sole input of this part's stage. Only
// valid when the stage has exactly one input.
Engine::Input Engine::Part::input() {
  DCHECK(stage_ != nullptr && stage_->input_count() == 1);
  Input only_input(stage_, 0);
  return only_input;
}
// Reports how many outputs this part's stage has. The part must refer to a
// stage.
uint32_t Engine::Part::output_count() {
  DCHECK(stage_ != nullptr);
  const uint32_t count = stage_->output_count();
  return count;
}
// Produces an Output referring to output |index| of this part's stage.
// |index| must be below the stage's output count.
Engine::Output Engine::Part::output(uint32_t index) {
  DCHECK(stage_ != nullptr && index < stage_->output_count());
  Output selected(stage_, index);
  return selected;
}
// Produces an Output referring to the sole output of this part's stage. Only
// valid when the stage has exactly one output.
Engine::Output Engine::Part::output() {
  DCHECK(stage_ != nullptr && stage_->output_count() == 1);
  Output only_output(stage_, 0);
  return only_output;
}
// Returns a Part wrapping whatever stage input |index| reports as its
// upstream stage. |index| must be below the stage's input count.
Engine::Part Engine::Part::upstream_part(uint32_t index) {
  DCHECK(stage_ != nullptr && index < stage_->input_count());
  Stage* const upstream = stage_->input(index).upstream_stage();
  return Part(upstream);
}
// Returns a Part wrapping the upstream stage of the sole input. Only valid
// when the stage has exactly one input.
Engine::Part Engine::Part::upstream_part() {
  DCHECK(stage_ != nullptr && stage_->input_count() == 1);
  Stage* const upstream = stage_->input(0).upstream_stage();
  return Part(upstream);
}
// Returns a Part wrapping whatever stage output |index| reports as its
// downstream stage. |index| must be below the stage's output count.
Engine::Part Engine::Part::downstream_part(uint32_t index) {
  DCHECK(stage_ != nullptr && index < stage_->output_count());
  Stage* const downstream = stage_->output(index).downstream_stage();
  return Part(downstream);
}
// Returns a Part wrapping the downstream stage of the sole output. Only valid
// when the stage has exactly one output.
Engine::Part Engine::Part::downstream_part() {
  DCHECK(stage_ != nullptr && stage_->output_count() == 1);
  Stage* const downstream = stage_->output(0).downstream_stage();
  return Part(downstream);
}
// Installs the callback that is handed to stages when they are prepared. When
// invoked, the callback takes |lock_|, updates the stage it was given and then
// works through the backlogs.
Engine::Engine() {
  update_function_ = [this](Stage* stage) {
    DCHECK(stage);

    base::AutoLock lock(lock_);
    // Update the requesting stage first, then drain whatever that queued.
    UpdateUnsafe(stage);
    UpdateUnsafe();
  };
}
// Tears the engine down by resetting it, which empties the backlogs and
// deletes every stage still registered.
Engine::~Engine() {
  Reset();
}
// Disconnects and deletes the stage behind |part| while holding |lock_|.
void Engine::RemovePart(Part part) {
  DCHECK(part);

  Stage* const doomed = part.stage_;
  base::AutoLock lock(lock_);
  RemoveUnsafe(doomed);
}
// Links |output| to |input| under |lock_|. Any connection either endpoint
// already has is broken first. Returns the part that owns |input|.
Engine::Part Engine::Connect(Output output, Input input) {
  DCHECK(output);
  DCHECK(input);

  base::AutoLock lock(lock_);

  // Sever existing links so neither endpoint is left with a stale mate.
  if (output.connected()) {
    DisconnectOutputUnsafe(output.stage_, output.index_);
  }
  if (input.connected()) {
    DisconnectInputUnsafe(input.stage_, input.index_);
  }

  // Record the new link on both sides.
  output.stage_output().connect(input.stage_, input.index_);
  input.stage_input().connect(output.stage_, output.index_);

  return input.part();
}
// Connects the single output of |upstream_part| to the single input of
// |downstream_part| and returns |downstream_part|.
Engine::Part Engine::ConnectParts(Part upstream_part, Part downstream_part) {
  DCHECK(upstream_part);
  DCHECK(downstream_part);

  Connect(upstream_part.output(), downstream_part.input());

  return downstream_part;
}
// Connects |output| to the single input of |downstream_part| and returns
// |downstream_part|.
Engine::Part Engine::ConnectOutputToPart(Output output, Part downstream_part) {
  DCHECK(output);
  DCHECK(downstream_part);

  Input target = downstream_part.input();
  Connect(output, target);

  return downstream_part;
}
// Connects the single output of |upstream_part| to |input| and returns the
// part that owns |input|.
Engine::Part Engine::ConnectPartToInput(Part upstream_part, Input input) {
  DCHECK(upstream_part);
  DCHECK(input);

  Output origin = upstream_part.output();
  Connect(origin, input);

  return input.part();
}
// Breaks the connection on |output|, if any, while holding |lock_|.
void Engine::DisconnectOutput(Output output) {
  DCHECK(output);

  base::AutoLock lock(lock_);
  DisconnectOutputUnsafe(output.stage_, output.index_);
}
// Breaks the connection on |input|, if any, while holding |lock_|.
void Engine::DisconnectInput(Input input) {
  DCHECK(input);

  base::AutoLock lock(lock_);
  DisconnectInputUnsafe(input.stage_, input.index_);
}
// Removes |part| and every stage reachable from it through input or output
// connections, in either direction. The whole traversal runs under |lock_|.
//
// The work list holds raw stages and is maintained so that:
//  - unconnected inputs/outputs (whose neighbor is null) are never queued, and
//  - a stage is queued at most once, even when it is reachable along several
//    paths (e.g. a diamond-shaped graph) or is connected to itself.
// Without those guards the traversal would dereference a null stage or delete
// the same stage twice.
void Engine::RemovePartsConnectedToPart(Part part) {
  DCHECK(part);

  base::AutoLock lock(lock_);

  std::deque<Stage*> pending{part.stage_};

  // Queues |neighbor| unless it is null or already waiting to be removed.
  auto enqueue = [&pending](Stage* neighbor) {
    if (neighbor == nullptr) {
      return;
    }
    for (Stage* queued : pending) {
      if (queued == neighbor) {
        return;
      }
    }
    pending.push_back(neighbor);
  };

  while (!pending.empty()) {
    // Leave the current stage at the front while its neighbors are collected
    // so that a self-connection is rejected by the duplicate check.
    Stage* current = pending.front();

    const uint32_t input_count = current->input_count();
    for (uint32_t i = 0; i < input_count; ++i) {
      enqueue(current->input(i).upstream_stage());
    }

    const uint32_t output_count = current->output_count();
    for (uint32_t i = 0; i < output_count; ++i) {
      enqueue(current->output(i).downstream_stage());
    }

    pending.pop_front();

    // RemoveUnsafe disconnects |current| from its neighbors before deleting
    // it, so no surviving stage can lead the traversal back to it.
    RemoveUnsafe(current);
  }
}
// Detaches |output| from whatever it feeds and then removes that downstream
// part along with everything connected to it. Does nothing when |output| is
// not connected.
void Engine::RemovePartsConnectedToOutput(Output output) {
  DCHECK(output);

  if (output.connected()) {
    // Capture the neighbor before the link is broken.
    Part doomed = output.downstream_part();
    DisconnectOutput(output);
    RemovePartsConnectedToPart(doomed);
  }
}
// Detaches |input| from whatever feeds it and then removes that upstream part
// along with everything connected to it. Does nothing when |input| is not
// connected.
void Engine::RemovePartsConnectedToInput(Input input) {
  DCHECK(input);

  if (input.connected()) {
    // Capture the neighbor before the link is broken.
    Part doomed = input.upstream_part();
    DisconnectInput(input);
    RemovePartsConnectedToPart(doomed);
  }
}
// Prepares the graph starting at the sinks: each sink is prepared and marked
// as such, then preparation is attempted on the stages feeding its inputs.
// Runs under |lock_|.
void Engine::Prepare() {
  base::AutoLock lock(lock_);

  for (Stage* sink : sinks_) {
    sink->Prepare(update_function_);
    sink->prepared_ = true;

    const uint32_t feeder_count = sink->input_count();
    for (uint32_t i = 0; i < feeder_count; ++i) {
      MaybePrepareUnsafe(sink->input(i).upstream_stage());
    }
  }
}
// Attempts to prepare the stage behind |part| (and, through it, its upstream
// stages) while holding |lock_|.
void Engine::Prepare(Part part) {
  DCHECK(part);

  Stage* const target = part.stage_;
  base::AutoLock lock(lock_);
  MaybePrepareUnsafe(target);
}
void Engine::PrimeSinks() {
lock_.Acquire();
std::list<Stage*> sinks(sinks_);
lock_.Release();
// TODO(dalesat): Threading issue: these sinks may go away during priming.
for (Stage* sink : sinks) {
sink->Prime();
}
}
void Engine::Reset() {
base::AutoLock lock(lock_);
while (!supply_backlog_.empty()) {
supply_backlog_.pop();
}
while (!demand_backlog_.empty()) {
demand_backlog_.pop();
}
sources_.clear();
sinks_.clear();
while (!stages_.empty()) {
Stage* stage = stages_.front();
stages_.pop_front();
delete stage;
}
}
// Records that packets were produced and queues |stage| on the supply backlog
// unless it is already flagged as queued there. Caller must hold |lock_|.
void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
  lock_.AssertAcquired();
  DCHECK(stage);

  // Set unconditionally, even when the stage is already queued.
  packets_produced_ = true;

  if (stage->in_supply_backlog_) {
    return;
  }

  supply_backlog_.push(stage);
  stage->in_supply_backlog_ = true;
}
// Queues |stage| on the demand backlog unless it is already flagged as queued
// there. Caller must hold |lock_|.
void Engine::PushToDemandBacklogUnsafe(Stage* stage) {
  lock_.AssertAcquired();
  DCHECK(stage);

  if (stage->in_demand_backlog_) {
    return;
  }

  demand_backlog_.push(stage);
  stage->in_demand_backlog_ = true;
}
// Registers |stage| with the engine under |lock_| and returns a Part that
// refers to it. A stage with no inputs is also recorded as a source, and a
// stage with no outputs is also recorded as a sink. Registered stages are
// deleted by the engine when they are removed or when the engine is reset.
Engine::Part Engine::Add(Stage* stage) {
  // Reject null up front, as the other Stage*-taking methods do, rather than
  // storing it in |stages_| before the first dereference.
  DCHECK(stage);

  base::AutoLock lock(lock_);

  stages_.push_back(stage);

  if (stage->input_count() == 0) {
    sources_.push_back(stage);
  }

  if (stage->output_count() == 0) {
    sinks_.push_back(stage);
  }

  return Part(stage);
}
// Breaks the link on output |index| of |stage|, clearing both that output and
// its mate. A no-op when the output has no downstream stage. Caller must hold
// |lock_|.
void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) {
  DCHECK(stage);
  DCHECK(index < stage->output_count());
  lock_.AssertAcquired();

  StageOutput& endpoint = stage->output(index);
  if (endpoint.downstream_stage() != nullptr) {
    // Clear the far side first, while the mate is still reachable.
    endpoint.mate().disconnect();
    endpoint.disconnect();
  }
}
// Breaks the link on input |index| of |stage|, clearing both that input and
// its mate. A no-op when the input has no upstream stage. Caller must hold
// |lock_|.
void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) {
  DCHECK(stage);
  DCHECK(index < stage->input_count());
  lock_.AssertAcquired();

  StageInput& endpoint = stage->input(index);
  if (endpoint.upstream_stage() != nullptr) {
    // Clear the far side first, while the mate is still reachable.
    endpoint.mate().disconnect();
    endpoint.disconnect();
  }
}
// Detaches |stage| from all of its neighbors, drops it from the engine's
// source, sink and stage lists, and deletes it. Caller must hold |lock_|.
// NOTE(review): the backlogs are not touched here; confirm callers never
// remove a stage that is still queued in one of them.
void Engine::RemoveUnsafe(Stage* stage) {
  DCHECK(stage);
  lock_.AssertAcquired();

  const uint32_t num_inputs = stage->input_count();
  for (uint32_t i = 0; i < num_inputs; ++i) {
    if (!stage->input(i).connected()) {
      continue;
    }
    DisconnectInputUnsafe(stage, i);
  }

  const uint32_t num_outputs = stage->output_count();
  for (uint32_t i = 0; i < num_outputs; ++i) {
    if (!stage->output(i).connected()) {
      continue;
    }
    DisconnectOutputUnsafe(stage, i);
  }

  sources_.remove(stage);
  sinks_.remove(stage);
  stages_.remove(stage);

  delete stage;
}
// static
// Allocates a DistributorStage wrapping |source|. The caller receives a raw
// pointer to the new stage.
Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
  Stage* created = new DistributorStage(source);
  return created;
}
// static
// Allocates a PacketTransformStage wrapping |transform|. The caller receives
// a raw pointer to the new stage.
Stage* Engine::CreateStage(PacketTransformPtr transform) {
  Stage* created = new PacketTransformStage(transform);
  return created;
}
// static
// Allocates an ActiveSourceStage wrapping |source|. The caller receives a raw
// pointer to the new stage.
Stage* Engine::CreateStage(ActiveSourcePtr source) {
  Stage* created = new ActiveSourceStage(source);
  return created;
}
// static
// Allocates an ActiveSinkStage wrapping |sink|. The caller receives a raw
// pointer to the new stage.
Stage* Engine::CreateStage(ActiveSinkPtr sink) {
  Stage* created = new ActiveSinkStage(sink);
  return created;
}
// static
// Allocates an LpcmTransformStage wrapping |transform|. The caller receives a
// raw pointer to the new stage.
Stage* Engine::CreateStage(LpcmTransformPtr transform) {
  Stage* created = new LpcmTransformStage(transform);
  return created;
}
// Prepares |stage| if it is non-null, not yet prepared, and every stage
// connected to its outputs has already been prepared; then recurses into the
// stages feeding its inputs. Caller must hold |lock_|.
void Engine::MaybePrepareUnsafe(Stage* stage) {
  lock_.AssertAcquired();

  if (stage == nullptr) {
    return;
  }
  if (stage->prepared_) {
    return;
  }

  // Bail out while any connected consumer is still unprepared.
  const uint32_t num_outputs = stage->output_count();
  for (uint32_t i = 0; i < num_outputs; ++i) {
    StageOutput& candidate = stage->output(i);
    if (!candidate.connected()) {
      continue;
    }
    if (!candidate.downstream_stage()->prepared()) {
      return;
    }
  }

  stage->Prepare(update_function_);
  stage->prepared_ = true;

  // Walk upstream now that this stage is ready.
  const uint32_t num_inputs = stage->input_count();
  for (uint32_t i = 0; i < num_inputs; ++i) {
    MaybePrepareUnsafe(stage->input(i).upstream_stage());
  }
}
void Engine::UpdateUnsafe() {
lock_.AssertAcquired();
while (true) {
Stage* stage = PopFromSupplyBacklogUnsafe();
if (stage != nullptr) {
UpdateUnsafe(stage);
continue;
}
stage = PopFromDemandBacklogUnsafe();
if (stage != nullptr) {
UpdateUnsafe(stage);
continue;
}
break;
}
}
// Runs one update of |stage|. |packets_produced_| is cleared beforehand; if it
// is set again by the time Update() returns, |stage| is queued on the demand
// backlog. Caller must hold |lock_|.
void Engine::UpdateUnsafe(Stage* stage) {
  lock_.AssertAcquired();
  DCHECK(stage);

  packets_produced_ = false;
  stage->Update(this);

  // A stage that produced packets may need to reevaluate demand later.
  if (!packets_produced_) {
    return;
  }
  PushToDemandBacklogUnsafe(stage);
}
// Takes the front stage off the supply backlog and clears its queued flag.
// Returns nullptr when the backlog is empty. Caller must hold |lock_|.
Stage* Engine::PopFromSupplyBacklogUnsafe() {
  lock_.AssertAcquired();

  Stage* popped = nullptr;
  if (!supply_backlog_.empty()) {
    popped = supply_backlog_.front();
    supply_backlog_.pop();
    DCHECK(popped->in_supply_backlog_);
    popped->in_supply_backlog_ = false;
  }
  return popped;
}
// Takes the top stage off the demand backlog and clears its queued flag.
// Returns nullptr when the backlog is empty. Caller must hold |lock_|.
Stage* Engine::PopFromDemandBacklogUnsafe() {
  lock_.AssertAcquired();

  Stage* popped = nullptr;
  if (!demand_backlog_.empty()) {
    popped = demand_backlog_.top();
    demand_backlog_.pop();
    DCHECK(popped->in_demand_backlog_);
    popped->in_demand_backlog_ = false;
  }
  return popped;
}
} // namespace media
} // namespace mojo