Motown: Remove LPCM optimizations, fix prepare, add flush, add ActiveMultistreamSink model/stage
- Removed LPCM optimizations
- Converted LpcmReformatter from LPCM transport to Packet transport
- Added ActiveMultistreamSink model and hosting stage for system mixer use (needs debugging)
- Moved graph building from Engine to new Graph class
- Established new graph-building threading model (not thread-safe)
- Fixed Prepare logic
- Added Unprepare
- Disallowed disconnection of prepared inputs/outputs
- Added Flush
- Switched to using InputRef and OutputRef internally
- Renamed:
    Engine::Part -> PartRef
    Engine::Input -> InputRef
    Engine::Output -> OutputRef
    StageInput -> Input
    StageOutput -> Output
    Allocator -> PayloadAllocator
- Replaced many instances of uint32_t and uint64_t with size_t

R=johngro@google.com

Review URL: https://codereview.chromium.org/1678433002 .
diff --git a/services/media/BUILD.gn b/services/media/BUILD.gn
index fa73662..c638ebb 100644
--- a/services/media/BUILD.gn
+++ b/services/media/BUILD.gn
@@ -1,4 +1,4 @@
-# Copyright 2015 The Chromium Authors. All rights reserved.
+# Copyright 2016 The Chromium Authors. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
diff --git a/services/media/framework/BUILD.gn b/services/media/framework/BUILD.gn
index 29a97c9..7747bc0 100644
--- a/services/media/framework/BUILD.gn
+++ b/services/media/framework/BUILD.gn
@@ -7,26 +7,25 @@
 
 source_set("framework") {
   sources = [
-    "allocator.cc",
-    "allocator.h",
     "conversion_pipeline_builder.cc",
     "conversion_pipeline_builder.h",
     "engine.cc",
     "engine.h",
     "formatting.cc",
     "formatting.h",
+    "graph.cc",
+    "graph.h",
     "lpcm_util.cc",
     "lpcm_util.h",
     "metadata.cc",
     "metadata.h",
+    "models/active_multistream_sink.h",
     "models/active_sink.h",
     "models/active_source.h",
     "models/demand.h",
-    "models/lpcm_frame_buffer.cc",
-    "models/lpcm_frame_buffer.h",
-    "models/lpcm_transform.h",
-    "models/multistream_packet_source.h",
-    "models/packet_transform.h",
+    "models/multistream_source.h",
+    "models/part.h",
+    "models/transform.h",
     "packet.cc",
     "packet.h",
     "parts/decoder.h",
@@ -39,28 +38,28 @@
     "parts/null_sink.h",
     "parts/reader.cc",
     "parts/reader.h",
-    "ptr.h",
+    "payload_allocator.cc",
+    "payload_allocator.h",
+    "refs.cc",
+    "refs.h",
     "result.h",
+    "safe_clone.h",
+    "stages/active_multistream_sink_stage.cc",
+    "stages/active_multistream_sink_stage.h",
     "stages/active_sink_stage.cc",
     "stages/active_sink_stage.h",
     "stages/active_source_stage.cc",
     "stages/active_source_stage.h",
-    "stages/distributor_stage.cc",
-    "stages/distributor_stage.h",
-    "stages/lpcm_stage_input.cc",
-    "stages/lpcm_stage_input.h",
-    "stages/lpcm_stage_output.cc",
-    "stages/lpcm_stage_output.h",
-    "stages/lpcm_transform_stage.cc",
-    "stages/lpcm_transform_stage.h",
-    "stages/packet_transform_stage.cc",
-    "stages/packet_transform_stage.h",
+    "stages/input.cc",
+    "stages/input.h",
+    "stages/multistream_source_stage.cc",
+    "stages/multistream_source_stage.h",
+    "stages/output.cc",
+    "stages/output.h",
     "stages/stage.cc",
     "stages/stage.h",
-    "stages/stage_input.cc",
-    "stages/stage_input.h",
-    "stages/stage_output.cc",
-    "stages/stage_output.h",
+    "stages/transform_stage.cc",
+    "stages/transform_stage.h",
     "stream_type.cc",
     "stream_type.h",
   ]
diff --git a/services/media/framework/allocator.cc b/services/media/framework/allocator.cc
deleted file mode 100644
index e066599..0000000
--- a/services/media/framework/allocator.cc
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include <cstdlib>
-
-#include "base/logging.h"
-#include "services/media/framework/allocator.h"
-
-namespace mojo {
-namespace media {
-
-namespace {
-
-class DefaultAllocator : public Allocator {
- public:
-  constexpr DefaultAllocator() {}
-
-  // Allocator implementation.
-  void* AllocatePayloadBuffer(uint64_t size) override;
-
-  void ReleasePayloadBuffer(uint64_t size, void* buffer) override;
-};
-
-void* DefaultAllocator::AllocatePayloadBuffer(uint64_t size) {
-  DCHECK(size > 0);
-  return std::malloc(static_cast<size_t>(size));
-}
-
-void DefaultAllocator::ReleasePayloadBuffer(uint64_t size, void* buffer) {
-  DCHECK(size > 0);
-  DCHECK(buffer);
-  std::free(buffer);
-}
-
-static constexpr DefaultAllocator default_allocator;
-
-} // namespace
-
-// static
-Allocator* Allocator::GetDefault() {
-  return const_cast<DefaultAllocator*>(&default_allocator);
-}
-
-} // namespace media
-} // namespace mojo
diff --git a/services/media/framework/allocator.h b/services/media/framework/allocator.h
deleted file mode 100644
index 2bd564e..0000000
--- a/services/media/framework/allocator.h
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef SERVICES_MEDIA_FRAMEWORK_ALLOCATOR_H_
-#define SERVICES_MEDIA_FRAMEWORK_ALLOCATOR_H_
-
-#include <cstdint>
-
-namespace mojo {
-namespace media {
-
-// Abstract base class for objects that allocate buffers for packets.
-class Allocator {
- public:
-  // Gets the default allocator, which allocates vanilla memory from the heap.
-  static Allocator* GetDefault();
-
-  // Allocates and returns a buffer of the indicated size or returns nullptr
-  // if the allocation fails.
-  // TODO(dalesat): Use size_t for sizes in units of bytes framework-wide.
-  virtual void* AllocatePayloadBuffer(uint64_t size) = 0;
-
-  // Releases a buffer previously allocated via AllocatePayloadBuffer.
-  virtual void ReleasePayloadBuffer(uint64_t size, void* buffer) = 0;
-};
-
-}  // namespace media
-}  // namespace mojo
-
-#endif // SERVICES_MEDIA_FRAMEWORK_ALLOCATOR_H_
diff --git a/services/media/framework/conversion_pipeline_builder.cc b/services/media/framework/conversion_pipeline_builder.cc
index c1af544..6c9479b 100644
--- a/services/media/framework/conversion_pipeline_builder.cc
+++ b/services/media/framework/conversion_pipeline_builder.cc
@@ -73,12 +73,13 @@
 }
 
 // Finds the media type set that best matches in_type.
-const StreamTypeSetPtr* FindBestLpcm(
+const std::unique_ptr<StreamTypeSet>* FindBestLpcm(
     const LpcmStreamType& in_type,
-    const StreamTypeSetsPtr& out_type_sets) {
-  const StreamTypeSetPtr* best = nullptr;
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>&
+        out_type_sets) {
+  const std::unique_ptr<StreamTypeSet>* best = nullptr;
   int best_score = 0;
-  for (const StreamTypeSetPtr& out_type_set : *out_type_sets) {
+  for (const std::unique_ptr<StreamTypeSet>& out_type_set : *out_type_sets) {
     switch (out_type_set->scheme()) {
       case StreamType::Scheme::kAnyElementary:
       case StreamType::Scheme::kAnyAudio:
@@ -106,16 +107,17 @@
 // type. Otherwise, *out_type is set to nullptr.
 AddResult AddTransformsForCompressedAudio(
     const CompressedAudioStreamType& in_type,
-    const StreamTypePtr& in_type_ptr,
-    const StreamTypeSetsPtr& out_type_sets,
-    Engine* engine,
-    Engine::Output* output,
-    StreamTypePtr* out_type) {
+    const std::unique_ptr<StreamType>& in_type_ptr,
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>&
+        out_type_sets,
+    Graph* graph,
+    OutputRef* output,
+    std::unique_ptr<StreamType>* out_type) {
   DCHECK(out_type);
-  DCHECK(engine);
+  DCHECK(graph);
 
   // See if we have a matching COMPRESSED_AUDIO type.
-  for (const StreamTypeSetPtr& out_type_set : *out_type_sets) {
+  for (const std::unique_ptr<StreamTypeSet>& out_type_set : *out_type_sets) {
     switch (out_type_set->scheme()) {
       case StreamType::Scheme::kAnyElementary:
       case StreamType::Scheme::kAnyAudio:
@@ -138,7 +140,8 @@
   }
 
   // Find the best LPCM output type.
-  const StreamTypeSetPtr* best = FindBestLpcm(in_type, out_type_sets);
+  const std::unique_ptr<StreamTypeSet>* best =
+      FindBestLpcm(in_type, out_type_sets);
   if (best == nullptr) {
     // No candidates found.
     *out_type = nullptr;
@@ -148,7 +151,7 @@
   DCHECK_EQ((*best)->scheme(), StreamType::Scheme::kLpcm);
 
   // Need to decode. Create a decoder and go from there.
-  DecoderPtr decoder;
+  std::shared_ptr<Decoder> decoder;
   Result result = Decoder::Create(in_type_ptr, &decoder);
   if (result !=  Result::kOk) {
     // No decoder found.
@@ -156,7 +159,7 @@
     return AddResult::kFailed;
   }
 
-  *output = engine->ConnectOutputToPart(*output, engine->Add(decoder)).output();
+  *output = graph->ConnectOutputToPart(*output, graph->Add(decoder)).output();
   *out_type = decoder->output_stream_type();
 
   return AddResult::kProgressed;
@@ -169,10 +172,10 @@
 AddResult AddTransformsForLpcm(
     const LpcmStreamType& in_type,
     const LpcmStreamTypeSet& out_type_set,
-    Engine* engine,
-    Engine::Output* output,
-    StreamTypePtr* out_type) {
-  DCHECK(engine);
+    Graph* graph,
+    OutputRef* output,
+    std::unique_ptr<StreamType>* out_type) {
+  DCHECK(graph);
   DCHECK(out_type);
 
   // TODO(dalesat): Room for more intelligence here wrt transform ordering and
@@ -180,9 +183,9 @@
   if (in_type.sample_format() != out_type_set.sample_format() &&
       out_type_set.sample_format() != LpcmStreamType::SampleFormat::kAny) {
     // The reformatter will fix interleave conversion.
-    *output = engine->ConnectOutputToPart(
+    *output = graph->ConnectOutputToPart(
         *output,
-        engine->Add(LpcmReformatter::Create(in_type, out_type_set))).output();
+        graph->Add(LpcmReformatter::Create(in_type, out_type_set))).output();
   }
 
   if (!out_type_set.channels().contains(in_type.channels())) {
@@ -216,14 +219,16 @@
 // type. Otherwise, *out_type is set to nullptr.
 AddResult AddTransformsForLpcm(
     const LpcmStreamType& in_type,
-    const StreamTypeSetsPtr& out_type_sets,
-    Engine* engine,
-    Engine::Output* output,
-    StreamTypePtr* out_type) {
-  DCHECK(engine);
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>&
+        out_type_sets,
+    Graph* graph,
+    OutputRef* output,
+    std::unique_ptr<StreamType>* out_type) {
+  DCHECK(graph);
   DCHECK(out_type);
 
-  const StreamTypeSetPtr* best = FindBestLpcm(in_type, out_type_sets);
+  const std::unique_ptr<StreamTypeSet>* best =
+      FindBestLpcm(in_type, out_type_sets);
   if (best == nullptr) {
     // TODO(dalesat): Support a compressed output type by encoding.
     NOTREACHED() << "conversion using encoder not supported";
@@ -242,7 +247,7 @@
       return AddTransformsForLpcm(
           in_type,
           *(*best)->lpcm(),
-          engine,
+          graph,
           output,
           out_type);
     default:
@@ -257,13 +262,14 @@
 // (out_type_sets). If the call succeeds, *out_type is set to the new output
 // type. Otherwise, *out_type is set to nullptr.
 AddResult AddTransforms(
-    const StreamTypePtr& in_type,
-    const StreamTypeSetsPtr& out_type_sets,
-    Engine* engine,
-    Engine::Output* output,
-    StreamTypePtr* out_type) {
+    const std::unique_ptr<StreamType>& in_type,
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>&
+        out_type_sets,
+    Graph* graph,
+    OutputRef* output,
+    std::unique_ptr<StreamType>* out_type) {
   DCHECK(in_type);
-  DCHECK(engine);
+  DCHECK(graph);
   DCHECK(out_type);
 
   switch (in_type->scheme()) {
@@ -271,7 +277,7 @@
       return AddTransformsForLpcm(
           *in_type->lpcm(),
           out_type_sets,
-          engine,
+          graph,
           output,
           out_type);
     case StreamType::Scheme::kCompressedAudio:
@@ -279,7 +285,7 @@
           *in_type->compressed_audio(),
           in_type,
           out_type_sets,
-          engine,
+          graph,
           output,
           out_type);
     default:
@@ -293,33 +299,34 @@
 }  // namespace
 
 bool BuildConversionPipeline(
-    const StreamTypePtr& in_type,
-    const StreamTypeSetsPtr& out_type_sets,
-    Engine* engine,
-    Engine::Output* output,
-    StreamTypePtr* out_type) {
+    const std::unique_ptr<StreamType>& in_type,
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>&
+        out_type_sets,
+    Graph* graph,
+    OutputRef* output,
+    std::unique_ptr<StreamType>* out_type) {
   DCHECK(in_type);
   DCHECK(out_type_sets);
-  DCHECK(engine);
+  DCHECK(graph);
   DCHECK(output);
   DCHECK(out_type);
 
-  Engine::Output out = *output;
+  OutputRef out = *output;
 
-  const StreamTypePtr* type_to_convert = &in_type;
-  StreamTypePtr next_in_type;
+  const std::unique_ptr<StreamType>* type_to_convert = &in_type;
+  std::unique_ptr<StreamType> next_in_type;
   while (true) {
-    StreamTypePtr converted_type;
+    std::unique_ptr<StreamType> converted_type;
     switch (AddTransforms(
         *type_to_convert,
         out_type_sets,
-        engine,
+        graph,
         &out,
         &converted_type)) {
       case AddResult::kFailed:
         // Failed to find a suitable conversion. Return the pipeline to its
         // original state.
-        engine->RemovePartsConnectedToOutput(*output);
+        graph->RemovePartsConnectedToOutput(*output);
         *out_type = nullptr;
         return false;
       case AddResult::kProgressed:
diff --git a/services/media/framework/conversion_pipeline_builder.h b/services/media/framework/conversion_pipeline_builder.h
index 988eb5f..6957abc 100644
--- a/services/media/framework/conversion_pipeline_builder.h
+++ b/services/media/framework/conversion_pipeline_builder.h
@@ -5,7 +5,7 @@
 #ifndef SERVICES_MEDIA_FRAMEWORK_CONVERSION_PIPELINE_BUILDER_H_
 #define SERVICES_MEDIA_FRAMEWORK_CONVERSION_PIPELINE_BUILDER_H_
 
-#include "services/media/framework/engine.h"
+#include "services/media/framework/graph.h"
 #include "services/media/framework/packet.h"
 #include "services/media/framework/stream_type.h"
 
@@ -17,11 +17,12 @@
 // *output and delivers the resulting output type via *out_type. If it fails,
 // returns false, sets *out_type to nullptr and leaves *output unchanged.
 bool BuildConversionPipeline(
-    const StreamTypePtr& in_type,
-    const StreamTypeSetsPtr& out_type_sets,
-    Engine* engine,
-    Engine::Output* output,
-    StreamTypePtr* out_type);
+    const std::unique_ptr<StreamType>& in_type,
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>&
+        out_type_sets,
+    Graph* graph,
+    OutputRef* output,
+    std::unique_ptr<StreamType>* out_type);
 
 }  // namespace media
 }  // namespace mojo
diff --git a/services/media/framework/engine.cc b/services/media/framework/engine.cc
index 980440c..6c76e0b 100644
--- a/services/media/framework/engine.cc
+++ b/services/media/framework/engine.cc
@@ -7,228 +7,60 @@
 namespace mojo {
 namespace media {
 
-uint32_t Engine::Part::input_count() {
-  DCHECK(stage_ != nullptr);
-  return stage_->input_count();
-}
-
-Engine::Input Engine::Part::input(uint32_t index) {
-  DCHECK(stage_ != nullptr && index < stage_->input_count());
-  return Input(stage_, index);
-}
-
-Engine::Input Engine::Part::input() {
-  DCHECK(stage_ != nullptr && stage_->input_count() == 1);
-  return Input(stage_, 0);
-}
-
-uint32_t Engine::Part::output_count() {
-  DCHECK(stage_ != nullptr);
-  return stage_->output_count();
-}
-
-Engine::Output Engine::Part::output(uint32_t index) {
-  DCHECK(stage_ != nullptr && index < stage_->output_count());
-  return Output(stage_, index);
-}
-
-Engine::Output Engine::Part::output() {
-  DCHECK(stage_ != nullptr && stage_->output_count() == 1);
-  return Output(stage_, 0);
-}
-
-Engine::Part Engine::Part::upstream_part(uint32_t index) {
-  DCHECK(stage_ != nullptr && index < stage_->input_count());
-  return Part(stage_->input(index).upstream_stage());
-}
-
-Engine::Part Engine::Part::upstream_part() {
-  DCHECK(stage_ != nullptr && stage_->input_count() == 1);
-  return Part(stage_->input(0).upstream_stage());
-}
-
-Engine::Part Engine::Part::downstream_part(uint32_t index) {
-  DCHECK(stage_ != nullptr && index < stage_->output_count());
-  return Part(stage_->output(index).downstream_stage());
-}
-
-Engine::Part Engine::Part::downstream_part() {
-  DCHECK(stage_ != nullptr && stage_->output_count() == 1);
-  return Part(stage_->output(0).downstream_stage());
-}
-
-Engine::Engine() {
-  update_function_ = [this](Stage* stage) {
-    DCHECK(stage);
-    base::AutoLock lock(lock_);
-    UpdateUnsafe(stage);
-    UpdateUnsafe();
-  };
-}
+Engine::Engine() {}
 
 Engine::~Engine() {
-  Reset();
-}
-
-void Engine::RemovePart(Part part) {
-  DCHECK(part);
   base::AutoLock lock(lock_);
-  RemoveUnsafe(part.stage_);
 }
 
-Engine::Part Engine::Connect(Output output, Input input) {
-  DCHECK(output);
-  DCHECK(input);
-
-  base::AutoLock lock(lock_);
-
-  if (output.connected()) {
-    DisconnectOutputUnsafe(output.stage_, output.index_);
-  }
-  if (input.connected()) {
-    DisconnectInputUnsafe(input.stage_, input.index_);
-  }
-
-  output.stage_output().connect(input.stage_, input.index_);
-  input.stage_input().connect(output.stage_, output.index_);
-
-  return input.part();
+void Engine::PrepareInput(const InputRef& input) {
+  VisitUpstream(
+      input,
+      [] (const InputRef& input,
+          const OutputRef& output,
+          const Stage::UpstreamCallback& callback) {
+        DCHECK(!input.actual().prepared());
+        PayloadAllocator* allocator = input.stage_->PrepareInput(input.index_);
+        input.actual().set_prepared(true);
+        output.stage_->PrepareOutput(output.index_, allocator, callback);
+      });
 }
 
-Engine::Part Engine::ConnectParts(Part upstream_part, Part downstream_part) {
-  DCHECK(upstream_part);
-  DCHECK(downstream_part);
-  Connect(upstream_part.output(), downstream_part.input());
-  return downstream_part;
+void Engine::UnprepareInput(const InputRef& input) {
+  VisitUpstream(
+      input,
+      [] (const InputRef& input,
+          const OutputRef& output,
+          const Stage::UpstreamCallback& callback) {
+        DCHECK(input.actual().prepared());
+        input.stage_->UnprepareInput(input.index_);
+        output.stage_->UnprepareOutput(output.index_, callback);
+      });
 }
 
-Engine::Part Engine::ConnectOutputToPart(
-    Output output,
-    Part downstream_part) {
-  DCHECK(output);
-  DCHECK(downstream_part);
-  Connect(output, downstream_part.input());
-  return downstream_part;
+void Engine::FlushOutput(const OutputRef& output) {
+  VisitDownstream(
+      output,
+      [] (const OutputRef& output,
+          const InputRef& input,
+          const Stage::DownstreamCallback& callback) {
+        DCHECK(input.actual().prepared());
+        output.stage_->FlushOutput(output.index_);
+        input.stage_->FlushInput(input.index_, callback);
+      });
 }
 
-Engine::Part Engine::ConnectPartToInput(Part upstream_part, Input input) {
-  DCHECK(upstream_part);
-  DCHECK(input);
-  Connect(upstream_part.output(), input);
-  return input.part();
-}
-
-void Engine::DisconnectOutput(Output output) {
-  DCHECK(output);
-
-  base::AutoLock lock(lock_);
-  DisconnectOutputUnsafe(output.stage_, output.index_);
-}
-
-void Engine::DisconnectInput(Input input) {
-  DCHECK(input);
-
-  base::AutoLock lock(lock_);
-  DisconnectInputUnsafe(input.stage_, input.index_);
-}
-
-void Engine::RemovePartsConnectedToPart(Part part) {
-  DCHECK(part);
-
-  base::AutoLock lock(lock_);
-
-  std::deque<Part> to_remove { part };
-
-  while (!to_remove.empty()) {
-    Part part = to_remove.front();
-    to_remove.pop_front();
-
-    for (uint32_t i = 0; i < part.input_count(); ++i) {
-      to_remove.push_back(part.upstream_part(i));
-    }
-
-    for (uint32_t i = 0; i < part.output_count(); ++i) {
-      to_remove.push_back(part.downstream_part(i));
-    }
-
-    RemoveUnsafe(part.stage_);
-  }
-}
-
-void Engine::RemovePartsConnectedToOutput(Output output) {
-  DCHECK(output);
-
-  if (!output.connected()) {
-    return;
-  }
-
-  Part downstream_part = output.downstream_part();
-  DisconnectOutput(output);
-  RemovePartsConnectedToPart(downstream_part);
-}
-
-void Engine::RemovePartsConnectedToInput(Input input) {
-  DCHECK(input);
-
-  if (!input.connected()) {
-    return;
-  }
-
-  Part upstream_part = input.upstream_part();
-  DisconnectInput(input);
-  RemovePartsConnectedToPart(upstream_part);
-}
-
-void Engine::Prepare() {
-  base::AutoLock lock(lock_);
-  for (Stage* sink : sinks_) {
-    sink->Prepare(update_function_);
-    sink->prepared_ = true;
-    uint32_t input_count = sink->input_count();
-    for (uint32_t input_index = 0; input_index < input_count; input_index++) {
-      MaybePrepareUnsafe(sink->input(input_index).upstream_stage());
-    }
-  }
-}
-
-void Engine::Prepare(Part part) {
-  DCHECK(part);
-  base::AutoLock lock(lock_);
-  MaybePrepareUnsafe(part.stage_);
-}
-
-void Engine::PrimeSinks() {
-  lock_.Acquire();
-  std::list<Stage*> sinks(sinks_);
-  lock_.Release();
-
-  // TODO(dalesat): Threading issue: these sinks may go away during priming.
-  for (Stage* sink : sinks) {
-    sink->Prime();
-  }
-}
-
-void Engine::Reset() {
-  base::AutoLock lock(lock_);
-  while (!supply_backlog_.empty()) {
-    supply_backlog_.pop();
-  }
-  while (!demand_backlog_.empty()) {
-    demand_backlog_.pop();
-  }
-  sources_.clear();
-  sinks_.clear();
-  while (!stages_.empty()) {
-    Stage* stage = stages_.front();
-    stages_.pop_front();
-    delete stage;
-  }
-}
-
-void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
-  lock_.AssertAcquired();
-
+void Engine::RequestUpdate(Stage* stage) {
   DCHECK(stage);
+  base::AutoLock lock(lock_);
+  Update(stage);
+  Update();
+}
+
+void Engine::PushToSupplyBacklog(Stage* stage) {
+  lock_.AssertAcquired();
+  DCHECK(stage);
+
   packets_produced_ = true;
   if (!stage->in_supply_backlog_) {
     supply_backlog_.push(stage);
@@ -236,150 +68,81 @@
   }
 }
 
-void Engine::PushToDemandBacklogUnsafe(Stage* stage) {
+void Engine::PushToDemandBacklog(Stage* stage) {
   lock_.AssertAcquired();
-
   DCHECK(stage);
+
   if (!stage->in_demand_backlog_) {
     demand_backlog_.push(stage);
     stage->in_demand_backlog_ = true;
   }
 }
 
-Engine::Part Engine::Add(Stage* stage) {
+void Engine::VisitUpstream(
+    const InputRef& input,
+    const UpstreamVisitor& vistor) {
   base::AutoLock lock(lock_);
 
-  stages_.push_back(stage);
-  if (stage->input_count() == 0) {
-    sources_.push_back(stage);
-  }
-  if (stage->output_count() == 0) {
-    sinks_.push_back(stage);
-  }
-  return Part(stage);
-}
+  std::queue<InputRef> backlog;
+  backlog.push(input);
 
-void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) {
-  DCHECK(stage);
-  DCHECK(index < stage->output_count());
+  while (!backlog.empty()) {
+    InputRef input = backlog.front();
+    backlog.pop();
+    DCHECK(input.valid());
+    DCHECK(input.connected());
 
-  lock_.AssertAcquired();
+    const OutputRef& output = input.mate();
+    Stage* output_stage = output.stage_;
 
-  StageOutput& stage_output = stage->output(index);
-
-  if (stage_output.downstream_stage() == nullptr) {
-    return;
-  }
-
-  stage_output.mate().disconnect();
-  stage_output.disconnect();
-}
-
-void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) {
-  DCHECK(stage);
-  DCHECK(index < stage->input_count());
-
-  lock_.AssertAcquired();
-
-  StageInput& stage_input = stage->input(index);
-
-  if (stage_input.upstream_stage() == nullptr) {
-    return;
-  }
-
-  stage_input.mate().disconnect();
-  stage_input.disconnect();
-}
-
-void Engine::RemoveUnsafe(Stage* stage) {
-  DCHECK(stage);
-
-  lock_.AssertAcquired();
-
-  uint32_t input_count = stage->input_count();
-  for (uint32_t input_index = 0; input_index < input_count; input_index++) {
-    if (stage->input(input_index).connected()) {
-      DisconnectInputUnsafe(stage, input_index);
-    }
-  }
-
-  uint32_t output_count = stage->output_count();
-  for (uint32_t output_index = 0; output_index < output_count; output_index++) {
-    if (stage->output(output_index).connected()) {
-      DisconnectOutputUnsafe(stage, output_index);
-    }
-  }
-
-  sources_.remove(stage);
-  sinks_.remove(stage);
-  stages_.remove(stage);
-  delete stage;
-}
-
-// static
-Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
-  return new DistributorStage(source);
-}
-
-// static
-Stage* Engine::CreateStage(PacketTransformPtr transform) {
-  return new PacketTransformStage(transform);
-}
-
-// static
-Stage* Engine::CreateStage(ActiveSourcePtr source) {
-  return new ActiveSourceStage(source);
-}
-
-// static
-Stage* Engine::CreateStage(ActiveSinkPtr sink) {
-  return new ActiveSinkStage(sink);
-}
-
-// static
-Stage* Engine::CreateStage(LpcmTransformPtr transform) {
-  return new LpcmTransformStage(transform);
-}
-
-void Engine::MaybePrepareUnsafe(Stage* stage) {
-  lock_.AssertAcquired();
-
-  if (stage == nullptr || stage->prepared_) {
-    return;
-  }
-
-  // Make sure all downstream stages have been prepared.
-  uint32_t output_count = stage->output_count();
-  for (uint32_t output_index = 0; output_index < output_count; output_index++) {
-    StageOutput& output = stage->output(output_index);
-    if (output.connected() && !output.downstream_stage()->prepared()) {
-      return;
-    }
-  }
-
-  stage->Prepare(update_function_);
-  stage->prepared_ = true;
-
-  // Prepare all upstream stages.
-  uint32_t input_count = stage->input_count();
-  for (uint32_t input_index = 0; input_index < input_count; input_index++) {
-    MaybePrepareUnsafe(stage->input(input_index).upstream_stage());
+    vistor(
+        input,
+        output,
+        [output_stage, &backlog](size_t input_index) {
+          backlog.push(InputRef(output_stage, input_index));
+        });
   }
 }
 
-void Engine::UpdateUnsafe() {
+void Engine::VisitDownstream(
+    const OutputRef& output,
+    const DownstreamVisitor& vistor) {
+  base::AutoLock lock(lock_);
+
+  std::queue<OutputRef> backlog;
+  backlog.push(output);
+
+  while (!backlog.empty()) {
+    OutputRef output = backlog.front();
+    backlog.pop();
+    DCHECK(output.valid());
+    DCHECK(output.connected());
+
+    const InputRef& input = output.mate();
+    Stage* input_stage = input.stage_;
+
+    vistor(
+        output,
+        input,
+        [input_stage, &backlog](size_t output_index) {
+          backlog.push(OutputRef(input_stage, output_index));
+        });
+  }
+}
+
+void Engine::Update() {
   lock_.AssertAcquired();
 
   while (true) {
-    Stage* stage = PopFromSupplyBacklogUnsafe();
+    Stage* stage = PopFromSupplyBacklog();
     if (stage != nullptr) {
-      UpdateUnsafe(stage);
+      Update(stage);
       continue;
     }
 
-    stage = PopFromDemandBacklogUnsafe();
+    stage = PopFromDemandBacklog();
     if (stage != nullptr) {
-      UpdateUnsafe(stage);
+      Update(stage);
       continue;
     }
 
@@ -387,7 +150,7 @@
   }
 }
 
-void Engine::UpdateUnsafe(Stage *stage) {
+void Engine::Update(Stage *stage) {
   lock_.AssertAcquired();
 
   DCHECK(stage);
@@ -398,11 +161,11 @@
 
   // If the stage produced packets, it may need to reevaluate demand later.
   if (packets_produced_) {
-    PushToDemandBacklogUnsafe(stage);
+    PushToDemandBacklog(stage);
   }
 }
 
-Stage* Engine::PopFromSupplyBacklogUnsafe() {
+Stage* Engine::PopFromSupplyBacklog() {
   lock_.AssertAcquired();
 
   if (supply_backlog_.empty()) {
@@ -416,7 +179,7 @@
   return stage;
 }
 
-Stage* Engine::PopFromDemandBacklogUnsafe() {
+Stage* Engine::PopFromDemandBacklog() {
   lock_.AssertAcquired();
 
   if (demand_backlog_.empty()) {
diff --git a/services/media/framework/engine.h b/services/media/framework/engine.h
index 2ceb0cf..55b5ae5 100644
--- a/services/media/framework/engine.h
+++ b/services/media/framework/engine.h
@@ -8,98 +8,19 @@
 #include <list>
 #include <queue>
 #include <stack>
+#include <unordered_map>
 
 #include "base/synchronization/lock.h"
-#include "services/media/framework/stages/active_sink_stage.h"
-#include "services/media/framework/stages/active_source_stage.h"
-#include "services/media/framework/stages/distributor_stage.h"
-#include "services/media/framework/stages/lpcm_transform_stage.h"
-#include "services/media/framework/stages/packet_transform_stage.h"
+#include "services/media/framework/refs.h"
 #include "services/media/framework/stages/stage.h"
 
 namespace mojo {
 namespace media {
 
 //
-// USAGE
-//
-// TODO(dalesat): Consider adding a suffix to Engine::Part/Input/Output to
-// indicate that they're references.
-// TODO(dalesat): Consider folding PrimeSinks into Prepare.
-//
-// Engine is a container for sources, sinks and transforms ('parts') connected
-// in a graph. Engine::Part, Engine::Input and Engine::Output are all opaque
-// references to parts and their inputs and outputs. Engine provides a variety
-// of methods for adding and removing parts and for connecting inputs and
-// outputs to form a graph.
-//
-// In addition to containing parts and representing their interconnection,
-// Engine manages the coordinated operation of its constituent parts and
-// transports media from part to part. The Prepare method prepares the graph
-// for operation, and the PrimeSinks method tells the sinks in the graph to
-// prime themselves. Any additional actions required to make the graph operate
-// (such as manipulating a rate control interface) is out of scope.
-//
-// Parts added to the engine are referenced using shared pointers. The engine
-// holds pointers to the parts it contains, and the application, in many cases,
-// also holds pointers to the parts so it can call methods that are outside the
-// engine's scope. When a part is added the Engine returns an Engine::Part
-// object, which can be used to reference the part when the graph is modified.
-// Engine::Part objects can be interrogated to retrieve inputs (as Engine::Input
-// objects) and outputs (as Engine::Output objects).
-//
-// Some support is provided for modifying graphs that are operating. This
-// capability isn't fully developed at the moment. Prepare(Part) is an example
-// of a method provided for this purpose.
-//
-// Parts come in various flavors, defined by 'model' abstract classes. The
-// current list of supported models is:
-//
-//  ActiveSink              - a sink that consumes packets asynchronously
-//  ActiveSource            - a source that produces packets asynchronously
-//  LpcmMixer               - a transform that mixes LPCM frames from multiple
-//                            inputs and produces a single stream of LPCM frames
-//                            via one output
-//  LpcmSource              - a source that produces LPCM frames synchronously
-//  LpcmTransform           - a synchronous transform with one LPCM input and
-//                            one LPCM output
-//  MultiStreamPacketSource - a source that produces multiple streams of packets
-//                            synchronously
-//  PacketTransform         - a synchronous transform that consumes and produces
-//                            packets via one input and one output
-//
-// Other models will be defined in the future as needed.
-//
-
-//
 // DESIGN
 //
-// The Engine is implemented as a system of cooperating objects. Of those
-// objects, only the engine itself is of relevance to code that uses Engine and
-// to part implementations. The other objects are:
-//
-// Stage
-// A stage hosts a single part. There are many subclasses of Stage, one for
-// each supported part model. The stage's job is to implement the contract
-// represented by the model so the parts that conform to the model can
-// participate in the operation of the engine. Stages are uniform with respect
-// to how they interact with engine. Engine::Part references a stage.
-//
-// StageInput
-// A stage possesses zero or more StageInput instances. StageInput objects
-// implement the supply of media into the stage and demand for media signalled
-// upstream. StageInputs receive media from StageOutputs in the form of packets
-// (type Packet). LpcmStageInput is a subclass of StageInput that interoperates
-// with LpcmStageInputs in a way that provides optimizations relavant to LPCM
-// audio media. Engine::Input references a StageInput.
-//
-// StageOutput
-// A stage possesses zero or more StageOutput instances. StageOutput objects
-// implement the supply of media output of the stage to a downstream input and
-// demand for media signalled from that input. LpcmStageOutput implements
-// optimized LPCM flow. Engine::Output references a StageOutput.
-//
-// Engine uses a 'work list' algorithm to operate the contained graph. The
+// Engine uses a 'work list' algorithm to operate the graph. The
 // engine has a backlog of stages that need to be updated. To advance the
 // operation of the graph, the engine removes a stage from the backlog and calls
 // the stage's Update method. The Stage::Update may cause stages to be added
@@ -143,7 +64,7 @@
 //    supply and/or demand.
 // 3) Threads used to call update callbacks must be suitable for operating the
 //    engine. There is currently no affordance for processing other tasks on
-//    a thread while the callback is running. A callback may run for a long
+//    the thread while the callback is running. A callback may run for a long
 //    time, depending on how much work needs to be done.
 // 4) Parts cannot rely on being called back on the same thread on which they
 //    invoke update callbacks. This may require additional synchronization and
@@ -161,254 +82,64 @@
 // 2) Marshalling update callbacks to a different thread.
 //
 
-// Host for a source, sink or transform.
+// Manages operation of a Graph.
 class Engine {
  public:
-  class Input;
-  class Output;
-
-  // Opaque Stage pointer used for graph building.
-  class Part {
-   public:
-    Part() : stage_(nullptr) {}
-
-    uint32_t input_count();
-    Input input(uint32_t index);
-    Input input();
-    uint32_t output_count();
-    Output output(uint32_t index);
-    Output output();
-    Part upstream_part(uint32_t index);
-    Part upstream_part();
-    Part downstream_part(uint32_t index);
-    Part downstream_part();
-
-   private:
-    explicit Part(Stage* stage) : stage_(stage) {}
-
-    explicit operator bool() const { return stage_ != nullptr; }
-
-    Stage* stage_;
-
-    friend Engine;
-    friend Input;
-    friend Output;
-  };
-
-  // Opaque StageInput pointer used for graph building.
-  class Input {
-   public:
-    Input() : stage_(nullptr), index_(0) {}
-
-    explicit operator bool() const { return stage_ != nullptr; }
-
-    Part part() { return Part(stage_); }
-
-    bool connected() {
-      DCHECK(stage_);
-      return stage_input().upstream_stage() != nullptr;
-    }
-
-    Part upstream_part() {
-      DCHECK(connected());
-      return Part(stage_input().upstream_stage());
-    }
-
-   private:
-    Input(Stage* stage, uint32_t index) :
-        stage_(stage), index_(index) {
-      DCHECK(stage_);
-      DCHECK(index_ < stage_->input_count());
-    }
-
-    StageInput& stage_input() {
-      DCHECK(stage_);
-      return stage_->input(index_);
-    }
-
-    Stage* stage_;
-    uint32_t index_;
-
-    friend Engine;
-    friend Part;
-    friend Output;
-  };
-
-  // Opaque StageOutput pointer used for graph building.
-  class Output {
-   public:
-    Output() : stage_(nullptr), index_(0) {}
-
-    explicit operator bool() const { return stage_ != nullptr; }
-
-    Part part() { return Part(stage_); }
-
-    bool connected() {
-      DCHECK(stage_);
-      return stage_output().downstream_stage() != nullptr;
-    }
-
-    Part downstream_part() {
-      DCHECK(connected());
-      return Part(stage_output().downstream_stage());
-    }
-
-   private:
-    Output(Stage* stage, uint32_t index) :
-        stage_(stage), index_(index) {
-      DCHECK(stage_);
-      DCHECK(index_ < stage_->output_count());
-    }
-
-    StageOutput& stage_output() {
-      DCHECK(stage_);
-      return stage_->output(index_);
-    }
-
-    Stage* stage_;
-    uint32_t index_;
-
-    friend Engine;
-    friend Part;
-    friend Input;
-  };
-
   Engine();
 
   ~Engine();
 
-  // Adds a part to the engine.
-  template<typename T, typename TBase>
-  Part Add(SharedPtr<T, TBase> t) {
-    DCHECK(t);
-    return Add(CreateStage(std::shared_ptr<TBase>(t)));
-  }
+  // Prepares the input and the subgraph upstream of it.
+  void PrepareInput(const InputRef& input_ref);
 
-  // Removes a part from the engine after disconnecting it from other parts.
-  void RemovePart(Part part);
+  // Unprepares the input and the subgraph upstream of it.
+  void UnprepareInput(const InputRef& input_ref);
 
-  // Connects an output connector to an input connector. Returns the dowstream
-  // part.
-  Part Connect(Output output, Input input);
+  // Flushes the output and the subgraph downstream of it.
+  void FlushOutput(const OutputRef& output_ref);
 
-  // Connects a part with exactly one output to a part with exactly one input.
-  // Returns the downstream part.
-  Part ConnectParts(Part upstream_part, Part downstream_part);
-
-  // Connects an output connector to a part that has exactly one input. Returns
-  // the downstream part.
-  Part ConnectOutputToPart(Output output, Part downstream_part);
-
-  // Connects a part with exactly one output to an input connector. Returns the
-  // downstream part.
-  Part ConnectPartToInput(Part upstream_part, Input input);
-
-  // Disconnects an output connector and the input connector to which it's
-  // connected.
-  void DisconnectOutput(Output output);
-
-  // Disconnects an input connector and the output connector to which it's
-  // connected.
-  void DisconnectInput(Input input);
-
-  // Disconnects and removes part and everything connected to it.
-  void RemovePartsConnectedToPart(Part part);
-
-  // Disconnects and removes everything connected to output.
-  void RemovePartsConnectedToOutput(Output output);
-
-  // Disconnects and removes everything connected to input.
-  void RemovePartsConnectedToInput(Input input);
-
-  // Adds all the parts in t (which must all have one input and one output) and
-  // connects them in sequence to the output connector. Returns the output
-  // connector of the last part or the output parameter if it is empty.
-  template<typename T>
-  Output AddAndConnectAll(
-      Output output,
-      const T& t) {
-    for (auto& element : t) {
-      Part part = Add(CreateStage(element));
-      Connect(output, part.input());
-      output = part.output();
-    }
-    return output;
-  }
-
-  // Prepares the engine.
-  void Prepare();
-
-  // Prepares the part and everything upstream of it. This method is used to
-  // prepare subgraphs added when the rest of the graph is already prepared.
-  void Prepare(Part part);
-
-  // Primes all the sinks in the graph.
-  void PrimeSinks();
-
-  // Removes all parts from the engine.
-  void Reset();
-
- private:
-  // Adds a stage to the engine.
-  Part Add(Stage* stage);
-
-  // Disconnects an output.
-  void DisconnectOutputUnsafe(Stage* stage, uint32_t index);
-
-  // Disconnects an input.
-  void DisconnectInputUnsafe(Stage* stage, uint32_t index);
-
-  // Removes a stage.
-  void RemoveUnsafe(Stage* stage);
-
-  // Creates a stage from a source, sink or transform. A specialization of this
-  // template is defined for each type of source, sink or transform that can be
-  // added to the engine.
-  template<typename T>
-  static Stage* CreateStage(std::shared_ptr<T> t);
-
-  // CreateStage template specialization for MultiStreamPacketSource.
-  static Stage* CreateStage(MultiStreamPacketSourcePtr source);
-
-  // CreateStage template specialization for PacketTransform.
-  static Stage* CreateStage(PacketTransformPtr transform);
-
-  // CreateStage template specialization for ActiveSource.
-  static Stage* CreateStage(ActiveSourcePtr source);
-
-  // CreateStage template specialization for ActiveSink.
-  static Stage* CreateStage(ActiveSinkPtr sink);
-
-  // CreateStage template specialization for LpcmTransform.
-  static Stage* CreateStage(LpcmTransformPtr transform);
-
-  // Prepares a stage if all its downstream stages are prepared.
-  void MaybePrepareUnsafe(Stage* stage);
-
-  // Processes the entire backlog.
-  void UpdateUnsafe();
-
-  // Performs processing for a single stage, updating the backlog accordingly.
-  void UpdateUnsafe(Stage *stage);
+  // Queues the stage for update and winds down the backlog.
+  void RequestUpdate(Stage* stage);
 
   // Pushes the stage to the supply backlog if it isn't already there.
-  void PushToSupplyBacklogUnsafe(Stage* stage);
+  void PushToSupplyBacklog(Stage* stage);
 
   // Pushes the stage to the demand backlog if it isn't already there.
-  void PushToDemandBacklogUnsafe(Stage* stage);
+  void PushToDemandBacklog(Stage* stage);
+
+ private:
+  using UpstreamVisitor = std::function<void(
+      const InputRef& input,
+      const OutputRef& output,
+      const Stage::UpstreamCallback& callback)>;
+  using DownstreamVisitor = std::function<void(
+      const OutputRef& output,
+      const InputRef& input,
+      const Stage::DownstreamCallback& callback)>;
+
+  void VisitUpstream(
+      const InputRef& input,
+      const UpstreamVisitor& vistor);
+
+  void VisitDownstream(
+      const OutputRef& output,
+      const DownstreamVisitor& vistor);
+
+  // Processes the entire backlog.
+  void Update();
+
+  // Performs processing for a single stage, updating the backlog accordingly.
+  void Update(Stage *stage);
 
   // Pops a stage from the supply backlog and returns it or returns nullptr if
   // the supply backlog is empty.
-  Stage* PopFromSupplyBacklogUnsafe();
+  Stage* PopFromSupplyBacklog();
 
   // Pops a stage from the demand backlog and returns it or returns nullptr if
   // the demand backlog is empty.
-  Stage* PopFromDemandBacklogUnsafe();
+  Stage* PopFromDemandBacklog();
 
   mutable base::Lock lock_;
-  std::list<Stage*> stages_;
-  std::list<Stage*> sources_;
-  std::list<Stage*> sinks_;
   // supply_backlog_ contains pointers to all the stages that have been supplied
   // (packets or frames) but have not been updated since. demand_backlog_ does
   // the same for demand. The use of queue vs stack here is a guess as to what
@@ -417,16 +148,10 @@
   // TODO(dalesat): Determine the best ordering and implement it.
   std::queue<Stage*> supply_backlog_;
   std::stack<Stage*> demand_backlog_;
-  Stage::UpdateCallback update_function_;
   bool packets_produced_;
-
-  friend class StageInput;
-  friend class StageOutput;
-  friend class LpcmStageInput;
-  friend class LpcmStageOutput;
 };
 
 }  // namespace media
 }  // namespace mojo
 
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_ENGINE_H_
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
diff --git a/services/media/framework/formatting.cc b/services/media/framework/formatting.cc
index 06ad7ec..cea3aaa 100644
--- a/services/media/framework/formatting.cc
+++ b/services/media/framework/formatting.cc
@@ -65,7 +65,9 @@
   return os;
 }
 
-std::ostream& operator<<(std::ostream& os, const StreamTypePtr& value) {
+std::ostream& operator<<(
+    std::ostream& os,
+    const std::unique_ptr<StreamType>& value) {
   if (!value) {
     return os << "<nullptr>" << std::endl;
   } else {
@@ -76,9 +78,10 @@
   os << begl << "Scheme scheme(): " << value->scheme() << std::endl;
   switch (value->scheme()) {
     case StreamType::Scheme::kMultiplexed:
-      os << begl << "StreamTypePtr multiplex_type: "
+      os << begl << "std::unique_ptr<StreamType> multiplex_type: "
           << value->multiplexed()->multiplex_type();
-      os << begl << "StreamTypesPtr substream_types: "
+      os << begl << "std::unique_ptr<std::vector<std::unique_ptr<StreamType>>>"
+          << " substream_types: "
           << value->multiplexed()->substream_types();
       break;
     case StreamType::Scheme::kLpcm:
@@ -98,7 +101,7 @@
           << value->compressed_audio()->channels() << std::endl;
       os << begl << "uint32_t frames_per_second: "
           << value->compressed_audio()->frames_per_second() << std::endl;
-      os << begl << "BytesPtr encoding_details: "
+      os << begl << "std::unique_ptr<Bytes> encoding_details: "
           << value->compressed_audio()->encoding_details() << std::endl;
       break;
     case StreamType::Scheme::kVideo:
@@ -118,7 +121,7 @@
           << value->video()->coded_width() << std::endl;
       os << begl << "uint32_t coded_height: "
           << value->video()->coded_height() << std::endl;
-      os << begl << "BytesPtr encoding_details: "
+      os << begl << "std::unique_ptr<Bytes> encoding_details: "
           << value->video()->encoding_details() << std::endl;
       break;
     default:
@@ -128,7 +131,9 @@
   return os << outdent;
 }
 
-std::ostream& operator<<(std::ostream& os, const StreamTypeSetPtr& value) {
+std::ostream& operator<<(
+    std::ostream& os,
+    const std::unique_ptr<StreamTypeSet>& value) {
   if (!value) {
     return os << "<nullptr>" << std::endl;
   } else {
@@ -139,9 +144,10 @@
   os << begl << "Scheme scheme(): " << value->scheme() << std::endl;
   switch (value->scheme()) {
     case StreamType::Scheme::kMultiplexed:
-      os << begl << "StreamTypeSetPtr multiplex_type_set: "
+      os << begl << "std::unique_ptr<StreamTypeSet> multiplex_type_set: "
           << value->multiplexed()->multiplex_type_set();
-      os << begl << "StreamTypeSetsPtr substream_type_sets: "
+      os << begl << "std::unique_ptr<std::vector<std::unique_ptr<"
+          << "StreamTypeSet>>> substream_type_sets: "
           << value->multiplexed()->substream_type_sets();
       break;
     case StreamType::Scheme::kLpcm:
@@ -177,7 +183,9 @@
   return os << outdent;
 }
 
-std::ostream& operator<<(std::ostream& os, const StreamTypesPtr& value) {
+std::ostream& operator<<(
+    std::ostream& os,
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamType>>>& value) {
   if (!value) {
     return os << "<nullptr>" << std::endl;
   } else if (value->size() == 0) {
@@ -187,14 +195,16 @@
   }
 
   int index = 0;
-  for (const StreamTypePtr& element : *value) {
+  for (const std::unique_ptr<StreamType>& element : *value) {
     os << "[" << index++ << "]: " << element;
   }
 
   return os;
 }
 
-std::ostream& operator<<(std::ostream& os, const StreamTypeSetsPtr& value) {
+std::ostream& operator<<(
+    std::ostream& os,
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>& value) {
   if (!value) {
     return os << "<nullptr>" << std::endl;
   } else if (value->size() == 0) {
@@ -204,7 +214,7 @@
   }
 
   int index = 0;
-  for (const StreamTypeSetPtr& element : *value) {
+  for (const std::unique_ptr<StreamTypeSet>& element : *value) {
     os << "[" << index++ << "]: " << element;
   }
 
@@ -379,7 +389,9 @@
   return os;
 }
 
-std::ostream& operator<<(std::ostream& os, const BytesPtr& value) {
+std::ostream& operator<<(
+    std::ostream& os,
+    const std::unique_ptr<Bytes>& value) {
   if (value == nullptr) {
     return os << "<nullptr>";
   } else {
diff --git a/services/media/framework/formatting.h b/services/media/framework/formatting.h
index d0a1211..2cdbc62 100644
--- a/services/media/framework/formatting.h
+++ b/services/media/framework/formatting.h
@@ -61,7 +61,7 @@
 std::ostream& operator<<(std::ostream& os, VideoStreamType::VideoProfile value);
 std::ostream& operator<<(std::ostream& os, VideoStreamType::PixelFormat value);
 std::ostream& operator<<(std::ostream& os, VideoStreamType::ColorSpace value);
-std::ostream& operator<<(std::ostream& os, const BytesPtr& value);
+std::ostream& operator<<(std::ostream& os, const std::unique_ptr<Bytes>& value);
 
 template<typename T>
 std::ostream& operator<<(std::ostream& os, Range<T> value) {
@@ -72,10 +72,18 @@
 
 // The following overloads add newlines.
 
-std::ostream& operator<<(std::ostream& os, const StreamTypePtr& value);
-std::ostream& operator<<(std::ostream& os, const StreamTypeSetPtr& value);
-std::ostream& operator<<(std::ostream& os, const StreamTypesPtr& value);
-std::ostream& operator<<(std::ostream& os, const StreamTypeSetsPtr& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const std::unique_ptr<StreamType>& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const std::unique_ptr<StreamTypeSet>& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamType>>>& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>& value);
 
 } // namespace media
 } // namespace mojo
diff --git a/services/media/framework/graph.cc b/services/media/framework/graph.cc
new file mode 100644
index 0000000..794985d
--- /dev/null
+++ b/services/media/framework/graph.cc
@@ -0,0 +1,219 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/graph.h"
+
+namespace mojo {
+namespace media {
+
+// Builds the callback through which stages ask for an update: it forwards the
+// requesting stage to the engine. The callback captures |this|; Add() installs
+// it on every stage that joins the graph.
+Graph::Graph()
+    : update_function_(
+          [this](Stage* stage) { engine_.RequestUpdate(stage); }) {}
+
+// Deletes every stage the graph still holds.
+Graph::~Graph() {
+  Reset();
+}
+
+// Detaches |part| from all of its neighbors, drops its stage from the graph's
+// lists and deletes the stage. |part| must be valid. Connections are severed
+// through DisconnectInput/DisconnectOutput, so their checks against
+// disconnecting prepared connections apply here as well.
+void Graph::RemovePart(PartRef part) {
+  DCHECK(part.valid());
+
+  Stage* const doomed = part.stage_;
+
+  // Each count is sampled once, right before its loop starts.
+  const size_t input_total = doomed->input_count();
+  for (size_t i = 0; i != input_total; ++i) {
+    if (!doomed->input(i).connected()) {
+      continue;
+    }
+    DisconnectInput(InputRef(doomed, i));
+  }
+
+  const size_t output_total = doomed->output_count();
+  for (size_t i = 0; i != output_total; ++i) {
+    if (!doomed->output(i).connected()) {
+      continue;
+    }
+    DisconnectOutput(OutputRef(doomed, i));
+  }
+
+  // Clear the update callback that Add() installed.
+  doomed->SetUpdateCallback(nullptr);
+
+  // list::remove is a no-op for a list that doesn't contain the stage.
+  sources_.remove(doomed);
+  sinks_.remove(doomed);
+  stages_.remove(doomed);
+
+  delete doomed;
+}
+
+// Joins |output| to |input|, first severing whatever connection either end
+// already has. Both refs must be valid. Returns the part that owns |input|,
+// i.e. the downstream part.
+PartRef Graph::Connect(const OutputRef& output, const InputRef& input) {
+  DCHECK(output.valid());
+  DCHECK(input.valid());
+
+  // Both calls return immediately when the connector isn't connected.
+  DisconnectOutput(output);
+  DisconnectInput(input);
+
+  // Each end records the other as its mate.
+  output.actual().Connect(input);
+  input.actual().Connect(output);
+
+  return input.part();
+}
+
+// Connects the output of |upstream_part| to the input of |downstream_part|
+// (each part is expected to have exactly one) and returns |downstream_part|.
+// Both parts must be valid.
+PartRef Graph::ConnectParts(
+    PartRef upstream_part,
+    PartRef downstream_part) {
+  DCHECK(upstream_part.valid());
+  DCHECK(downstream_part.valid());
+
+  const OutputRef& from = upstream_part.output();
+  const InputRef& to = downstream_part.input();
+  Connect(from, to);
+
+  return downstream_part;
+}
+
+// Connects |output| to the input of |downstream_part| (which is expected to
+// have exactly one input) and returns |downstream_part|. Both arguments must
+// be valid.
+PartRef Graph::ConnectOutputToPart(
+    const OutputRef& output,
+    PartRef downstream_part) {
+  DCHECK(output.valid());
+  DCHECK(downstream_part.valid());
+
+  const InputRef& sole_input = downstream_part.input();
+  Connect(output, sole_input);
+
+  return downstream_part;
+}
+
+// Connects the output of |upstream_part| (which is expected to have exactly
+// one output) to |input|. Both arguments must be valid. Returns the part that
+// owns |input|, which is exactly what Connect returns.
+PartRef Graph::ConnectPartToInput(
+    PartRef upstream_part,
+    const InputRef& input) {
+  DCHECK(upstream_part.valid());
+  DCHECK(input.valid());
+
+  return Connect(upstream_part.output(), input);
+}
+
+// Severs the connection between |output| and the input it is mated with. Does
+// nothing when |output| isn't connected. |output| must be valid. A connection
+// whose input is prepared must not be disconnected; trying to do so trips a
+// CHECK and leaves the connection in place.
+void Graph::DisconnectOutput(const OutputRef& output) {
+  DCHECK(output.valid());
+
+  if (output.connected()) {
+    Input& downstream_input = output.mate().actual();
+
+    if (downstream_input.prepared()) {
+      CHECK(false) << "attempt to disconnect prepared output";
+      return;
+    }
+
+    // Both ends forget their mate.
+    downstream_input.Disconnect();
+    output.actual().Disconnect();
+  }
+}
+
+// Severs the connection between |input| and the output it is mated with. Does
+// nothing when |input| isn't connected. |input| must be valid. A prepared
+// input must not be disconnected; trying to do so trips a CHECK and leaves the
+// connection in place.
+void Graph::DisconnectInput(const InputRef& input) {
+  DCHECK(input.valid());
+
+  if (input.connected()) {
+    Output& upstream_output = input.mate().actual();
+    Input& this_input = input.actual();
+
+    if (this_input.prepared()) {
+      CHECK(false) << "attempt to disconnect prepared input";
+      return;
+    }
+
+    // Both ends forget their mate.
+    upstream_output.Disconnect();
+    this_input.Disconnect();
+  }
+}
+
+// Disconnects and removes |part| together with every part that can be reached
+// from it through connected inputs and outputs. |part| must be valid.
+//
+// Neighbors are found through each connector's mate; a connector's own part()
+// is the part being visited, not its neighbor. A part is queued at most once,
+// so a part reachable along several paths is never removed twice.
+void Graph::RemovePartsConnectedToPart(PartRef part) {
+  DCHECK(part.valid());
+
+  std::deque<PartRef> to_remove { part };
+
+  // Queues |neighbor| unless it is already waiting to be removed. The part
+  // being visited stays at the front of the queue while its neighbors are
+  // collected, so a part connected to itself isn't queued a second time.
+  auto enqueue = [&to_remove](const PartRef& neighbor) {
+    for (const PartRef& queued : to_remove) {
+      if (queued.stage_ == neighbor.stage_) {
+        return;
+      }
+    }
+    to_remove.push_back(neighbor);
+  };
+
+  while (!to_remove.empty()) {
+    PartRef current = to_remove.front();
+
+    for (size_t i = 0; i < current.input_count(); ++i) {
+      const InputRef& input = current.input(i);
+      if (input.connected()) {
+        enqueue(input.mate().part());
+      }
+    }
+
+    for (size_t i = 0; i < current.output_count(); ++i) {
+      const OutputRef& output = current.output(i);
+      if (output.connected()) {
+        enqueue(output.mate().part());
+      }
+    }
+
+    to_remove.pop_front();
+
+    // RemovePart disconnects |current| from its neighbors before deleting it,
+    // so no later visit can lead back to the deleted stage.
+    RemovePart(current);
+  }
+}
+
+// Severs |output|'s connection and then hands the part that was downstream of
+// it to RemovePartsConnectedToPart. Does nothing when |output| isn't
+// connected. |output| must be valid.
+void Graph::RemovePartsConnectedToOutput(const OutputRef& output) {
+  DCHECK(output.valid());
+
+  if (output.connected()) {
+    // Look up the downstream part while the connection still exists.
+    PartRef downstream_part = output.mate().part();
+    DisconnectOutput(output);
+    RemovePartsConnectedToPart(downstream_part);
+  }
+}
+
+// Severs |input|'s connection and then hands the part that was upstream of it
+// to RemovePartsConnectedToPart. Does nothing when |input| isn't connected.
+// |input| must be valid.
+void Graph::RemovePartsConnectedToInput(const InputRef& input) {
+  DCHECK(input.valid());
+
+  if (input.connected()) {
+    // Look up the upstream part while the connection still exists.
+    PartRef upstream_part = input.mate().part();
+    DisconnectInput(input);
+    RemovePartsConnectedToPart(upstream_part);
+  }
+}
+
+// Empties the graph and deletes every stage it holds. Connections are not
+// torn down one by one; the stages are simply deleted.
+void Graph::Reset() {
+  sources_.clear();
+  sinks_.clear();
+
+  // Each stage leaves the list before it is deleted.
+  while (!stages_.empty()) {
+    Stage* const doomed = stages_.front();
+    stages_.pop_front();
+    delete doomed;
+  }
+}
+
+// Prepares the graph for operation by asking the engine to prepare every
+// input of every sink.
+void Graph::Prepare() {
+  for (Stage* sink_stage : sinks_) {
+    // input_count() is re-read on every pass of the loop.
+    for (size_t input_index = 0;
+         input_index < sink_stage->input_count();
+         ++input_index) {
+      engine_.PrepareInput(InputRef(sink_stage, input_index));
+    }
+  }
+}
+
+// Asks the engine to prepare |input|, which must be valid.
+void Graph::PrepareInput(const InputRef& input) {
+  DCHECK(input.valid());
+
+  engine_.PrepareInput(input);
+}
+
+// Calls Prime() on every sink stage, in the order the sinks were added.
+void Graph::PrimeSinks() {
+  for (Stage* sink_stage : sinks_) {
+    sink_stage->Prime();
+  }
+}
+
+// Registers |stage| with the graph, which deletes it in RemovePart or Reset.
+// A stage without inputs is also recorded as a source and a stage without
+// outputs as a sink. Installs the graph's update callback on the stage and
+// returns a reference to the new part.
+PartRef Graph::Add(Stage* stage) {
+  stages_.push_back(stage);
+
+  const bool is_source = stage->input_count() == 0;
+  if (is_source) {
+    sources_.push_back(stage);
+  }
+
+  const bool is_sink = stage->output_count() == 0;
+  if (is_sink) {
+    sinks_.push_back(stage);
+  }
+
+  stage->SetUpdateCallback(update_function_);
+
+  return PartRef(stage);
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/graph.h b/services/media/framework/graph.h
new file mode 100644
index 0000000..de4e85e
--- /dev/null
+++ b/services/media/framework/graph.h
@@ -0,0 +1,205 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_GRAPH_H_
+#define SERVICES_MEDIA_FRAMEWORK_GRAPH_H_
+
+#include <list>
+
+#include "services/media/framework/engine.h"
+#include "services/media/framework/refs.h"
+#include "services/media/framework/stages/active_sink_stage.h"
+#include "services/media/framework/stages/active_source_stage.h"
+#include "services/media/framework/stages/multistream_source_stage.h"
+#include "services/media/framework/stages/stage.h"
+#include "services/media/framework/stages/transform_stage.h"
+
+namespace mojo {
+namespace media {
+
+namespace internal {
+
+// StageCreator<T>::Create creates the stage that hosts a part of type T. The
+// primary template is only declared; DEFINE_STAGE_CREATOR defines the partial
+// specialization selected when T derives from a particular model. Every new
+// model/stage type pair that's defined will need an entry here.
+//
+// These templates live in a named namespace because this is a header: Graph's
+// inline member templates use StageCreator, and an unnamed namespace would
+// make StageCreator a different entity in every translation unit.
+template<typename T, typename Enable = void> class StageCreator;
+
+// Defines StageCreator<T> for every T derived from TModel. Create wraps the
+// part, as a shared pointer to TModel, in a newly allocated TStage and returns
+// it as a raw pointer.
+#define DEFINE_STAGE_CREATOR(TModel, TStage) \
+template<typename T> \
+class StageCreator<T, typename std::enable_if< \
+    std::is_base_of<TModel, T>::value>::type> { \
+ public: \
+  static inline Stage* Create(std::shared_ptr<T> t_ptr) { \
+    return new TStage(std::shared_ptr<TModel>(t_ptr)); \
+  } \
+}
+
+DEFINE_STAGE_CREATOR(MultistreamSource, MultistreamSourceStage);
+DEFINE_STAGE_CREATOR(Transform, TransformStage);
+DEFINE_STAGE_CREATOR(ActiveSource, ActiveSourceStage);
+DEFINE_STAGE_CREATOR(ActiveSink, ActiveSinkStage);
+
+#undef DEFINE_STAGE_CREATOR
+
+} // namespace internal
+
+// Keeps the unqualified name StageCreator usable within mojo::media.
+using internal::StageCreator;
+
+//
+// USAGE
+//
+// Graph is a container for sources, sinks and transforms ('parts') connected
+// in a graph. PartRef, InputRef and OutputRef are all
+// references to parts and their inputs and outputs. Graph provides a variety
+// of methods for adding and removing parts and for connecting inputs and
+// outputs to form a graph.
+//
+// The graph isn't thread-safe. If the graph is to be modified and/or
+// interrogated on multiple threads, the caller must provide its own lock
+// to prevent collisions. In this case, the caller must also acquire the same
+// lock when making calls that cause parts to add or remove inputs or outputs.
+//
+// The graph prevents the disconnection of prepared inputs and outputs. Once
+// a connected input/output pair is prepared, it must be unprepared before
+// disconnection. This allows the engine to operate freely over prepared
+// portions of the graph (prepare and unprepare are synchronized with the
+// engine).
+//
+// Parts added to the graph are referenced using shared pointers. The graph
+// holds pointers to the parts it contains, and the application, in many cases,
+// also holds pointers to the parts so it can call methods that are outside the
+// graph's scope. When a part is added, the graph returns a PartRef
+// object, which can be used to reference the part when the graph is modified.
+// PartRef objects can be interrogated to retrieve inputs (as
+// InputRef objects) and outputs (as OutputRef objects).
+//
+// Parts come in various flavors, defined by 'model' abstract classes. The
+// current list of supported models is:
+//
+//  ActiveSink        - a sink that consumes packets asynchronously
+//  ActiveSource      - a source that produces packets asynchronously
+//  MultistreamSource - a source that produces multiple streams of packets
+//                      synchronously
+//  Transform         - a synchronous transform that consumes and produces
+//                      packets via one input and one output
+//
+// Other models will be defined in the future as needed.
+//
+
+//
+// DESIGN
+//
+// The Graph is implemented as a system of cooperating objects. Of those
+// objects, only the graph itself is of relevance to code that uses Graph and
+// to part implementations. The other objects are:
+//
+// Stage
+// A stage hosts a single part. There are many subclasses of Stage, one for
+// each supported part model. The stage's job is to implement the contract
+// represented by the model so the parts that conform to the model can
+// participate in the operation of the graph. Stages are uniform with respect
+// to how they interact with graph. PartRef references a stage.
+//
+// Input
+// A stage possesses zero or more Input instances. Input objects
+// implement the supply of media into the stage and demand for media signalled
+// upstream. Inputs receive media from Outputs in the form of packets
+// (type Packet).
+//
+// Output
+// A stage possesses zero or more Output instances. Output objects
+// implement the supply of media output of the stage to a downstream input and
+// demand for media signalled from that input.
+//
+
+// Container for a graph of connected sources, sinks and transforms ('parts').
+// Not thread-safe.
+class Graph {
+ public:
+  Graph();
+
+  ~Graph();
+
+  // Adds a part to the graph. The model that T derives from selects the type
+  // of stage created to host the part.
+  template<typename T>
+  PartRef Add(std::shared_ptr<T> t_ptr) {
+    DCHECK(t_ptr);
+    return Add(StageCreator<T>::Create(t_ptr));
+  }
+
+  // Removes a part from the graph after disconnecting it from other parts.
+  void RemovePart(PartRef part);
+
+  // Connects an output connector to an input connector. Returns the downstream
+  // part.
+  PartRef Connect(const OutputRef& output, const InputRef& input);
+
+  // Connects a part with exactly one output to a part with exactly one input.
+  // Returns the downstream part.
+  PartRef ConnectParts(PartRef upstream_part, PartRef downstream_part);
+
+  // Connects an output connector to a part that has exactly one input. Returns
+  // the downstream part.
+  PartRef ConnectOutputToPart(const OutputRef& output, PartRef downstream_part);
+
+  // Connects a part with exactly one output to an input connector. Returns the
+  // downstream part.
+  PartRef ConnectPartToInput(PartRef upstream_part, const InputRef& input);
+
+  // Disconnects an output connector and the input connector to which it's
+  // connected.
+  void DisconnectOutput(const OutputRef& output);
+
+  // Disconnects an input connector and the output connector to which it's
+  // connected.
+  void DisconnectInput(const InputRef& input);
+
+  // Disconnects and removes part and everything connected to it.
+  void RemovePartsConnectedToPart(PartRef part);
+
+  // Disconnects and removes everything connected to output.
+  void RemovePartsConnectedToOutput(const OutputRef& output);
+
+  // Disconnects and removes everything connected to input.
+  void RemovePartsConnectedToInput(const InputRef& input);
+
+  // Adds all the parts in t (which must all have one input and one output) and
+  // connects them in sequence to the output connector. t is a container of
+  // shared pointers to parts. Returns the output connector of the last part or
+  // the output parameter if t is empty.
+  template<typename T>
+  OutputRef AddAndConnectAll(OutputRef output, const T& t) {
+    for (const auto& element : t) {
+      // T is the container type, so the stage type must be selected from the
+      // element's type; the public Add overload deduces it.
+      PartRef part = Add(element);
+      Connect(output, part.input());
+      output = part.output();
+    }
+    return output;
+  }
+
+  // Removes all parts from the graph.
+  void Reset();
+
+  // Prepares the graph for operation.
+  void Prepare();
+
+  // Prepares the input and everything upstream of it. This method is used to
+  // prepare subgraphs added when the rest of the graph is already prepared.
+  void PrepareInput(const InputRef& input);
+
+  // Primes all the sinks in the graph.
+  void PrimeSinks();
+
+ private:
+  // Adds a stage to the graph.
+  PartRef Add(Stage* stage);
+
+  // Every stage in the graph.
+  std::list<Stage*> stages_;
+  // The stages that have no inputs.
+  std::list<Stage*> sources_;
+  // The stages that have no outputs.
+  std::list<Stage*> sinks_;
+
+  Engine engine_;
+  // Installed on every stage that is added; forwards update requests to
+  // engine_.
+  Stage::UpdateCallback update_function_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_GRAPH_H_
diff --git a/services/media/framework/lpcm_util.cc b/services/media/framework/lpcm_util.cc
index ca20807..3b92781 100644
--- a/services/media/framework/lpcm_util.cc
+++ b/services/media/framework/lpcm_util.cc
@@ -15,17 +15,17 @@
  public:
   ~LpcmUtilImpl();
 
-  void Silence(void* buffer, uint64_t frame_count) const override;
+  void Silence(void* buffer, size_t frame_count) const override;
 
-  void Copy(const void* in, void* out, uint64_t frame_count) const override;
+  void Copy(const void* in, void* out, size_t frame_count) const override;
 
-  void Mix(const void* in, void* out, uint64_t frame_count) const override;
+  void Mix(const void* in, void* out, size_t frame_count) const override;
 
   void Interleave(
       const void* in,
-      uint64_t in_byte_count,
+      size_t in_byte_count,
       void* out,
-      uint64_t frame_count) const override;
+      size_t frame_count) const override;
 
  private:
   LpcmUtilImpl(const LpcmStreamType& stream_type);
@@ -69,10 +69,10 @@
 LpcmUtilImpl<T>::~LpcmUtilImpl() {}
 
 template<typename T>
-void LpcmUtilImpl<T>::Silence(void* buffer, uint64_t frame_count) const {
+void LpcmUtilImpl<T>::Silence(void* buffer, size_t frame_count) const {
   T* sample = reinterpret_cast<T*>(buffer);
   for (
-      uint64_t sample_countdown = frame_count * stream_type_.channels();
+      size_t sample_countdown = frame_count * stream_type_.channels();
       sample_countdown != 0;
       --sample_countdown) {
     *sample = 0;
@@ -81,33 +81,33 @@
 }
 
 template<>
-void LpcmUtilImpl<uint8_t>::Silence(void* buffer, uint64_t frame_count) const {
+void LpcmUtilImpl<uint8_t>::Silence(void* buffer, size_t frame_count) const {
   std::memset(buffer, 0x80, frame_count * stream_type_.bytes_per_frame());
 }
 
 template<>
-void LpcmUtilImpl<int16_t>::Silence(void* buffer, uint64_t frame_count) const {
+void LpcmUtilImpl<int16_t>::Silence(void* buffer, size_t frame_count) const {
   std::memset(buffer, 0, frame_count * stream_type_.bytes_per_frame());
 }
 
 template<>
-void LpcmUtilImpl<int32_t>::Silence(void* buffer, uint64_t frame_count) const {
+void LpcmUtilImpl<int32_t>::Silence(void* buffer, size_t frame_count) const {
   std::memset(buffer, 0, frame_count * stream_type_.bytes_per_frame());
 }
 
 template<typename T>
-void LpcmUtilImpl<T>::Copy(const void* in, void* out, uint64_t frame_count)
+void LpcmUtilImpl<T>::Copy(const void* in, void* out, size_t frame_count)
     const {
   std::memcpy(out, in, stream_type_.min_buffer_size(frame_count));
 }
 
 template<typename T>
-void LpcmUtilImpl<T>::Mix(const void* in, void* out, uint64_t frame_count)
+void LpcmUtilImpl<T>::Mix(const void* in, void* out, size_t frame_count)
     const {
   const T* in_sample = reinterpret_cast<const T*>(in);
   T* out_sample = reinterpret_cast<T*>(out);
   for (
-      uint64_t sample_countdown = frame_count * stream_type_.channels();
+      size_t sample_countdown = frame_count * stream_type_.channels();
       sample_countdown != 0;
       --sample_countdown) {
     *out_sample += *in_sample; // TODO(dalesat): Limit.
@@ -117,12 +117,12 @@
 }
 
 template<>
-void LpcmUtilImpl<uint8_t>::Mix(const void* in, void* out, uint64_t frame_count)
+void LpcmUtilImpl<uint8_t>::Mix(const void* in, void* out, size_t frame_count)
     const {
   const uint8_t* in_sample = reinterpret_cast<const uint8_t*>(in);
   uint8_t* out_sample = reinterpret_cast<uint8_t*>(out);
   for (
-      uint64_t sample_countdown = frame_count * stream_type_.channels();
+      size_t sample_countdown = frame_count * stream_type_.channels();
       sample_countdown != 0;
       --sample_countdown) {
     *out_sample = uint8_t(uint16_t(*out_sample) + uint16_t(*in_sample) - 0x80);
@@ -135,9 +135,9 @@
 template<typename T>
 void LpcmUtilImpl<T>::Interleave(
     const void* in,
-    uint64_t in_byte_count,
+    size_t in_byte_count,
     void* out,
-    uint64_t frame_count) const {
+    size_t frame_count) const {
   DCHECK(in);
   DCHECK(in_byte_count);
   DCHECK(out);
diff --git a/services/media/framework/lpcm_util.h b/services/media/framework/lpcm_util.h
index 6758472..888cfc8 100644
--- a/services/media/framework/lpcm_util.h
+++ b/services/media/framework/lpcm_util.h
@@ -20,13 +20,13 @@
   virtual ~LpcmUtil() {}
 
   // Fills the buffer with silence.
-  virtual void Silence(void* buffer, uint64_t frame_count) const = 0;
+  virtual void Silence(void* buffer, size_t frame_count) const = 0;
 
   // Copies samples.
-  virtual void Copy(const void* in, void* out, uint64_t frame_count) const = 0;
+  virtual void Copy(const void* in, void* out, size_t frame_count) const = 0;
 
   // Mixes samples.
-  virtual void Mix(const void* in, void* out, uint64_t frame_count) const = 0;
+  virtual void Mix(const void* in, void* out, size_t frame_count) const = 0;
 
   // Interleaves non-interleaved samples. This assumes ffmpeg non-interleaved
   // ("planar") layout, in which the buffer (in) is divided evenly into a
@@ -35,9 +35,9 @@
-  // (hence the in_type_count and the frame_count).
+  // (hence the in_byte_count and the frame_count).
   virtual void Interleave(
       const void* in,
-      uint64_t in_byte_count,
+      size_t in_byte_count,
       void* out,
-      uint64_t frame_count) const = 0;
+      size_t frame_count) const = 0;
 };
 
 }  // namespace media
diff --git a/services/media/framework/metadata.cc b/services/media/framework/metadata.cc
index d62b4b7..bd39df8 100644
--- a/services/media/framework/metadata.cc
+++ b/services/media/framework/metadata.cc
@@ -8,7 +8,7 @@
 namespace media {
 
 // static
-MetadataPtr Metadata::Create(
+std::unique_ptr<Metadata> Metadata::Create(
     uint64_t duration_ns,
     const std::string& title,
     const std::string& artist,
@@ -16,7 +16,7 @@
     const std::string& publisher,
     const std::string& genre,
     const std::string& composer) {
-  return MetadataPtr(new Metadata(
+  return std::unique_ptr<Metadata>(new Metadata(
       duration_ns,
       title,
       artist,
diff --git a/services/media/framework/metadata.h b/services/media/framework/metadata.h
index ee44240..2d46358 100644
--- a/services/media/framework/metadata.h
+++ b/services/media/framework/metadata.h
@@ -9,22 +9,18 @@
 #include <string>
 
 #include "base/macros.h"
-#include "services/media/framework/ptr.h"
+#include "services/media/framework/safe_clone.h"
 
 namespace mojo {
 namespace media {
 
 class Metadata;
 
-// TODO(dalesat): Get rid of typedefs like these.
-typedef UniquePtr<Metadata> MetadataPtr;
-
 // Container for content metadata.
 // TODO(dalesat): Probably needs to be extensible. Consider using map-like.
 class Metadata {
  public:
-  // TODO(dalesat): Rename methods like this 'Create'.
-  static MetadataPtr Create(
+  static std::unique_ptr<Metadata> Create(
       uint64_t duration_ns,
       const std::string& title,
       const std::string& artist,
@@ -49,7 +45,7 @@
 
   const std::string& composer() const { return composer_; }
 
-  MetadataPtr Clone() const {
+  std::unique_ptr<Metadata> Clone() const {
     return Create(
         duration_ns_,
         title_,
diff --git a/services/media/framework/models/active_multistream_sink.h b/services/media/framework/models/active_multistream_sink.h
new file mode 100644
index 0000000..c830bba
--- /dev/null
+++ b/services/media/framework/models/active_multistream_sink.h
@@ -0,0 +1,52 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_MEDIA_MODELS_ACTIVE_MULTISTREAM_SINK_H_
+#define MOJO_MEDIA_MODELS_ACTIVE_MULTISTREAM_SINK_H_
+
+#include "services/media/framework/models/demand.h"
+#include "services/media/framework/models/part.h"
+#include "services/media/framework/packet.h"
+
+namespace mojo {
+namespace media {
+
+// Interface through which an ActiveMultistreamSink calls back into its host.
+class ActiveMultistreamSinkHost {
+ public:
+  virtual ~ActiveMultistreamSinkHost() {}
+
+  // TODO(dalesat): Revisit allocation semantics.
+
+  // Allocates an input and returns its index.
+  virtual size_t AllocateInput() = 0;
+
+  // Releases a previously-allocated input and returns the container size
+  // required to hold the remaining inputs (i.e. max input index + 1). The
+  // return value can be used to resize the caller's input container.
+  virtual size_t ReleaseInput(size_t index) = 0;
+
+  // Updates the demand for the input identified by input_index.
+  virtual void UpdateDemand(size_t input_index, Demand demand) = 0;
+};
+
+// Synchronous sink of packets for multiple streams.
+class ActiveMultistreamSink : public Part {
+ public:
+  ~ActiveMultistreamSink() override {}
+
+  // Sets the host through which the sink allocates inputs and updates demand.
+  virtual void SetHost(ActiveMultistreamSinkHost* host) = 0;
+
+  // Initiates demand.
+  virtual void Prime() = 0;
+
+  // Supplies a packet on input_index, returning the new demand for that input.
+  virtual Demand SupplyPacket(size_t input_index, PacketPtr packet) = 0;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // MOJO_MEDIA_MODELS_ACTIVE_MULTISTREAM_SINK_H_
diff --git a/services/media/framework/models/active_sink.h b/services/media/framework/models/active_sink.h
index 955fe06..51e387b 100644
--- a/services/media/framework/models/active_sink.h
+++ b/services/media/framework/models/active_sink.h
@@ -5,41 +5,35 @@
 #ifndef MOJO_MEDIA_MODELS_ACTIVE_SINK_H_
 #define MOJO_MEDIA_MODELS_ACTIVE_SINK_H_
 
-#include <memory>
-
-#include "services/media/framework/allocator.h"
 #include "services/media/framework/models/demand.h"
+#include "services/media/framework/models/part.h"
 #include "services/media/framework/packet.h"
+#include "services/media/framework/payload_allocator.h"
 
 namespace mojo {
 namespace media {
 
 // Sink that consumes packets asynchronously.
-class ActiveSink {
+class ActiveSink : public Part {
  public:
   using DemandCallback = std::function<void(Demand demand)>;
 
-  virtual ~ActiveSink() {}
+  ~ActiveSink() override {}
 
-  // Indicates whether the sink must allocate.
-  virtual bool must_allocate() const = 0;
-
-  // The consumer's allocator. Can return nullptr, in which case the default
-  // allocator should be used.
-  virtual Allocator* allocator() = 0;
+  // An allocator that must be used for supplied packets or nullptr if there's
+  // no such requirement.
+  virtual PayloadAllocator* allocator() = 0;
 
   // Sets the callback that signals demand asynchronously.
-  virtual void SetDemandCallback(DemandCallback demand_callback) = 0;
+  virtual void SetDemandCallback(const DemandCallback& demand_callback) = 0;
 
   // Initiates demand.
   virtual void Prime() = 0;
 
-  // Supplies a packet to the sink.
+  // Supplies a packet to the sink, returning the new demand for the input.
   virtual Demand SupplyPacket(PacketPtr packet) = 0;
 };
 
-typedef std::shared_ptr<ActiveSink> ActiveSinkPtr;
-
 }  // namespace media
 }  // namespace mojo
 
diff --git a/services/media/framework/models/active_source.h b/services/media/framework/models/active_source.h
index a1b7b82..e286279 100644
--- a/services/media/framework/models/active_source.h
+++ b/services/media/framework/models/active_source.h
@@ -5,37 +5,34 @@
 #ifndef MOJO_MEDIA_MODELS_ACTIVE_SOURCE_H_
 #define MOJO_MEDIA_MODELS_ACTIVE_SOURCE_H_
 
-#include <memory>
-
-#include "services/media/framework/allocator.h"
 #include "services/media/framework/models/demand.h"
+#include "services/media/framework/models/part.h"
 #include "services/media/framework/packet.h"
+#include "services/media/framework/payload_allocator.h"
 
 namespace mojo {
 namespace media {
 
 // Source that produces packets asynchronously.
-class ActiveSource {
+class ActiveSource : public Part {
  public:
   using SupplyCallback = std::function<void(PacketPtr packet)>;
 
-  virtual ~ActiveSource() {}
+  ~ActiveSource() override {}
 
   // Whether the source can accept an allocator.
   virtual bool can_accept_allocator() const = 0;
 
   // Sets the allocator for the source.
-  virtual void set_allocator(Allocator* allocator) = 0;
+  virtual void set_allocator(PayloadAllocator* allocator) = 0;
 
   // Sets the callback that supplies a packet asynchronously.
-  virtual void SetSupplyCallback(SupplyCallback supply_callback) = 0;
+  virtual void SetSupplyCallback(const SupplyCallback& supply_callback) = 0;
 
   // Sets the demand signalled from downstream.
   virtual void SetDownstreamDemand(Demand demand) = 0;
 };
 
-typedef std::shared_ptr<ActiveSource> ActiveSourcePtr;
-
 }  // namespace media
 }  // namespace mojo
 
diff --git a/services/media/framework/models/lpcm_frame_buffer.cc b/services/media/framework/models/lpcm_frame_buffer.cc
deleted file mode 100644
index a99696c..0000000
--- a/services/media/framework/models/lpcm_frame_buffer.cc
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "services/media/framework/models/lpcm_frame_buffer.h"
-
-namespace mojo {
-namespace media {
-
-LpcmFrameBuffer::LpcmFrameBuffer() :
-    bytes_per_frame_(0),
-    remaining_buffer_(nullptr),
-    remaining_frame_count_(0),
-    exhausted_callback_(nullptr) {}
-
-LpcmFrameBuffer::~LpcmFrameBuffer() {}
-
-}  // namespace media
-}  // namespace mojo
diff --git a/services/media/framework/models/lpcm_frame_buffer.h b/services/media/framework/models/lpcm_frame_buffer.h
deleted file mode 100644
index e9db867..0000000
--- a/services/media/framework/models/lpcm_frame_buffer.h
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef MOJO_MEDIA_MODELS_LPCM_FRAME_BUFFER_H_
-#define MOJO_MEDIA_MODELS_LPCM_FRAME_BUFFER_H_
-
-#include <cstdint>
-#include <functional>
-
-#include "base/logging.h"
-
-namespace mojo {
-namespace media {
-
-// References an LPCM frame buffer and implements advancement through it.
-class LpcmFrameBuffer {
- public:
-  using ExhaustedCallback = std::function<void()>;
-
-  LpcmFrameBuffer();
-
-  ~LpcmFrameBuffer();
-
-  void set_bytes_per_frame(uint32_t bytes_per_frame) {
-    bytes_per_frame_ = bytes_per_frame;
-  }
-
-  uint32_t bytes_per_frame() const {
-    return bytes_per_frame_;
-  }
-
-  // The remaining frame buffer.
-  void* buffer() const {
-    return remaining_buffer_;
-  }
-
-  // The remaining number of frames accommodated by the frame buffer.
-  uint64_t frame_count() const {
-    return remaining_frame_count_;
-  }
-
-  // Resets the buffer and frame.
-  void Reset() {
-    remaining_buffer_ = nullptr;
-    remaining_frame_count_ = 0;
-    exhausted_callback_ = nullptr;
-  }
-
-  // Sets the buffer and frame count.
-  void Set(
-      void* buffer,
-      uint64_t frame_count,
-      ExhaustedCallback exhausted_callback = nullptr) {
-    remaining_buffer_ = buffer;
-    remaining_frame_count_ = frame_count;
-    exhausted_callback_ = exhausted_callback;
-  }
-
-  // Updates buffer and frame_count to reflect use of the buffer.
-  void Advance(uint64_t frame_count) {
-    DCHECK(remaining_buffer_);
-    DCHECK(frame_count <= remaining_frame_count_);
-    remaining_buffer_ = reinterpret_cast<uint8_t*>(remaining_buffer_) +
-        (frame_count * bytes_per_frame_);
-    remaining_frame_count_ -= frame_count;
-    if (remaining_frame_count_ == 0 && exhausted_callback_ != nullptr) {
-      exhausted_callback_();
-      exhausted_callback_ = nullptr;
-    }
-  }
-
- private:
-  uint32_t bytes_per_frame_;
-  void* remaining_buffer_;
-  uint64_t remaining_frame_count_;
-  ExhaustedCallback exhausted_callback_;
-};
-
-}  // namespace media
-}  // namespace mojo
-
-#endif // MOJO_MEDIA_MODELS_LPCM_FRAME_BUFFER_H_
diff --git a/services/media/framework/models/lpcm_transform.h b/services/media/framework/models/lpcm_transform.h
deleted file mode 100644
index e1ec0c1..0000000
--- a/services/media/framework/models/lpcm_transform.h
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef MOJO_MEDIA_MODELS_LPCM_TRANSFORM_H_
-#define MOJO_MEDIA_MODELS_LPCM_TRANSFORM_H_
-
-#include <memory>
-
-#include "services/media/framework/models/lpcm_frame_buffer.h"
-#include "services/media/framework/stream_type.h"
-
-namespace mojo {
-namespace media {
-
-// Synchronous lpcm transform.
-class LpcmTransform {
- public:
-  virtual ~LpcmTransform() {}
-
-  // Gets the input stream type.
-  virtual const LpcmStreamType& input_stream_type() const = 0;
-
-  // Gets the output stream type.
-  virtual const LpcmStreamType& output_stream_type() const = 0;
-
-  // Processes frames.
-  virtual void TransformFrames(
-      LpcmFrameBuffer* source,
-      LpcmFrameBuffer* dest,
-      bool mix) = 0;
-};
-
-typedef std::shared_ptr<LpcmTransform> LpcmTransformPtr;
-
-}  // namespace media
-}  // namespace mojo
-
-#endif // MOJO_MEDIA_MODELS_LPCM_TRANSFORM_H_
diff --git a/services/media/framework/models/multistream_packet_source.h b/services/media/framework/models/multistream_source.h
similarity index 61%
rename from services/media/framework/models/multistream_packet_source.h
rename to services/media/framework/models/multistream_source.h
index b5072f1..01a82e9 100644
--- a/services/media/framework/models/multistream_packet_source.h
+++ b/services/media/framework/models/multistream_source.h
@@ -2,11 +2,10 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#ifndef MOJO_MEDIA_MODELS_MULTISTREAM_PACKET_SOURCE_H_
-#define MOJO_MEDIA_MODELS_MULTISTREAM_PACKET_SOURCE_H_
+#ifndef MOJO_MEDIA_MODELS_MULTISTREAM_SOURCE_H_
+#define MOJO_MEDIA_MODELS_MULTISTREAM_SOURCE_H_
 
-#include <memory>
-
+#include "services/media/framework/models/part.h"
 #include "services/media/framework/packet.h"
 
 namespace mojo {
@@ -14,22 +13,20 @@
 
 // Synchronous source of packets for multiple streams. This is currently used
 // by Demux, though it would be better if Demux were asynchronous.
-class MultiStreamPacketSource {
+class MultistreamSource : public Part {
  public:
-  virtual ~MultiStreamPacketSource() {}
+  ~MultistreamSource() override {}
 
   // Returns the number of streams the source produces.
-  virtual uint32_t stream_count() const = 0;
+  virtual size_t stream_count() const = 0;
 
   // Gets a packet for the stream indicated via stream_index_out. This call
   // should always produce a packet until end-of-stream. The caller is
   // responsible for releasing the packet.
-  virtual PacketPtr PullPacket(uint32_t *stream_index_out) = 0;
+  virtual PacketPtr PullPacket(size_t *stream_index_out) = 0;
 };
 
-typedef std::shared_ptr<MultiStreamPacketSource> MultiStreamPacketSourcePtr;
-
 }  // namespace media
 }  // namespace mojo
 
-#endif // MOJO_MEDIA_MODELS_MULTISTREAM_PACKET_SOURCE_H_
+#endif // MOJO_MEDIA_MODELS_MULTISTREAM_SOURCE_H_
diff --git a/services/media/framework/models/part.h b/services/media/framework/models/part.h
new file mode 100644
index 0000000..afa7651
--- /dev/null
+++ b/services/media/framework/models/part.h
@@ -0,0 +1,23 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_MODELS_PART_H_
+#define SERVICES_MEDIA_FRAMEWORK_MODELS_PART_H_
+
+namespace mojo {
+namespace media {
+
+// Base class for the source, sink and transform models.
+class Part {
+ public:
+  virtual ~Part() {}
+
+  // Flushes media state. The default implementation does nothing.
+  virtual void Flush() {}
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_MODELS_PART_H_
diff --git a/services/media/framework/models/packet_transform.h b/services/media/framework/models/transform.h
similarity index 70%
rename from services/media/framework/models/packet_transform.h
rename to services/media/framework/models/transform.h
index b25069b..0a17d32 100644
--- a/services/media/framework/models/packet_transform.h
+++ b/services/media/framework/models/transform.h
@@ -2,21 +2,20 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#ifndef MOJO_MEDIA_MODELS_PACKET_TRANSFORM_H_
-#define MOJO_MEDIA_MODELS_PACKET_TRANSFORM_H_
+#ifndef MOJO_MEDIA_MODELS_TRANSFORM_H_
+#define MOJO_MEDIA_MODELS_TRANSFORM_H_
 
-#include <memory>
-
-#include "services/media/framework/allocator.h"
+#include "services/media/framework/models/part.h"
 #include "services/media/framework/packet.h"
+#include "services/media/framework/payload_allocator.h"
 
 namespace mojo {
 namespace media {
 
 // Synchronous packet transform.
-class PacketTransform {
+class Transform : public Part {
  public:
-  virtual ~PacketTransform() {}
+  ~Transform() override {}
 
   // Processes a packet. Returns true to indicate the transform is done
   // processing the input packet. Returns false to indicate the input
@@ -26,13 +25,11 @@
   virtual bool TransformPacket(
       const PacketPtr& input,
       bool new_input,
-      Allocator* allocator,
+      PayloadAllocator* allocator,
       PacketPtr* output) = 0;
 };
 
-typedef std::shared_ptr<PacketTransform> PacketTransformPtr;
-
 }  // namespace media
 }  // namespace mojo
 
-#endif // MOJO_MEDIA_MODELS_PACKET_TRANSFORM_H_
+#endif // MOJO_MEDIA_MODELS_TRANSFORM_H_
diff --git a/services/media/framework/packet.cc b/services/media/framework/packet.cc
index 9b02493..32204bc 100644
--- a/services/media/framework/packet.cc
+++ b/services/media/framework/packet.cc
@@ -3,8 +3,8 @@
 // found in the LICENSE file.
 
 #include "base/logging.h"
-#include "services/media/framework/allocator.h"
 #include "services/media/framework/packet.h"
+#include "services/media/framework/payload_allocator.h"
 
 namespace mojo {
 namespace media {
@@ -15,9 +15,9 @@
       int64_t presentation_time,
       uint64_t duration,
       bool end_of_stream,
-      uint64_t size,
+      size_t size,
       void* payload,
-      Allocator* allocator) :
+      PayloadAllocator* allocator) :
       presentation_time_(presentation_time),
       duration_(duration),
       end_of_stream_(end_of_stream),
@@ -35,7 +35,7 @@
 
   bool end_of_stream() const override { return end_of_stream_; }
 
-  uint64_t size() const override { return size_; }
+  size_t size() const override { return size_; }
 
   void* payload() const override { return payload_; }
 
@@ -52,9 +52,9 @@
   int64_t presentation_time_;
   uint64_t duration_;
   bool end_of_stream_;
-  uint64_t size_;
+  size_t size_;
   void* payload_;
-  Allocator* allocator_;
+  PayloadAllocator* allocator_;
 };
 
 // static
@@ -62,9 +62,9 @@
     int64_t presentation_time,
     uint64_t duration,
     bool end_of_stream,
-    uint64_t size,
+    size_t size,
     void* payload,
-    Allocator* allocator) {
+    PayloadAllocator* allocator) {
   DCHECK(payload == nullptr || allocator != nullptr);
   return PacketPtr(new PacketImpl(
       presentation_time,
@@ -80,7 +80,7 @@
     int64_t presentation_time,
     uint64_t duration,
     bool end_of_stream,
-    uint64_t size,
+    size_t size,
     void* payload) {
   return PacketPtr(new PacketImpl(
       presentation_time,
diff --git a/services/media/framework/packet.h b/services/media/framework/packet.h
index bbc0125..21caa4c 100644
--- a/services/media/framework/packet.h
+++ b/services/media/framework/packet.h
@@ -8,8 +8,7 @@
 #include <memory>
 
 #include "base/logging.h"
-#include "services/media/framework/allocator.h"
-#include "services/media/framework/ptr.h"
+#include "services/media/framework/payload_allocator.h"
 
 namespace mojo {
 namespace media {
@@ -22,7 +21,7 @@
 };
 
 // Unique pointer for packets.
-typedef UniquePtr<Packet, PacketDeleter> PacketPtr;
+typedef std::unique_ptr<Packet, PacketDeleter> PacketPtr;
 
 // Media packet abstract base class. Subclasses may be defined as needed.
 // Packet::Create and Packet::CreateEndOfStream use an implementation with
@@ -38,9 +37,9 @@
       int64_t presentation_time,
       uint64_t duration,
       bool end_of_stream,
-      uint64_t size,
+      size_t size,
       void* payload,
-      Allocator* allocator);
+      PayloadAllocator* allocator);
 
   // Creates a packet. If size is 0, payload must be nullptr and vice-versa.
   // No allocator is provided, and the payload will not be released when the
@@ -49,7 +48,7 @@
       int64_t presentation_time,
       uint64_t duration,
       bool end_of_stream,
-      uint64_t size,
+      size_t size,
       void* payload);
 
   // Creates an end-of-stream packet with no payload.
@@ -61,7 +60,7 @@
 
   virtual bool end_of_stream() const = 0;
 
-  virtual uint64_t size() const = 0;
+  virtual size_t size() const = 0;
 
   virtual void* payload() const = 0;
 
diff --git a/services/media/framework/parts/decoder.h b/services/media/framework/parts/decoder.h
index 1818dc6..0a9bf00 100644
--- a/services/media/framework/parts/decoder.h
+++ b/services/media/framework/parts/decoder.h
@@ -5,35 +5,31 @@
 #ifndef SERVICES_MEDIA_FRAMEWORK_PARTS_DECODER_H_
 #define SERVICES_MEDIA_FRAMEWORK_PARTS_DECODER_H_
 
-#include "services/media/framework/allocator.h"
-#include "services/media/framework/models/packet_transform.h"
+#include "services/media/framework/models/transform.h"
 #include "services/media/framework/packet.h"
+#include "services/media/framework/payload_allocator.h"
 #include "services/media/framework/result.h"
 #include "services/media/framework/stream_type.h"
 
 namespace mojo {
 namespace media {
 
-class Decoder;
-
-typedef SharedPtr<Decoder, PacketTransform> DecoderPtr;
-
 // Abstract base class for transforms that decode compressed media.
-class Decoder : public PacketTransform {
+class Decoder : public Transform {
  public:
   // Creates a Decoder object for a given stream type.
   static Result Create(
-      const StreamTypePtr& stream_type,
-      DecoderPtr* decoder_out);
+      const std::unique_ptr<StreamType>& stream_type,
+      std::shared_ptr<Decoder>* decoder_out);
 
   ~Decoder() override {}
 
   // Returns the type of the stream the decoder will produce.
-  virtual StreamTypePtr output_stream_type() = 0;
+  virtual std::unique_ptr<StreamType> output_stream_type() = 0;
 
  protected:
   // Initializes the decoder. Called by Decoder::Create.
-  virtual Result Init(const StreamTypePtr& stream_type) = 0;
+  virtual Result Init(const std::unique_ptr<StreamType>& stream_type) = 0;
 };
 
 }  // namespace media
diff --git a/services/media/framework/parts/demux.h b/services/media/framework/parts/demux.h
index a522cda..b0afd98 100644
--- a/services/media/framework/parts/demux.h
+++ b/services/media/framework/parts/demux.h
@@ -9,7 +9,7 @@
 #include <vector>
 
 #include "services/media/framework/metadata.h"
-#include "services/media/framework/models/multistream_packet_source.h"
+#include "services/media/framework/models/multistream_source.h"
 #include "services/media/framework/packet.h"
 #include "services/media/framework/parts/reader.h"
 #include "services/media/framework/result.h"
@@ -18,13 +18,9 @@
 namespace mojo {
 namespace media {
 
-class Demux;
-
-typedef SharedPtr<Demux, MultiStreamPacketSource> DemuxPtr;
-
 // Abstract base class for sources that parse input from a reader and
 // produce one or more output streams.
-class Demux : public MultiStreamPacketSource {
+class Demux : public MultistreamSource {
  public:
   // Represents a stream produced by the demux.
   class DemuxStream {
@@ -33,13 +29,15 @@
    public:
     virtual ~DemuxStream() {}
 
-    virtual uint32_t index() const = 0;
+    virtual size_t index() const = 0;
 
-    virtual StreamTypePtr stream_type() const = 0;
+    virtual std::unique_ptr<StreamType> stream_type() const = 0;
   };
 
   // Creates a Demux object for a given reader.
-  static Result Create(ReaderPtr reader, DemuxPtr* demux_out);
+  static Result Create(
+      std::shared_ptr<Reader> reader,
+      std::shared_ptr<Demux>* demux_out);
 
   ~Demux() override {}
 
@@ -54,21 +52,13 @@
   // support this.
 
   // Initializes the demux.
-  virtual Result Init(ReaderPtr reader) = 0;
+  virtual Result Init(std::shared_ptr<Reader> reader) = 0;
 
   // Gets the current metadata.
-  virtual MetadataPtr metadata() const = 0;
+  virtual std::unique_ptr<Metadata> metadata() const = 0;
 
   // Gets the stream collection.
   virtual const std::vector<DemuxStream*>& streams() const = 0;
-
-  // MultiStreamProducer implementation (deferred to subclasses).
-  // bool can_accept_allocator(uint32_t stream_index) const override;
-  // void set_allocator(uint32_t stream_index, Allocator* allocator) override;
-
-  // MultiStreamPacketSource implementation (deferred to subclasses).
-  // uint32_t stream_count() const override;
-  // PacketPtr PullPacket(uint32_t* stream_index_out) override;
 };
 
 }  // namespace media
diff --git a/services/media/framework/parts/file_reader.h b/services/media/framework/parts/file_reader.h
index ba7c588..5449286 100644
--- a/services/media/framework/parts/file_reader.h
+++ b/services/media/framework/parts/file_reader.h
@@ -13,8 +13,8 @@
 // Reads raw data from a file.
 class FileReader : public Reader {
  public:
-  static ReaderPtr Create() {
-    return ReaderPtr(new FileReader());
+  static std::shared_ptr<Reader> Create() {
+    return std::shared_ptr<Reader>(new FileReader());
   }
 
   ~FileReader() override;
diff --git a/services/media/framework/parts/lpcm_reformatter.cc b/services/media/framework/parts/lpcm_reformatter.cc
index 03beb25..91c86f2 100644
--- a/services/media/framework/parts/lpcm_reformatter.cc
+++ b/services/media/framework/parts/lpcm_reformatter.cc
@@ -19,22 +19,19 @@
 
   ~LpcmReformatterImpl() override;
 
-  // LpcmTransform implementation.
-  const LpcmStreamType& input_stream_type() const override;
-
-  const LpcmStreamType& output_stream_type() const override;
-
-  void TransformFrames(
-      LpcmFrameBuffer* source,
-      LpcmFrameBuffer* dest,
-      bool mix) override;
+  // Transform implementation.
+  bool TransformPacket(
+      const PacketPtr& input,
+      bool new_input,
+      PayloadAllocator* allocator,
+      PacketPtr* output) override;
 
  private:
   LpcmStreamType in_type_;
   LpcmStreamType out_type_;
 };
 
-LpcmReformatterPtr LpcmReformatter::Create(
+std::shared_ptr<LpcmReformatter> LpcmReformatter::Create(
     const LpcmStreamType& in_type,
     const LpcmStreamTypeSet& out_type) {
   LpcmReformatter* result = nullptr;
@@ -145,7 +142,7 @@
       break;
   }
 
-  return LpcmReformatterPtr(result);
+  return std::shared_ptr<LpcmReformatter>(result);
 }
 
 template<typename TIn, typename TOut>
@@ -184,164 +181,118 @@
 }
 
 template<typename TIn, typename TOut>
-inline void CopySample(TOut* dest, TIn* source) {
+inline void CopySample(TOut* dest, const TIn* source) {
   *dest = static_cast<TOut>(*source);
 }
 
-inline void CopySample(uint8_t* dest, int16_t* source) {
+inline void CopySample(uint8_t* dest, const int16_t* source) {
   *dest = static_cast<uint8_t>((*source >> 8) ^ 0x80);
 }
 
-inline void CopySample(uint8_t* dest, int32_t* source) {
+inline void CopySample(uint8_t* dest, const int32_t* source) {
   *dest = static_cast<uint8_t>((Clamp(*source) >> 16) ^ 0x80);
 }
 
-inline void CopySample(uint8_t* dest, float* source) {
+inline void CopySample(uint8_t* dest, const float* source) {
   *dest = static_cast<uint8_t>((Clamp(*source) * 0x7f) + 128);
 }
 
-inline void CopySample(int16_t* dest, uint8_t* source) {
+inline void CopySample(int16_t* dest, const uint8_t* source) {
   *dest = static_cast<int16_t>(*source ^ 0x80) << 8;
 }
 
-inline void CopySample(int16_t* dest, int32_t* source) {
+inline void CopySample(int16_t* dest, const int32_t* source) {
   *dest = static_cast<int16_t>(Clamp(*source) >> 8);
 }
 
-inline void CopySample(int16_t* dest, float* source) {
+inline void CopySample(int16_t* dest, const float* source) {
   *dest = static_cast<int16_t>(Clamp(*source) * 0x7fff);
 }
 
-inline void CopySample(int32_t* dest, uint8_t* source) {
-  *dest = static_cast<int32_t>(*source ^ 0x80) << 16;
+inline void CopySample(int32_t* dest, const uint8_t* source) {
+  *dest = (static_cast<int32_t>(*source) - 0x80) * 0x10000;  // Unbias first.
 }
 
-inline void CopySample(int32_t* dest, int16_t* source) {
+inline void CopySample(int32_t* dest, const int16_t* source) {
   *dest = static_cast<int32_t>(*source << 8);
 }
 
-inline void CopySample(int32_t* dest, float* source) {
+inline void CopySample(int32_t* dest, const float* source) {
   *dest = static_cast<int32_t>(Clamp(*source) * 0x7fffff);
 }
 
-inline void CopySample(float* dest, uint8_t* source) {
-  *dest = static_cast<float>(*source ^ 0x80) / 0x80;
+inline void CopySample(float* dest, const uint8_t* source) {
+  *dest = (static_cast<float>(*source) - 0x80) / 0x80;  // Unbias first.
 }
 
-inline void CopySample(float* dest, int16_t* source) {
+inline void CopySample(float* dest, const int16_t* source) {
   *dest = static_cast<float>(*source) / 0x8000;
 }
 
-inline void CopySample(float* dest, int32_t* source) {
+inline void CopySample(float* dest, const int32_t* source) {
   *dest = static_cast<float>(Clamp(*source)) / 0x800000;
 }
 
-template<typename TIn, typename TOut>
-inline void MixSample(TOut* dest, TIn* source) {
-  *dest += static_cast<TOut>(*source);
-}
-
-inline void MixSample(uint8_t* dest, int16_t* source) {
-  *dest += static_cast<uint8_t>((*source >> 8) ^ 0x80);
-}
-
-inline void MixSample(uint8_t* dest, int32_t* source) {
-  *dest += static_cast<uint8_t>((Clamp(*source) >> 16) ^ 0x80);
-}
-
-inline void MixSample(uint8_t* dest, float* source) {
-  *dest += static_cast<uint8_t>((Clamp(*source) * 0x7f) + 128);
-}
-
-inline void MixSample(int16_t* dest, uint8_t* source) {
-  *dest += static_cast<int16_t>(*source ^ 0x80) << 8;
-}
-
-inline void MixSample(int16_t* dest, int32_t* source) {
-  *dest += static_cast<int16_t>(Clamp(*source) >> 8);
-}
-
-inline void MixSample(int16_t* dest, float* source) {
-  *dest += static_cast<int16_t>(Clamp(*source) * 0x7fff);
-}
-
-inline void MixSample(int32_t* dest, uint8_t* source) {
-  *dest += static_cast<int32_t>(*source ^ 0x80) << 16;
-}
-
-inline void MixSample(int32_t* dest, int16_t* source) {
-  *dest += static_cast<int32_t>(*source << 8);
-}
-
-inline void MixSample(int32_t* dest, float* source) {
-  *dest += static_cast<int32_t>(Clamp(*source) * 0x7fffff);
-}
-
-inline void MixSample(float* dest, uint8_t* source) {
-  *dest += static_cast<float>(*source ^ 0x80) / 0x80;
-}
-
-inline void MixSample(float* dest, int16_t* source) {
-  *dest += static_cast<float>(*source) / 0x8000;
-}
-
-inline void MixSample(float* dest, int32_t* source) {
-  *dest += static_cast<float>(Clamp(*source)) / 0x800000;
-}
-
 } // namespace
 
 template<typename TIn, typename TOut>
-const LpcmStreamType& LpcmReformatterImpl<TIn, TOut>::input_stream_type()
-    const {
-  return in_type_;
-}
+bool LpcmReformatterImpl<TIn, TOut>::TransformPacket(
+    const PacketPtr& input,
+    bool new_input,
+    PayloadAllocator* allocator,
+    PacketPtr* output) {
+  DCHECK(input);
+  DCHECK(allocator);
+  DCHECK(output);
 
-template<typename TIn, typename TOut>
-const LpcmStreamType& LpcmReformatterImpl<TIn, TOut>::output_stream_type()
-    const {
-  return out_type_;
-}
-
-template<typename TIn, typename TOut>
-void LpcmReformatterImpl<TIn, TOut>::TransformFrames(
-    LpcmFrameBuffer* source,
-    LpcmFrameBuffer* dest,
-    bool mix) {
-  DCHECK(source);
-  DCHECK(dest);
-  DCHECK(source->buffer());
-  DCHECK(source->frame_count());
-  DCHECK(dest->buffer());
-  DCHECK(dest->frame_count());
-
-  uint64_t frame_count = std::min(source->frame_count(), dest->frame_count());
-
-  uint8_t* in_channel = static_cast<uint8_t*>(source->buffer());
-  uint8_t* out_channel = static_cast<uint8_t*>(dest->buffer());
-
-  for (uint32_t channel = 0; channel < in_type_.channels(); channel++) {
-    TIn* in_sample = reinterpret_cast<TIn*>(in_channel);
-    TOut* out_sample = reinterpret_cast<TOut*>(out_channel);
-    if (mix) {
-      for (uint64_t sample = 0; sample < frame_count; sample++) {
-        MixSample(out_sample, in_sample);
-        in_sample += in_type_.channels();
-        out_sample += out_type_.channels();
-      }
-    } else {
-      for (uint64_t sample = 0; sample < frame_count; sample++) {
-        CopySample(out_sample, in_sample);
-        in_sample += in_type_.channels();
-        out_sample += out_type_.channels();
-      }
-    }
-    in_channel += in_type_.sample_size();
-    out_channel += out_type_.sample_size();
+  size_t in_size = input->size();
+  if (in_size == 0) {
+    // No payload to reformat. Pass along an empty copy of the packet.
+    *output = Packet::Create(
+        input->presentation_time(),
+        input->duration(),
+        input->end_of_stream(),
+        0,
+        nullptr,
+        nullptr);
+    return true;
   }
 
-  source->Advance(frame_count);
-  dest->Advance(frame_count);
+  size_t frame_count = input->duration();
+  size_t out_size = out_type_.min_buffer_size(frame_count);
+
+  void* buffer = allocator->AllocatePayloadBuffer(out_size);
+  if (buffer == nullptr) {
+    LOG(WARNING) << "lpcm reformatter starved for buffers";
+    // Starved for buffer space. Can't process now.
+    *output = nullptr;
+    return false;
+  }
+
+  const TIn* in_channel = static_cast<const TIn*>(input->payload());
+  TOut* out_channel = static_cast<TOut*>(buffer);
+
+  for (uint32_t channel = 0; channel < in_type_.channels(); channel++) {
+    const TIn* in_sample = in_channel;
+    TOut* out_sample = out_channel;
+    for (size_t sample = 0; sample < frame_count; sample++) {
+      CopySample(out_sample, in_sample);
+      in_sample += in_type_.channels();
+      out_sample += out_type_.channels();
+    }
+    ++in_channel;
+    ++out_channel;
+  }
+
+  *output = Packet::Create(
+      input->presentation_time(),
+      frame_count,
+      input->end_of_stream(),
+      out_size,
+      buffer,
+      allocator);
+
+  return true;
 }
 
 }  // namespace media
diff --git a/services/media/framework/parts/lpcm_reformatter.h b/services/media/framework/parts/lpcm_reformatter.h
index b275303..06b6ab6 100644
--- a/services/media/framework/parts/lpcm_reformatter.h
+++ b/services/media/framework/parts/lpcm_reformatter.h
@@ -5,21 +5,17 @@
 #ifndef SERVICES_MEDIA_FRAMEWORK_PARTS_LPCM_REFORMATTER_H_
 #define SERVICES_MEDIA_FRAMEWORK_PARTS_LPCM_REFORMATTER_H_
 
-#include "services/media/framework/models/lpcm_transform.h"
+#include "services/media/framework/models/transform.h"
 #include "services/media/framework/stream_type.h"
 
 namespace mojo {
 namespace media {
 
-class LpcmReformatter;
-
-typedef SharedPtr<LpcmReformatter, LpcmTransform> LpcmReformatterPtr;
-
 // A transform that reformats samples.
-// TODO(dalesat): Some variations on this could be InPlacePacketTransforms.
-class LpcmReformatter : public LpcmTransform {
+// TODO(dalesat): Some variations on this could be InPlaceTransforms.
+class LpcmReformatter : public Transform {
  public:
-  static LpcmReformatterPtr Create(
+  static std::shared_ptr<LpcmReformatter> Create(
       const LpcmStreamType& in_type,
       const LpcmStreamTypeSet& out_type);
 };
diff --git a/services/media/framework/parts/null_sink.cc b/services/media/framework/parts/null_sink.cc
index 9c9b29c..eb693a8 100644
--- a/services/media/framework/parts/null_sink.cc
+++ b/services/media/framework/parts/null_sink.cc
@@ -11,15 +11,11 @@
 
 NullSink::~NullSink() {}
 
-bool NullSink::must_allocate() const {
-  return false;
-}
-
-Allocator* NullSink::allocator() {
+PayloadAllocator* NullSink::allocator() {
   return nullptr;
 }
 
-void NullSink::SetDemandCallback(DemandCallback demand_callback) {
+void NullSink::SetDemandCallback(const DemandCallback& demand_callback) {
   demand_callback_ = demand_callback;
 }
 
diff --git a/services/media/framework/parts/null_sink.h b/services/media/framework/parts/null_sink.h
index 41846b5..1e75917 100644
--- a/services/media/framework/parts/null_sink.h
+++ b/services/media/framework/parts/null_sink.h
@@ -10,23 +10,19 @@
 namespace mojo {
 namespace media {
 
-class NullSink;
-
-typedef SharedPtr<NullSink, ActiveSink> NullSinkPtr;
-
 // Sink that throws packets away.
 class NullSink : public ActiveSink {
  public:
-  static NullSinkPtr Create() { return NullSinkPtr(new NullSink()); }
+  static std::shared_ptr<NullSink> Create() {
+    return std::shared_ptr<NullSink>(new NullSink());
+  }
 
   ~NullSink() override;
 
   // ActiveSink implementation.
-  bool must_allocate() const override;
+  PayloadAllocator* allocator() override;
 
-  Allocator* allocator() override;
-
-  void SetDemandCallback(DemandCallback demand_callback) override;
+  void SetDemandCallback(const DemandCallback& demand_callback) override;
 
   void Prime() override;
 
diff --git a/services/media/framework/parts/reader.cc b/services/media/framework/parts/reader.cc
index 31da7fc..d1d992e 100644
--- a/services/media/framework/parts/reader.cc
+++ b/services/media/framework/parts/reader.cc
@@ -10,12 +10,12 @@
 namespace mojo {
 namespace media {
 
-Result Reader::Create(const GURL& gurl, ReaderPtr* reader_out) {
+Result Reader::Create(const GURL& gurl, std::shared_ptr<Reader>* reader_out) {
   if (!gurl.is_valid()) {
     return Result::kInvalidArgument;
   }
 
-  ReaderPtr reader = nullptr;
+  std::shared_ptr<Reader> reader = nullptr;
   if (gurl.SchemeIsFile()) {
     reader = FileReader::Create();
   }
diff --git a/services/media/framework/parts/reader.h b/services/media/framework/parts/reader.h
index 4aaa982..dc07ad8 100644
--- a/services/media/framework/parts/reader.h
+++ b/services/media/framework/parts/reader.h
@@ -13,16 +13,12 @@
 namespace mojo {
 namespace media {
 
-class Reader;
-
-typedef std::shared_ptr<Reader> ReaderPtr;
-
 // Abstract base class for objects that read raw data on behalf of demuxes.
 // This model is synchronous, because that's how ffmpeg works.
 class Reader {
  public:
   // Creates a Reader object for a given url.
-  static Result Create(const GURL& gurl, ReaderPtr* reader_out);
+  static Result Create(const GURL& gurl, std::shared_ptr<Reader>* reader_out);
 
   virtual ~Reader() {}
 
diff --git a/services/media/framework/payload_allocator.cc b/services/media/framework/payload_allocator.cc
new file mode 100644
index 0000000..94938f7
--- /dev/null
+++ b/services/media/framework/payload_allocator.cc
@@ -0,0 +1,46 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cstdlib>
+
+#include "base/logging.h"
+#include "services/media/framework/payload_allocator.h"
+
+namespace mojo {
+namespace media {
+
+namespace {
+
+// Stateless allocator that forwards to std::malloc and std::free.
+class DefaultAllocator : public PayloadAllocator {
+ public:
+  constexpr DefaultAllocator() {}
+
+  // PayloadAllocator implementation.
+  void* AllocatePayloadBuffer(size_t size) override;
+  void ReleasePayloadBuffer(size_t size, void* buffer) override;
+};
+
+void* DefaultAllocator::AllocatePayloadBuffer(size_t size) {
+  DCHECK(size > 0);  // Callers must not request empty buffers.
+  return std::malloc(size);  // |size| is already size_t; nullptr on failure.
+}
+
+void DefaultAllocator::ReleasePayloadBuffer(size_t size, void* buffer) {
+  DCHECK(size > 0);  // |size| is checked but otherwise unused here.
+  DCHECK(buffer);
+  std::free(buffer);  // Pairs with std::malloc in AllocatePayloadBuffer.
+}
+
+static constexpr DefaultAllocator default_allocator;
+
+} // namespace
+
+// static. The const_cast drops the const implied by constexpr storage.
+PayloadAllocator* PayloadAllocator::GetDefault() {
+  return const_cast<DefaultAllocator*>(&default_allocator);
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework/payload_allocator.h b/services/media/framework/payload_allocator.h
new file mode 100644
index 0000000..c5ca645
--- /dev/null
+++ b/services/media/framework/payload_allocator.h
@@ -0,0 +1,30 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_PAYLOAD_ALLOCATOR_H_
+#define SERVICES_MEDIA_FRAMEWORK_PAYLOAD_ALLOCATOR_H_
+
+#include <stddef.h>
+
+namespace mojo {
+namespace media {
+
+// Abstract base class for objects that allocate buffers for packets. No virtual
+// destructor is declared, so don't delete instances through this type.
+class PayloadAllocator {
+ public:
+  // Gets the default allocator, which allocates vanilla memory from the heap.
+  static PayloadAllocator* GetDefault();
+
+  // Returns a buffer of |size| bytes, or nullptr if the allocation fails.
+  virtual void* AllocatePayloadBuffer(size_t size) = 0;
+
+  // Releases a buffer previously allocated via AllocatePayloadBuffer.
+  virtual void ReleasePayloadBuffer(size_t size, void* buffer) = 0;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_PAYLOAD_ALLOCATOR_H_
diff --git a/services/media/framework/ptr.h b/services/media/framework/ptr.h
deleted file mode 100644
index 824f1a7..0000000
--- a/services/media/framework/ptr.h
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef SERVICES_MEDIA_FRAMEWORK_PTR_H_
-#define SERVICES_MEDIA_FRAMEWORK_PTR_H_
-
-#include <memory>
-
-namespace mojo {
-namespace media {
-
-// unique_ptr with Clone.
-// TODO(dalesat): Remove in favor of unique_ptr and a Clone template function.
-template<class T, class Deleter = std::default_delete<T>>
-class UniquePtr : public std::unique_ptr<T, Deleter> {
- public:
-  UniquePtr() : std::unique_ptr<T, Deleter>() {}
-
-  UniquePtr(std::nullptr_t) : std::unique_ptr<T, Deleter>() {}
-
-  explicit UniquePtr(T* ptr) : std::unique_ptr<T, Deleter>(ptr) {}
-
-  UniquePtr(UniquePtr&& other) :
-      std::unique_ptr<T, Deleter>(std::move(other)) {}
-
-  UniquePtr& operator=(std::nullptr_t) {
-    this->reset();
-    return *this;
-  }
-
-  UniquePtr& operator=(UniquePtr&& other) {
-    *static_cast<std::unique_ptr<T, Deleter>*>(this) = std::move(other);
-    return *this;
-  }
-
-  UniquePtr Clone() const { return *this ? this->get()->Clone() : UniquePtr(); }
-};
-
-// shared_ptr with upcast to TBase.
-// TODO(dalesat): Remove in favor of shared_ptr.
-template<class T, typename TBase>
-class SharedPtr : public std::shared_ptr<T> {
- public:
-  SharedPtr() : std::shared_ptr<T>() {}
-
-  SharedPtr(std::nullptr_t) : std::shared_ptr<T>() {}
-
-  explicit SharedPtr(T* ptr) : std::shared_ptr<T>(ptr) {}
-
-  SharedPtr& operator=(std::nullptr_t) {
-    this->reset();
-    return *this;
-  }
-
-  operator std::shared_ptr<TBase>() const {
-    return std::shared_ptr<TBase>(*this, this->get());
-  }
-};
-
-}  // namespace media
-}  // namespace mojo
-
-#endif // SERVICES_MEDIA_FRAMEWORK_PTR_H_
diff --git a/services/media/framework/refs.cc b/services/media/framework/refs.cc
new file mode 100644
index 0000000..c2670a2
--- /dev/null
+++ b/services/media/framework/refs.cc
@@ -0,0 +1,97 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/logging.h"
+#include "services/media/framework/refs.h"
+#include "services/media/framework/stages/input.h"
+#include "services/media/framework/stages/output.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+size_t PartRef::input_count() const {
+  DCHECK(valid());  // A null PartRef has no stage to query.
+  return stage_->input_count();
+}
+
+InputRef PartRef::input(size_t index) const {
+  DCHECK(valid());
+  DCHECK(index < stage_->input_count());  // |index| must be in range.
+  return InputRef(stage_, index);
+}
+
+InputRef PartRef::input() const {
+  DCHECK(valid());
+  DCHECK(stage_->input_count() == 1);  // Only valid for single-input parts.
+  return InputRef(stage_, 0);
+}
+
+size_t PartRef::output_count() const {
+  DCHECK(valid());  // A null PartRef has no stage to query.
+  return stage_->output_count();
+}
+
+OutputRef PartRef::output(size_t index) const {
+  DCHECK(valid());
+  DCHECK(index < stage_->output_count());  // |index| must be in range.
+  return OutputRef(stage_, index);
+}
+
+OutputRef PartRef::output() const {
+  DCHECK(valid());
+  DCHECK(stage_->output_count() == 1);  // Only valid for single-output parts.
+  return OutputRef(stage_, 0);
+}
+
+bool InputRef::connected() const {
+  DCHECK(valid());
+  return actual().connected();  // Delegates to the referenced Input.
+}
+
+const OutputRef& InputRef::mate() const {
+  DCHECK(valid());
+  return actual().mate();  // Forwards whatever the referenced Input reports.
+}
+
+InputRef::InputRef(Stage* stage, size_t index) :
+    stage_(stage), index_(index) {
+  DCHECK(valid());  // Requires a non-null stage and an in-range index.
+}
+
+Input& InputRef::actual() const {
+  DCHECK(valid());
+  return stage_->input(index_);  // Looked up on the stage on every call.
+}
+
+bool InputRef::valid() const {  // Non-null stage and in-range input index.
+  return stage_ != nullptr && index_ < stage_->input_count();
+}
+
+bool OutputRef::connected() const {
+  DCHECK(valid());
+  return actual().connected();  // Delegates to the referenced Output.
+}
+
+const InputRef& OutputRef::mate() const {
+  DCHECK(valid());
+  return actual().mate();  // Forwards whatever the referenced Output reports.
+}
+
+OutputRef::OutputRef(Stage* stage, size_t index) :
+    stage_(stage), index_(index) {
+  DCHECK(valid());  // Requires a non-null stage and an in-range index.
+}
+
+Output& OutputRef::actual() const {
+  DCHECK(valid());
+  return stage_->output(index_);  // Looked up on the stage on every call.
+}
+
+bool OutputRef::valid() const {  // Non-null stage and in-range output index.
+  return stage_ != nullptr && index_ < stage_->output_count();
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/refs.h b/services/media/framework/refs.h
new file mode 100644
index 0000000..8ff1164
--- /dev/null
+++ b/services/media/framework/refs.h
@@ -0,0 +1,162 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_REFS_H_
+#define SERVICES_MEDIA_FRAMEWORK_REFS_H_
+
+#include <stdint.h>
+
+namespace mojo {
+namespace media {
+
+class Graph;
+class Stage;
+class Input;
+class Output;
+class Engine;
+class InputRef;
+class OutputRef;
+
+// Opaque Stage pointer used for graph building. Copyable; never deletes stage_.
+class PartRef {
+ public:
+  PartRef() : stage_(nullptr) {}  // Constructs a null reference.
+
+  PartRef& operator=(std::nullptr_t) {
+    stage_ = nullptr;
+    return *this;
+  }
+
+  // Returns the number of inputs the part has.
+  size_t input_count() const;
+
+  // Returns a reference to input |index|, which must be < input_count().
+  InputRef input(size_t index) const;
+
+  // Returns a reference to the only input. input_count must return 1 for this
+  // call to be valid.
+  InputRef input() const;
+
+  // Returns the number of outputs the part has.
+  size_t output_count() const;
+
+  // Returns a reference to output |index|, which must be < output_count().
+  OutputRef output(size_t index) const;
+
+  // Returns a reference to the only output. output_count must return 1 for this
+  // call to be valid.
+  OutputRef output() const;
+
+  // Returns true if the reference refers to a part, false if it's null.
+  explicit operator bool() const { return stage_ != nullptr; }
+
+ private:
+  explicit PartRef(Stage* stage) : stage_(stage) {}
+
+  // Determines if the reference is non-null and otherwise valid. Useful for
+  // DCHECKs.
+  bool valid() const { return stage_ != nullptr; }
+
+  Stage* stage_;  // Null for a null reference.
+
+  friend Graph;
+  friend InputRef;
+  friend OutputRef;
+  friend Engine;
+};
+
+// Opaque Input handle (stage + input index) used for graph building.
+class InputRef {
+ public:
+  InputRef() : stage_(nullptr), index_(0) {}  // Constructs a null reference.
+
+  InputRef& operator=(std::nullptr_t) {
+    stage_ = nullptr;
+    index_ = 0;
+    return *this;
+  }
+
+  // Returns true if the reference refers to an input, false if it's null.
+  explicit operator bool() const { return stage_ != nullptr; }
+
+  // Returns a reference to the part that owns this input. Returns a null
+  // reference if this reference is null.
+  PartRef part() const { return PartRef(stage_); }
+
+  // Indicates whether this input is connected to an output.
+  bool connected() const;
+
+  // Returns a reference to the output to which this input is connected. Returns
+  // an invalid reference if this input isn't connected to an output.
+  const OutputRef& mate() const;
+
+ private:
+  InputRef(Stage* stage, size_t index);
+
+  // Returns the actual input referenced by this object.
+  Input& actual() const;
+
+  // Determines if the reference is non-null and otherwise valid. Useful for
+  // DCHECKs.
+  bool valid() const;
+
+  Stage* stage_;  // Null for a null reference.
+  size_t index_;  // Index of the input within stage_.
+
+  friend Graph;
+  friend PartRef;
+  friend OutputRef;
+  friend Output;
+  friend Engine;
+};
+
+// Opaque Output handle (stage + output index) used for graph building.
+class OutputRef {
+ public:
+  OutputRef() : stage_(nullptr), index_(0) {}  // Constructs a null reference.
+
+  OutputRef& operator=(std::nullptr_t) {
+    stage_ = nullptr;
+    index_ = 0;
+    return *this;
+  }
+
+  // Returns true if the reference refers to an output, false if it's null.
+  explicit operator bool() const { return stage_ != nullptr; }
+
+  // Returns a reference to the part that owns this output. Returns a null
+  // reference if this reference is null.
+  PartRef part() const { return PartRef(stage_); }
+
+  // Indicates whether this output is connected to an input.
+  bool connected() const;
+
+  // Returns a reference to the input to which this output is connected. Returns
+  // an invalid reference if this output isn't connected to an input.
+  const InputRef& mate() const;
+
+ private:
+  OutputRef(Stage* stage, size_t index);
+
+  // Returns the actual output referenced by this object.
+  Output& actual() const;
+
+  // Determines if the reference is non-null and otherwise valid. Useful for
+  // DCHECKs.
+  bool valid() const;
+
+  Stage* stage_;  // Null for a null reference.
+  size_t index_;  // Index of the output within stage_.
+
+  friend Graph;
+  friend PartRef;
+  friend InputRef;
+  friend Input;
+  friend Engine;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_REFS_H_
diff --git a/services/media/framework/safe_clone.h b/services/media/framework/safe_clone.h
new file mode 100644
index 0000000..1e4546f
--- /dev/null
+++ b/services/media/framework/safe_clone.h
@@ -0,0 +1,40 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_SAFE_CLONE_H_
+#define SERVICES_MEDIA_FRAMEWORK_SAFE_CLONE_H_
+
+#include <memory>
+#include <vector>
+
+namespace mojo {
+namespace media {
+
+// Returns a clone of |*t_ptr| made with its Clone() method, or null.
+template<typename T>
+std::unique_ptr<T> SafeClone(const std::unique_ptr<T>& t_ptr) {
+  return t_ptr ? t_ptr->Clone() : nullptr;
+}
+
+// Deep-copies |vec|; a null |vec| yields null and null entries stay null.
+template<typename T>
+std::unique_ptr<std::vector<std::unique_ptr<T>>> SafeClone(
+    const std::unique_ptr<std::vector<std::unique_ptr<T>>>& vec) {
+  if (vec == nullptr) {
+    return nullptr;
+  }
+  // Start empty: presizing and then push_back would prepend size() nulls.
+  std::unique_ptr<std::vector<std::unique_ptr<T>>> result(
+      new std::vector<std::unique_ptr<T>>());
+  result->reserve(vec->size());
+  for (const std::unique_ptr<T>& t_ptr : *vec) {
+    result->push_back(SafeClone(t_ptr));
+  }
+  return result;
+}
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_SAFE_CLONE_H_
diff --git a/services/media/framework/stages/active_multistream_sink_stage.cc b/services/media/framework/stages/active_multistream_sink_stage.cc
new file mode 100644
index 0000000..cb3c4ad
--- /dev/null
+++ b/services/media/framework/stages/active_multistream_sink_stage.cc
@@ -0,0 +1,165 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/stages/active_multistream_sink_stage.h"
+
+namespace mojo {
+namespace media {
+
+ActiveMultistreamSinkStage::ActiveMultistreamSinkStage(
+      std::shared_ptr<ActiveMultistreamSink> sink) : sink_(sink) {
+  DCHECK(sink_);
+  sink_->SetHost(this);  // This stage is the sink's ActiveMultistreamSinkHost.
+  // Leave input 0 unallocated so this stage isn't misidentified as a source.
+  ReleaseInput(AllocateInput());
+}
+
+ActiveMultistreamSinkStage::~ActiveMultistreamSinkStage() {
+  base::AutoLock lock(lock_);  // NOTE(review): presumably waits out holders.
+}
+
+size_t ActiveMultistreamSinkStage::input_count() const {
+  base::AutoLock lock(lock_);  // inputs_ is only touched under lock_.
+  return inputs_.size();
+}
+
+Input& ActiveMultistreamSinkStage::input(size_t index) {
+  base::AutoLock lock(lock_);
+  DCHECK_LT(index, inputs_.size());
+  return inputs_[index]->input_;  // NOTE(review): used after lock_ is dropped.
+}
+
+size_t ActiveMultistreamSinkStage::output_count() const {
+  return 0;  // A sink stage has no outputs.
+}
+
+Output& ActiveMultistreamSinkStage::output(size_t index) {
+  CHECK(false) << "output requested from sink";
+  return *(static_cast<Output*>(nullptr));  // Not reached; CHECK is fatal.
+}
+
+PayloadAllocator* ActiveMultistreamSinkStage::PrepareInput(size_t index) {
+  return nullptr;  // This stage never supplies an allocator for its inputs.
+}
+
+void ActiveMultistreamSinkStage::PrepareOutput(
+    size_t index,
+    PayloadAllocator* allocator,
+    const UpstreamCallback& callback) {
+  CHECK(false) << "PrepareOutput called on sink";  // No outputs to prepare.
+}
+
+void ActiveMultistreamSinkStage::Prime() {
+  DCHECK(sink_);
+  sink_->Prime();  // Priming is forwarded to the hosted sink unchanged.
+}
+
+void ActiveMultistreamSinkStage::Update(Engine* engine) {
+  DCHECK(engine);
+  DCHECK(sink_);
+
+  base::AutoLock lock(lock_);
+  // iter is advanced inside the loop body, not in the for header.
+  for (auto iter = pending_inputs_.begin(); iter != pending_inputs_.end(); ) {
+    DCHECK(*iter < inputs_.size());
+    StageInput* input = inputs_[*iter].get();
+    if (input->input_.packet_from_upstream()) {
+      input->demand_ = sink_->SupplyPacket(
+          input->index_,
+          std::move(input->input_.packet_from_upstream()));
+      // NOTE(review): iter advances here only when demand went negative.
+      if (input->demand_ == Demand::kNegative) {
+        auto remove_iter = iter;
+        ++iter;
+        pending_inputs_.erase(remove_iter);
+      }
+    } else {
+      ++iter;
+    }
+    // Runs on every visit, including for an entry that was just erased above.
+    input->input_.SetDemand(input->demand_, engine);
+  }
+}
+
+void ActiveMultistreamSinkStage::FlushInput(
+    size_t index,
+    const DownstreamCallback& callback) {
+  DCHECK(sink_);
+  sink_->Flush();  // Called before lock_ is taken, as before.
+
+  base::AutoLock lock(lock_);
+  DCHECK_LT(index, inputs_.size());  // Same bounds check as input().
+  StageInput* input = inputs_[index].get();
+  input->demand_ = Demand::kNegative;
+  input->input_.Flush();
+  pending_inputs_.remove(index);  // Drops every pending entry for |index|.
+}
+
+void ActiveMultistreamSinkStage::FlushOutput(size_t index) {
+  CHECK(false) << "FlushOutput called on sink";  // No outputs to flush.
+}
+
+size_t ActiveMultistreamSinkStage::AllocateInput() {
+  base::AutoLock lock(lock_);
+
+  StageInput* slot = nullptr;
+  if (!unallocated_inputs_.empty()) {
+    // Reuse the lowest free index; std::set iterates in ascending order.
+    auto lowest = unallocated_inputs_.begin();
+    slot = inputs_[*lowest].get();
+    DCHECK(!slot->allocated_);
+    unallocated_inputs_.erase(lowest);
+  } else {
+    std::unique_ptr<StageInput> fresh(new StageInput(inputs_.size()));
+    slot = fresh.get();
+    inputs_.emplace_back(std::move(fresh));
+  }
+
+  slot->allocated_ = true;
+  return slot->index_;
+}
+
+size_t ActiveMultistreamSinkStage::ReleaseInput(size_t index) {
+  base::AutoLock lock(lock_);
+  DCHECK(index < inputs_.size());
+
+  StageInput* input = inputs_[index].get();
+  DCHECK(input);
+  DCHECK(input->allocated_);
+  DCHECK(!input->input_.connected());  // Disconnect before releasing.
+
+  input->allocated_ = false;
+
+  // Pop input if it's at the end of inputs_, along with any unallocated inputs
+  // that become trailing. Otherwise, add it to unallocated_inputs_. Input 0 is
+  // never popped so the stage can't be misidentified as a source.
+  if (index != 0 && index == inputs_.size() - 1) {
+    while (inputs_.size() > 1 && !inputs_.back()->allocated_) {
+      unallocated_inputs_.erase(inputs_.size() - 1);
+      inputs_.pop_back();
+    }
+  } else {
+    unallocated_inputs_.insert(input->index_);
+  }
+  // NOTE(review): pending_inputs_ is not pruned for popped indices.
+  return inputs_.size();
+}
+
+void ActiveMultistreamSinkStage::UpdateDemand(
+    size_t input_index,
+    Demand demand) {
+  {
+    base::AutoLock lock(lock_);  // Released before RequestUpdate() below.
+    DCHECK(input_index < inputs_.size());
+    DCHECK(demand != Demand::kNegative);
+    StageInput* target = inputs_[input_index].get();
+    DCHECK(target);
+    target->demand_ = demand;
+    pending_inputs_.push_back(input_index);
+  }
+  RequestUpdate();
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/active_multistream_sink_stage.h b/services/media/framework/stages/active_multistream_sink_stage.h
new file mode 100644
index 0000000..53cb6cd
--- /dev/null
+++ b/services/media/framework/stages/active_multistream_sink_stage.h
@@ -0,0 +1,84 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_MULTISTREAM_SINK_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_MULTISTREAM_SINK_STAGE_H_
+
+#include <list>
+#include <set>
+#include <vector>
+
+#include "base/synchronization/lock.h"
+#include "services/media/framework/models/active_multistream_sink.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+// A stage that hosts an ActiveMultistreamSink and acts as its host.
+class ActiveMultistreamSinkStage :
+    public Stage,
+    public ActiveMultistreamSinkHost {
+ public:
+  ActiveMultistreamSinkStage(std::shared_ptr<ActiveMultistreamSink> sink);
+
+  ~ActiveMultistreamSinkStage() override;
+
+  // Stage implementation.
+  size_t input_count() const override;
+
+  Input& input(size_t index) override;
+
+  size_t output_count() const override;
+
+  Output& output(size_t index) override;
+
+  PayloadAllocator* PrepareInput(size_t index) override;
+
+  void PrepareOutput(
+      size_t index,
+      PayloadAllocator* allocator,
+      const UpstreamCallback& callback) override;
+
+  void Prime() override;
+
+  void Update(Engine* engine) override;
+
+  void FlushInput(
+      size_t index,
+      const DownstreamCallback& callback) override;
+
+  void FlushOutput(size_t index) override;
+
+  // ActiveMultistreamSinkHost implementation.
+  size_t AllocateInput() override;
+
+  size_t ReleaseInput(size_t index) override;
+
+  void UpdateDemand(size_t input_index, Demand demand) override;
+
+ private:
+  struct StageInput {  // Per-input state; starts unallocated, demand negative.
+    StageInput(size_t index) :
+        index_(index),
+        allocated_(false),
+        demand_(Demand::kNegative) {}
+    Input input_;
+    size_t index_;
+    bool allocated_;
+    Demand demand_;
+  };
+
+  std::shared_ptr<ActiveMultistreamSink> sink_;
+
+  mutable base::Lock lock_;  // Held while touching the containers below.
+  std::vector<std::unique_ptr<StageInput>> inputs_;  // Indexed by input index.
+  std::set<size_t> unallocated_inputs_;  // Indices free for reuse.
+  std::list<size_t> pending_inputs_;  // Indices Update() still visits.
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_MULTISTREAM_SINK_STAGE_H_
diff --git a/services/media/framework/stages/active_sink_stage.cc b/services/media/framework/stages/active_sink_stage.cc
index c49b55e..9aa6182 100644
--- a/services/media/framework/stages/active_sink_stage.cc
+++ b/services/media/framework/stages/active_sink_stage.cc
@@ -7,14 +7,14 @@
 namespace mojo {
 namespace media {
 
-ActiveSinkStage::ActiveSinkStage(ActiveSinkPtr sink) : sink_(sink) {
+ActiveSinkStage::ActiveSinkStage(std::shared_ptr<ActiveSink> sink) :
+    sink_(sink) {
   DCHECK(sink_);
 
   demand_function_ = [this](Demand demand) {
-    DCHECK(update_callback_);
     if (sink_demand_ != demand) {
       sink_demand_ = demand;
-      update_callback_(this);
+      RequestUpdate();
     }
   };
 
@@ -23,37 +23,44 @@
 
 ActiveSinkStage::~ActiveSinkStage() {}
 
-uint32_t ActiveSinkStage::input_count() const {
+size_t ActiveSinkStage::input_count() const {
   return 1;
 };
 
-StageInput& ActiveSinkStage::input(uint32_t index) {
+Input& ActiveSinkStage::input(size_t index) {
   DCHECK_EQ(index, 0u);
   return input_;
 }
 
-uint32_t ActiveSinkStage::output_count() const {
+size_t ActiveSinkStage::output_count() const {
   return 0;
 }
 
-StageOutput& ActiveSinkStage::output(uint32_t index) {
-  NOTREACHED();
-  static StageOutput result;
-  return result;
+Output& ActiveSinkStage::output(size_t index) {
+  CHECK(false) << "output requested from sink";
+  return *(static_cast<Output*>(nullptr));
 }
 
-bool ActiveSinkStage::Prepare(UpdateCallback update_callback) {
-  input_.Prepare(sink_->allocator(), sink_->must_allocate());
-  update_callback_ = update_callback;
-  return true;
+PayloadAllocator* ActiveSinkStage::PrepareInput(size_t index) {
+  DCHECK_EQ(index, 0u);  // input_count() is 1, so 0 is the only valid index.
+  return sink_->allocator();
+}
+
+void ActiveSinkStage::PrepareOutput(
+    size_t index,
+    PayloadAllocator* allocator,
+    const UpstreamCallback& callback) {
+  CHECK(false) << "PrepareOutput called on sink";
 }
 
 void ActiveSinkStage::Prime() {
+  DCHECK(sink_);
   sink_->Prime();
 }
 
 void ActiveSinkStage::Update(Engine* engine) {
   DCHECK(engine);
+  DCHECK(sink_);
 
   if (input_.packet_from_upstream()) {
     sink_demand_ =
@@ -63,5 +70,18 @@
   input_.SetDemand(sink_demand_, engine);
 }
 
+void ActiveSinkStage::FlushInput(
+    size_t index, const DownstreamCallback& callback) {
+  DCHECK_EQ(index, 0u);  // Same single-input check as PrepareInput().
+  DCHECK(sink_);
+  input_.Flush();
+  sink_->Flush();
+  sink_demand_ = Demand::kNegative;  // Demand restarts negative after a flush.
+}
+
+void ActiveSinkStage::FlushOutput(size_t index) {
+  CHECK(false) << "FlushOutput called on sink";  // No outputs to flush.
+}
+
 }  // namespace media
 }  // namespace mojo
diff --git a/services/media/framework/stages/active_sink_stage.h b/services/media/framework/stages/active_sink_stage.h
index 039698a..e9bc19f 100644
--- a/services/media/framework/stages/active_sink_stage.h
+++ b/services/media/framework/stages/active_sink_stage.h
@@ -2,8 +2,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SINK_STAGE_H_
-#define SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SINK_STAGE_H_
+#ifndef SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_SINK_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_SINK_STAGE_H_
 
 #include <deque>
 
@@ -16,34 +16,44 @@
 // A stage that hosts an ActiveSink.
 class ActiveSinkStage : public Stage {
  public:
-  ActiveSinkStage(ActiveSinkPtr source);
+  ActiveSinkStage(std::shared_ptr<ActiveSink> sink);
 
   ~ActiveSinkStage() override;
 
   // Stage implementation.
-  uint32_t input_count() const override;
+  size_t input_count() const override;
 
-  StageInput& input(uint32_t index) override;
+  Input& input(size_t index) override;
 
-  uint32_t output_count() const override;
+  size_t output_count() const override;
 
-  StageOutput& output(uint32_t index) override;
+  Output& output(size_t index) override;
 
-  bool Prepare(UpdateCallback update_callback) override;
+  PayloadAllocator* PrepareInput(size_t index) override;
+
+  void PrepareOutput(
+      size_t index,
+      PayloadAllocator* allocator,
+      const UpstreamCallback& callback) override;
 
   void Prime() override;
 
   void Update(Engine* engine) override;
 
+  void FlushInput(
+      size_t index,
+      const DownstreamCallback& callback) override;
+
+  void FlushOutput(size_t index) override;
+
  private:
-  StageInput input_;
-  ActiveSinkPtr sink_;
+  Input input_;
+  std::shared_ptr<ActiveSink> sink_;
   ActiveSink::DemandCallback demand_function_;
-  Stage::UpdateCallback update_callback_;
   Demand sink_demand_;
 };
 
 }  // namespace media
 }  // namespace mojo
 
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SINK_STAGE_H_
+#endif  // SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_SINK_STAGE_H_
diff --git a/services/media/framework/stages/active_source_stage.cc b/services/media/framework/stages/active_source_stage.cc
index 6dd2fc9..190b42c 100644
--- a/services/media/framework/stages/active_source_stage.cc
+++ b/services/media/framework/stages/active_source_stage.cc
@@ -7,14 +7,16 @@
 namespace mojo {
 namespace media {
 
-ActiveSourceStage::ActiveSourceStage(ActiveSourcePtr source) : source_(source) {
+ActiveSourceStage::ActiveSourceStage(std::shared_ptr<ActiveSource> source) :
+    source_(source),
+    prepared_(false) {
   DCHECK(source_);
 
   supply_function_ = [this](PacketPtr packet) {
     bool packets_was_empty_ = packets_.empty();
     packets_.push_back(std::move(packet));
-    if (packets_was_empty_ && update_callback_) {
-      update_callback_(this);
+    if (packets_was_empty_ && prepared_) {
+      RequestUpdate();
     }
   };
 
@@ -23,33 +25,58 @@
 
 ActiveSourceStage::~ActiveSourceStage() {}
 
-uint32_t ActiveSourceStage::input_count() const {
+size_t ActiveSourceStage::input_count() const {
   return 0;
 };
 
-StageInput& ActiveSourceStage::input(uint32_t index) {
-  NOTREACHED();
-  static StageInput result;
-  return result;
+Input& ActiveSourceStage::input(size_t index) {
+  CHECK(false) << "input requested from source";
+  return *(static_cast<Input*>(nullptr));
 }
 
-uint32_t ActiveSourceStage::output_count() const {
+size_t ActiveSourceStage::output_count() const {
   return 1;
 }
 
-StageOutput& ActiveSourceStage::output(uint32_t index) {
+Output& ActiveSourceStage::output(size_t index) {
   DCHECK_EQ(index, 0u);
   return output_;
 }
 
-bool ActiveSourceStage::Prepare(UpdateCallback update_callback) {
-  update_callback_ = update_callback;
-  Allocator* allocator = output_.Prepare(source_->can_accept_allocator());
-  if (allocator) {
-    DCHECK(source_->can_accept_allocator());
-    source_->set_allocator(allocator);
+PayloadAllocator* ActiveSourceStage::PrepareInput(size_t index) {
+  CHECK(false) << "PrepareInput called on source";  // input_count() is 0.
+  return nullptr;
+}
+
+void ActiveSourceStage::PrepareOutput(
+    size_t index,
+    PayloadAllocator* allocator,
+    const UpstreamCallback& callback) {
+  DCHECK_EQ(index, 0u);
+  DCHECK(source_);
+
+  if (source_->can_accept_allocator()) {
+    // Give the source the provided allocator or the default if none was
+    // provided.
+    source_->set_allocator(
+        allocator == nullptr ? PayloadAllocator::GetDefault() : allocator);
+  } else if (allocator){
+    // The source can't use the provided allocator, so the output must copy
+    // packets.
+    output_.SetCopyAllocator(allocator);
   }
-  return true;
+
+  prepared_ = true;
+}
+
+void ActiveSourceStage::UnprepareOutput(
+    size_t index,
+    const UpstreamCallback& callback) {
+  DCHECK_EQ(index, 0u);
+  DCHECK(source_);
+  prepared_ = false;  // Mirror of PrepareOutput's prepared_ = true.
+  source_->set_allocator(nullptr);
+  output_.SetCopyAllocator(nullptr);
 }
 
 void ActiveSourceStage::Update(Engine* engine) {
@@ -65,5 +92,18 @@
   }
 }
 
+void ActiveSourceStage::FlushInput(
+    size_t index,
+    const DownstreamCallback& callback) {
+  CHECK(false) << "FlushInput called on source";  // No inputs to flush.
+}
+
+void ActiveSourceStage::FlushOutput(size_t index) {
+  DCHECK(source_);
+  output_.Flush();
+  source_->Flush();
+  packets_.clear();  // Drop packets queued by supply_function_.
+}
+
 }  // namespace media
 }  // namespace mojo
diff --git a/services/media/framework/stages/active_source_stage.h b/services/media/framework/stages/active_source_stage.h
index c89ef25..1714be1 100644
--- a/services/media/framework/stages/active_source_stage.h
+++ b/services/media/framework/stages/active_source_stage.h
@@ -2,8 +2,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SOURCE_STAGE_H_
-#define SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SOURCE_STAGE_H_
+#ifndef SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_SOURCE_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_SOURCE_STAGE_H_
 
 #include <deque>
 
@@ -16,32 +16,47 @@
 // A stage that hosts an ActiveSource.
 class ActiveSourceStage : public Stage {
  public:
-  ActiveSourceStage(ActiveSourcePtr source);
+  ActiveSourceStage(std::shared_ptr<ActiveSource> source);
 
   ~ActiveSourceStage() override;
 
   // Stage implementation.
-  uint32_t input_count() const override;
+  size_t input_count() const override;
 
-  StageInput& input(uint32_t index) override;
+  Input& input(size_t index) override;
 
-  uint32_t output_count() const override;
+  size_t output_count() const override;
 
-  StageOutput& output(uint32_t index) override;
+  Output& output(size_t index) override;
 
-  bool Prepare(UpdateCallback update_callback) override;
+  PayloadAllocator* PrepareInput(size_t index) override;
+
+  void PrepareOutput(
+      size_t index,
+      PayloadAllocator* allocator,
+      const UpstreamCallback& callback) override;
+
+  void UnprepareOutput(
+      size_t index,
+      const UpstreamCallback& callback) override;
 
   void Update(Engine* engine) override;
 
+  void FlushInput(
+      size_t index,
+      const DownstreamCallback& callback) override;
+
+  void FlushOutput(size_t index) override;
+
  private:
-  StageOutput output_;
-  ActiveSourcePtr source_;
+  Output output_;
+  std::shared_ptr<ActiveSource> source_;
+  bool prepared_;
   ActiveSource::SupplyCallback supply_function_;
-  Stage::UpdateCallback update_callback_;
   std::deque<PacketPtr> packets_;
 };
 
 }  // namespace media
 }  // namespace mojo
 
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SOURCE_STAGE_H_
+#endif  // SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_SOURCE_STAGE_H_
diff --git a/services/media/framework/stages/distributor_stage.cc b/services/media/framework/stages/distributor_stage.cc
deleted file mode 100644
index 749de98..0000000
--- a/services/media/framework/stages/distributor_stage.cc
+++ /dev/null
@@ -1,91 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "services/media/framework/stages/distributor_stage.h"
-
-namespace mojo {
-namespace media {
-
-DistributorStage::DistributorStage(
-    MultiStreamPacketSourcePtr packet_source) :
-    packet_source_(packet_source),
-    ended_streams_(0) {
-  DCHECK(packet_source);
-  outputs_.resize(packet_source->stream_count());
-}
-
-DistributorStage::~DistributorStage() {}
-
-uint32_t DistributorStage::input_count() const {
-  return 0;
-};
-
-StageInput& DistributorStage::input(uint32_t index) {
-  NOTREACHED();
-  static StageInput result;
-  return result;
-}
-
-uint32_t DistributorStage::output_count() const {
-  return outputs_.size();
-}
-
-StageOutput& DistributorStage::output(uint32_t index) {
-  DCHECK(index < outputs_.size());
-  return outputs_[index];
-}
-
-bool DistributorStage::Prepare(UpdateCallback update_callback) {
-  for (StageOutput& output : outputs_) {
-    output.Prepare(false);
-  }
-  return false;
-}
-
-void DistributorStage::Update(Engine* engine) {
-  DCHECK(engine);
-
-  bool has_positive_demand = false;
-  for (StageOutput& output : outputs_) {
-    if (output.demand() == Demand::kPositive) {
-      has_positive_demand = true;
-      break;
-    }
-  }
-
-  while (true) {
-    if (cached_packet_ && has_positive_demand) {
-      DCHECK(cached_packet_output_index_ < outputs_.size());
-      StageOutput& output = outputs_[cached_packet_output_index_];
-
-      if (output.demand() != Demand::kNegative) {
-        // cached_packet_ is intended for an output which will accept packets.
-        output.SupplyPacket(std::move(cached_packet_), engine);
-      } else {
-      }
-    }
-
-    if (cached_packet_) {
-      // There's still a cached packet. We're done for now.
-      return;
-    }
-
-    if (ended_streams_ == outputs_.size()) {
-      // We've seen end-of-stream for all streams. All done.
-      return;
-    }
-
-    // Pull a packet from the source.
-    cached_packet_ = packet_source_->PullPacket(&cached_packet_output_index_);
-    DCHECK(cached_packet_);
-    DCHECK(cached_packet_output_index_ < outputs_.size());
-
-    if (cached_packet_->end_of_stream()) {
-      ended_streams_++;
-    }
-  }
-}
-
-}  // namespace media
-}  // namespace mojo
diff --git a/services/media/framework/stages/distributor_stage.h b/services/media/framework/stages/distributor_stage.h
deleted file mode 100644
index c608ffe..0000000
--- a/services/media/framework/stages/distributor_stage.h
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_DISTRIBUTOR_STAGE_H_
-#define SERVICES_MEDIA_FRAMEWORK_ENGINE_DISTRIBUTOR_STAGE_H_
-
-#include <vector>
-
-#include "services/media/framework/models/multistream_packet_source.h"
-#include "services/media/framework/stages/stage.h"
-
-namespace mojo {
-namespace media {
-
-// A stage that hosts a MultiStreamPacketSource.
-class DistributorStage : public Stage {
- public:
-  DistributorStage(MultiStreamPacketSourcePtr packet_source);
-
-  ~DistributorStage() override;
-
-  // Stage implementation.
-  uint32_t input_count() const override;
-
-  StageInput& input(uint32_t index) override;
-
-  uint32_t output_count() const override;
-
-  StageOutput& output(uint32_t index) override;
-
-  bool Prepare(UpdateCallback update_callback) override;
-
-  void Update(Engine* engine) override;
-
- private:
-  std::vector<StageOutput> outputs_;
-  MultiStreamPacketSourcePtr packet_source_;
-  PacketPtr cached_packet_;
-  uint32_t cached_packet_output_index_;
-  uint32_t ended_streams_;
-};
-
-}  // namespace media
-}  // namespace mojo
-
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_DISTRIBUTOR_STAGE_H_
diff --git a/services/media/framework/stages/input.cc b/services/media/framework/stages/input.cc
new file mode 100644
index 0000000..b8d466a
--- /dev/null
+++ b/services/media/framework/stages/input.cc
@@ -0,0 +1,48 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/engine.h"
+#include "services/media/framework/stages/input.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+Input::Input() : prepared_(false) {}
+
+Input::~Input() {}
+
+void Input::Connect(const OutputRef& output) {
+  DCHECK(output.valid());
+  DCHECK(!mate_);
+  mate_ = output;
+}
+
+Output& Input::actual_mate() const {
+  DCHECK(mate_.valid());
+  return mate_.actual();
+}
+
+void Input::SetDemand(Demand demand, Engine* engine) const {
+  DCHECK(engine);
+  DCHECK(connected());
+
+  if (actual_mate().UpdateDemandFromInput(demand)) {
+    engine->PushToDemandBacklog(mate().stage_);
+  }
+}
+
+bool Input::SupplyPacketFromOutput(PacketPtr packet) {
+  DCHECK(packet);
+  DCHECK(!packet_from_upstream_);
+  packet_from_upstream_ = std::move(packet);
+  return true;
+}
+
+void Input::Flush() {
+  packet_from_upstream_.reset(nullptr);
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/input.h b/services/media/framework/stages/input.h
new file mode 100644
index 0000000..2693a0e
--- /dev/null
+++ b/services/media/framework/stages/input.h
@@ -0,0 +1,73 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_STAGES_INPUT_H_
+#define SERVICES_MEDIA_FRAMEWORK_STAGES_INPUT_H_
+
+#include "services/media/framework/models/demand.h"
+#include "services/media/framework/packet.h"
+#include "services/media/framework/refs.h"
+
+namespace mojo {
+namespace media {
+
+class Stage;
+class Engine;
+class Output;
+
+// Represents a stage's connector to an adjacent upstream stage.
+class Input {
+ public:
+  Input();
+
+  ~Input();
+
+  // The output to which this input is connected.
+  const OutputRef& mate() const { return mate_; }
+
+  // Establishes a connection.
+  void Connect(const OutputRef& output);
+
+  // Breaks a connection. Called only by the engine.
+  void Disconnect() {
+    DCHECK(!prepared_);
+    mate_ = nullptr;
+  }
+
+  // Determines whether the input is connected to an output.
+  bool connected() const { return static_cast<bool>(mate_); }
+
+  // The connected output.
+  Output& actual_mate() const;
+
+  // Determines if the input is prepared.
+  bool prepared() const { return prepared_; }
+
+  // Changes the prepared state of the input.
+  void set_prepared(bool prepared) { prepared_ = prepared; }
+
+  // A packet supplied from upstream.
+  PacketPtr& packet_from_upstream() { return packet_from_upstream_; }
+
+  // Updates mate's demand. Called only by Stage::Update implementations.
+  void SetDemand(Demand demand, Engine* engine) const;
+
+  // Updates packet_from_upstream. Return value indicates whether the stage for
+  // this input should be added to the supply backlog. Called only by
+  // Output instances.
+  bool SupplyPacketFromOutput(PacketPtr packet);
+
+  // Flushes retained media.
+  void Flush();
+
+ private:
+  OutputRef mate_;
+  bool prepared_;
+  PacketPtr packet_from_upstream_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_STAGES_INPUT_H_
diff --git a/services/media/framework/stages/lpcm_stage_input.cc b/services/media/framework/stages/lpcm_stage_input.cc
deleted file mode 100644
index b215334..0000000
--- a/services/media/framework/stages/lpcm_stage_input.cc
+++ /dev/null
@@ -1,211 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "services/media/framework/engine.h"
-#include "services/media/framework/stages/lpcm_stage_input.h"
-#include "services/media/framework/stages/lpcm_stage_output.h"
-#include "services/media/framework/stages/stage.h"
-
-namespace mojo {
-namespace media {
-
-LpcmStageInput::LpcmStageInput() :
-    demand_pending_(false),
-    buffer_(nullptr),
-    frame_count_(0),
-    mix_(false),
-    synchronous_(false),
-    end_of_stream_(false) {
-  packet_exhausted_function_ = [this]() {
-    DCHECK(packet_from_upstream());
-    packet_from_upstream().reset();
-  };
-}
-
-LpcmStageInput::~LpcmStageInput() {}
-
-void LpcmStageInput::set_stream_type(const LpcmStreamType& stream_type) {
-  lpcm_util_ = LpcmUtil::Create(stream_type);
-  lpcm_supply_.set_bytes_per_frame(stream_type.bytes_per_frame());
-  packet_frames_.set_bytes_per_frame(stream_type.bytes_per_frame());
-  demand_frames_.set_bytes_per_frame(stream_type.bytes_per_frame());
-}
-
-void LpcmStageInput::SuggestDemand(uint64_t frame_count, Engine* engine) {
-  DCHECK(engine);
-
-  if (demand_pending_ || lpcm_supply_.frame_count() != 0) {
-    // We've already demanded. Too late for this suggestion.
-    return;
-  }
-
-  if (frame_count == 0) {
-    // No demand is suggested.
-    SetDemand(Demand::kNegative, engine);
-    return;
-  }
-
-  if (!connected_to_lpcm()) {
-    // Upstream output isn't LPCM. Demand packets.
-    demand_pending_ = true;
-    SetDemand(Demand::kPositive, engine);
-    return;
-  }
-
-  SetLpcmDemand(
-      GetDemandBuffer(frame_count),
-      frame_count,
-      false,
-      false,
-      engine);
-}
-
-void LpcmStageInput::SetLpcmDemand(
-    void* buffer,
-    uint64_t frame_count,
-    bool mix,
-    bool synchronous,
-    Engine* engine) {
-  DCHECK(engine);
-  DCHECK(connected());
-
-  demand_pending_ = true;
-
-  buffer_ = buffer;
-  frame_count_ = frame_count;
-  mix_ = mix;
-  synchronous_ = synchronous;
-
-  if (synchronous) {
-    // TODO
-    //engine->AddToFallback([lpcm_mate](Engine* engine) {
-    //  lpcm_mate->fallback(engine);
-    //});
-  }
-
-  LpcmStageOutput* lpcm_mate = mate().get_lpcm();
-  if (lpcm_mate != nullptr) {
-    // The upstream output is LPCM.
-    DCHECK(!packet_from_upstream());
-    lpcm_mate->UpdateLpcmDemand(buffer, frame_count, mix, synchronous);
-    engine->PushToDemandBacklogUnsafe(upstream_stage());
-    return;
-  }
-
-  demand_frames_.Set(buffer, frame_count);
-
-  // The upstream output isn't LPCM. See if we can satisfy demand with the
-  // packet from upstream, if there is one.
-  if (CopyOrMixFrames()) {
-    // Frames supplied. Add this stage to the supply backlog.
-    engine->PushToSupplyBacklogUnsafe(mate().downstream_stage());
-  } else {
-    // Frames not supplied. Demand another packet.
-    SetDemand(Demand::kPositive, engine);
-  }
-}
-
-bool LpcmStageInput::SupplyPacketFromOutput(PacketPtr packet) {
-  StageInput::SupplyPacketFromOutput(std::move(packet));
-
-  demand_pending_ = false;
-
-  if (connected_to_lpcm()) {
-    // The upstream output is LPCM, so the packet should be a wrapper for the
-    // demand buffer (buffer_). In this case, we can release the packet,
-    // because the frames are already where we want them.
-    DCHECK(packet_from_upstream()->payload() == buffer_);
-    DCHECK(packet_from_upstream()->duration() <= frame_count_);
-    end_of_stream_ = packet_from_upstream()->end_of_stream();
-    lpcm_supply_.Set(
-        packet_from_upstream()->payload(),
-        packet_from_upstream()->duration());
-    packet_from_upstream().reset();
-    return true;
-  }
-
-  if (buffer_ == nullptr) {
-    // The upstream output isn't LPCM, and the stage hasn't supplied a buffer.
-    // We'll supply frames right out of the packet payload and reset the
-    // packet pointer when the frames are exhausted.
-    DCHECK(!mix_);
-    end_of_stream_ = packet_from_upstream()->end_of_stream();
-    lpcm_supply_.Set(
-        packet_from_upstream()->payload(),
-        packet_from_upstream()->duration(),
-        packet_exhausted_function_);
-    return true;
-  }
-
-  // The upstream output isn't LPCM, and the stage has supplied a buffer. That
-  // means we have to copy or mix from the packet to the supplied buffer. We
-  // initialize packet_frames_ for that purpose and call CopyOrMixFrames.
-  packet_frames_.Set(
-      packet_from_upstream()->payload(),
-      packet_from_upstream()->duration(),
-      packet_exhausted_function_);
-
-  return CopyOrMixFrames();
-}
-
-LpcmStageInput* LpcmStageInput::get_lpcm() {
-  return this;
-}
-
-bool LpcmStageInput::connected_to_lpcm() {
-  return mate().get_lpcm() != nullptr;
-}
-
-bool LpcmStageInput::CopyOrMixFrames() {
-  DCHECK(buffer_);
-  DCHECK(demand_frames_.buffer());
-
-  uint64_t frame_count = std::min(
-      packet_frames_.frame_count(),
-      demand_frames_.frame_count());
-
-  if (frame_count == 0) {
-    return false;
-  }
-
-  DCHECK(packet_from_upstream());
-
-  if (mix_) {
-    lpcm_util_->Mix(
-        packet_frames_.buffer(),
-        demand_frames_.buffer(),
-        frame_count);
-  } else {
-    lpcm_util_->Copy(
-        packet_frames_.buffer(),
-        demand_frames_.buffer(),
-        frame_count);
-  }
-
-  bool end_of_stream = packet_from_upstream()->end_of_stream();
-
-  packet_frames_.Advance(frame_count);
-  demand_frames_.Advance(frame_count);
-
-  if (demand_frames_.frame_count() == 0 ||
-      (packet_frames_.frame_count() == 0 && end_of_stream)) {
-    end_of_stream_ = end_of_stream;
-    lpcm_supply_.Set(buffer_, frame_count_ - demand_frames_.frame_count());
-    demand_frames_.Reset();
-    return true;
-  }
-
-  return false;
-}
-
-void* LpcmStageInput::GetDemandBuffer(uint64_t frame_count) {
-  if (demand_buffer_.size() < frame_count) {
-    demand_buffer_.clear();
-    demand_buffer_.resize(frame_count);
-  }
-  return &demand_buffer_[0];
-}
-
-}  // namespace media
-}  // namespace mojo
diff --git a/services/media/framework/stages/lpcm_stage_input.h b/services/media/framework/stages/lpcm_stage_input.h
deleted file mode 100644
index b5ce154..0000000
--- a/services/media/framework/stages/lpcm_stage_input.h
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_INPUT_H_
-#define SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_INPUT_H_
-
-#include <vector>
-
-#include "services/media/framework/lpcm_util.h"
-#include "services/media/framework/models/demand.h"
-#include "services/media/framework/models/lpcm_frame_buffer.h"
-#include "services/media/framework/stages/stage_input.h"
-
-namespace mojo {
-namespace media {
-
-class LpcmStageOutput;
-
-// Represents a stage's connector to an adjacent upstream stage, with LPCM
-// optmizations.
-class LpcmStageInput : public StageInput {
- public:
-  LpcmStageInput();
-
-  ~LpcmStageInput();
-
-  // Sets the stream type.
-  void set_stream_type(const LpcmStreamType& stream_type);
-
-  // Suggests possible demand frame count, useful when !connected_to_lpcm().
-  void SuggestDemand(uint64_t frame_count, Engine* engine);
-
-  // Updates the demand signalled to the connected upstream output.
-  void SetLpcmDemand(
-      void* buffer,
-      uint64_t frame_count,
-      bool mix,
-      bool synchronous,
-      Engine* engine);
-
-  // Indicates whether demand is pending.
-  bool demand_pending() const {
-    return demand_pending_;
-  }
-
-  // Returns supplied frames.
-  LpcmFrameBuffer& lpcm_supply() {
-    return lpcm_supply_;
-  }
-
-  // Indicates whether we've hit end-of-stream.
-  bool end_of_stream() const {
-    return end_of_stream_;
-  }
-
-  // Indicates the frame count originally demanded.
-  uint64_t demand_frame_count() const {
-    return frame_count_;
-  }
-
-  // StageInput overrides.
-  bool SupplyPacketFromOutput(PacketPtr packet) override;
-
-  LpcmStageInput* get_lpcm() override;
-
- private:
-  static const uint64_t kDefaultFrameCount = 512;
-
-  bool connected_to_lpcm();
-
-  // Copies or mixes frames from packet_frames_ to demand_frames_. This is only
-  // used when !connected_to_lpcm() and the stage has supplied a buffer. The
-  // return value indicates whether the demand was met.
-  bool CopyOrMixFrames();
-
-  // Ensures demand_buffer_ can accommodate frame_count frames and returns it.
-  void* GetDemandBuffer(uint64_t frame_count);
-
-  bool demand_pending_;
-
-  void* buffer_;
-  uint64_t frame_count_;
-  bool mix_;
-  bool synchronous_;
-
-  LpcmFrameBuffer lpcm_supply_; // Frames supplied to this stage.
-  LpcmFrameBuffer packet_frames_; // Source for packet payload copy/mix.
-  LpcmFrameBuffer demand_frames_; // Destination for packet payload copy/mix.
-
-  bool end_of_stream_;
-
-  std::unique_ptr<LpcmUtil> lpcm_util_;
-  std::vector<uint8_t> demand_buffer_;
-
-  LpcmFrameBuffer::ExhaustedCallback packet_exhausted_function_;
-};
-
-}  // namespace media
-}  // namespace mojo
-
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_INPUT_H_
diff --git a/services/media/framework/stages/lpcm_stage_output.cc b/services/media/framework/stages/lpcm_stage_output.cc
deleted file mode 100644
index 87cb033..0000000
--- a/services/media/framework/stages/lpcm_stage_output.cc
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "services/media/framework/engine.h"
-#include "services/media/framework/stages/lpcm_stage_input.h"
-#include "services/media/framework/stages/lpcm_stage_output.h"
-#include "services/media/framework/stages/stage.h"
-
-namespace mojo {
-namespace media {
-
-LpcmStageOutput::LpcmStageOutput() :
-    buffer_(nullptr),
-    frame_count_(0),
-    mix_(false),
-    synchronous_(false),
-    next_presentation_time_(0) {}
-
-LpcmStageOutput::~LpcmStageOutput() {}
-
-void LpcmStageOutput::set_stream_type(const LpcmStreamType& stream_type) {
-  lpcm_demand_.set_bytes_per_frame(stream_type.bytes_per_frame());
-}
-
-void LpcmStageOutput::SupplyFrames(bool end_of_stream, Engine* engine) {
-  DCHECK(engine);
-  DCHECK(connected());
-  DCHECK(end_of_stream || buffer_);
-
-  uint64_t duration = frame_count_ - lpcm_demand_.frame_count();
-  if (allocator_ != nullptr) {
-    SupplyPacketInternal(
-        Packet::Create(
-            next_presentation_time_,
-            duration,
-            end_of_stream,
-            duration * lpcm_demand_.bytes_per_frame(),
-            duration == 0 ? nullptr : buffer_,
-            allocator_),
-        engine);
-  } else {
-    SupplyPacketInternal(
-        Packet::CreateNoAllocator(
-            next_presentation_time_,
-            duration,
-            end_of_stream,
-            duration * lpcm_demand_.bytes_per_frame(),
-            duration == 0 ? nullptr : buffer_),
-        engine);
-  }
-
-  next_presentation_time_ += duration;
-
-  buffer_ = nullptr;
-  frame_count_ = 0;
-  mix_ = false;
-  synchronous_ = false;
-  lpcm_demand_.Reset();
-}
-
-Allocator* LpcmStageOutput::Prepare(bool can_accept_allocator) {
-  // The stage isn't concerned with allocators, but we'll need one if our mate
-  // isn't LPCM.
-  Allocator* allocator = StageOutput::Prepare(true);
-  DCHECK(!connected_to_lpcm() || allocator == nullptr);
-  if (!connected_to_lpcm()) {
-    // Our mate isn't lpcm, so we'll need an allocator to create packet buffers.
-    // Use the provided one or the default.
-    allocator_ = allocator == nullptr ? Allocator::GetDefault() : allocator;
-  }
-
-  return nullptr;
-}
-
-LpcmStageOutput* LpcmStageOutput::get_lpcm() {
-  return this;
-}
-
-bool LpcmStageOutput::connected_to_lpcm() {
-  return mate().get_lpcm() != nullptr;
-}
-
-}  // namespace media
-}  // namespace mojo
diff --git a/services/media/framework/stages/lpcm_stage_output.h b/services/media/framework/stages/lpcm_stage_output.h
deleted file mode 100644
index a60868a..0000000
--- a/services/media/framework/stages/lpcm_stage_output.h
+++ /dev/null
@@ -1,120 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_OUTPUT_H_
-#define SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_OUTPUT_H_
-
-#include "services/media/framework/allocator.h"
-#include "services/media/framework/models/lpcm_frame_buffer.h"
-#include "services/media/framework/stages/stage_output.h"
-
-namespace mojo {
-namespace media {
-
-class LpcmStageInput;
-
-// Represents a stage's connector to an adjacent downstream stage.
-class LpcmStageOutput : public StageOutput {
- public:
-  LpcmStageOutput();
-
-  ~LpcmStageOutput();
-
-  // Sets the stream type.
-  void set_stream_type(const LpcmStreamType& stream_type);
-
-  // Gets demand suggestion for the stage's input.
-  uint64_t demand_suggestion() const {
-    if (lpcm_demand_.frame_count() != 0) {
-      return lpcm_demand_.frame_count();
-    }
-
-    return demand() == Demand::kPositive ? kDefaultFrameCount : 0;
-  }
-
-  // Gets the presentation time of the next packet to be supplied. Used for
-  // adaptation to non-lpcm input.
-  int64_t next_presentation_time() const {
-    return next_presentation_time_;
-  }
-
-  // Sets the presentation time for the next packet to be supplied. Used for
-  // adaptation to non-lpcm input.
-  void set_next_presentation_time(int64_t next_presentation_time) {
-    next_presentation_time_ = next_presentation_time;
-  }
-
-  // Gets the demand signalled from downstream.
-  LpcmFrameBuffer& lpcm_demand(uint64_t suggested_frame_count = 0) {
-    if (lpcm_demand_.frame_count() == 0 &&
-        demand() == Demand::kPositive &&
-        !connected_to_lpcm()) {
-      DCHECK(buffer_ == nullptr);
-      if (suggested_frame_count == 0) {
-        suggested_frame_count = kDefaultFrameCount;
-      }
-      DCHECK(allocator_);
-      void* buffer = allocator_->AllocatePayloadBuffer(
-          suggested_frame_count * lpcm_demand_.bytes_per_frame());
-      if (buffer != nullptr) {
-        UpdateLpcmDemand(buffer, suggested_frame_count, false, false);
-      }
-    }
-    return lpcm_demand_;
-  }
-
-  // Indicates whether supplied frames should be mixed.
-  bool mix() const {
-    return mix_;
-  }
-
-  // Indicates whether supplied frames need to be delivered synchronously.
-  bool synchronous() const {
-    return synchronous_;
-  }
-
-  // Indicates that all demanded frames have been supplied or we've hit end of
-  // stream. Called only by Stage::Update implementations.
-  void SupplyFrames(bool end_of_stream, Engine* engine);
-
-  // Demands LPCM frames. Called only by LpcmStageInput instances and
-  // LpcmStageOutput::lpcm_demand.
-  void UpdateLpcmDemand(
-      void* buffer,
-      uint64_t frame_count,
-      bool mix,
-      bool synchronous) {
-    buffer_ = buffer;
-    frame_count_ = frame_count;
-    mix_ = mix;
-    synchronous_ = synchronous;
-
-    lpcm_demand_.Set(buffer, frame_count);
-  }
-
-  // StageOutput override.
-  Allocator* Prepare(bool can_accept_allocator) override;
-
-  LpcmStageOutput* get_lpcm() override;
-
- private:
-  static const uint64_t kDefaultFrameCount = 512;
-
-  bool connected_to_lpcm();
-
-  void* buffer_;
-  uint64_t frame_count_;
-  bool mix_;
-  bool synchronous_;
-
-  LpcmFrameBuffer lpcm_demand_;
-
-  int64_t next_presentation_time_;
-  Allocator* allocator_;
-};
-
-}  // namespace media
-}  // namespace mojo
-
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_OUTPUT_H_
diff --git a/services/media/framework/stages/lpcm_transform_stage.cc b/services/media/framework/stages/lpcm_transform_stage.cc
deleted file mode 100644
index 3f42e01..0000000
--- a/services/media/framework/stages/lpcm_transform_stage.cc
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "services/media/framework/stages/lpcm_transform_stage.h"
-
-namespace mojo {
-namespace media {
-
-LpcmTransformStage::LpcmTransformStage(LpcmTransformPtr transform) :
-    transform_(transform) {
-  DCHECK(transform_);
-  input_.set_stream_type(transform_->input_stream_type());
-  output_.set_stream_type(transform_->output_stream_type());
-}
-
-LpcmTransformStage::~LpcmTransformStage() {}
-
-uint32_t LpcmTransformStage::input_count() const {
-  return 1;
-};
-
-StageInput& LpcmTransformStage::input(uint32_t index) {
-  DCHECK_EQ(index, 0u);
-  return input_;
-}
-
-uint32_t LpcmTransformStage::output_count() const {
-  return 1;
-}
-
-StageOutput& LpcmTransformStage::output(uint32_t index) {
-  DCHECK_EQ(index, 0u);
-  return output_;
-}
-
-bool LpcmTransformStage::Prepare(UpdateCallback update_callback) {
-  output_.Prepare(false);
-  input_.Prepare(nullptr, false);
-  return false;
-}
-
-void LpcmTransformStage::Update(Engine* engine) {
-  DCHECK(engine);
-
-  LpcmFrameBuffer& supply = input_.lpcm_supply();
-
-  if (supply.frame_count() != 0 || input_.end_of_stream()) {
-    // TODO(dalesat): Assumes 1-1.
-    LpcmFrameBuffer& demand = output_.lpcm_demand(supply.frame_count());
-
-    if (demand.frame_count() != 0) {
-      if (supply.frame_count() != 0) {
-        transform_->TransformFrames(&supply, &demand, output_.mix());
-      }
-
-      if (demand.frame_count() == 0 || input_.end_of_stream()) {
-        output_.SupplyFrames(input_.end_of_stream(), engine);
-      }
-    }
-  }
-
-  // TODO(dalesat): Assumes 1-1.
-  input_.SuggestDemand(output_.demand_suggestion(), engine);
-}
-
-}  // namespace media
-}  // namespace mojo
diff --git a/services/media/framework/stages/lpcm_transform_stage.h b/services/media/framework/stages/lpcm_transform_stage.h
deleted file mode 100644
index 2c4f613..0000000
--- a/services/media/framework/stages/lpcm_transform_stage.h
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_TRANSFORM_STAGE_H_
-#define SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_TRANSFORM_STAGE_H_
-
-#include "services/media/framework/models/lpcm_transform.h"
-#include "services/media/framework/stages/stage.h"
-
-namespace mojo {
-namespace media {
-
-// A stage that hosts an LpcmTransform.
-class LpcmTransformStage : public Stage {
- public:
-  LpcmTransformStage(LpcmTransformPtr transform);
-
-  ~LpcmTransformStage() override;
-
-  // Stage implementation.
-  uint32_t input_count() const override;
-
-  StageInput& input(uint32_t index) override;
-
-  uint32_t output_count() const override;
-
-  StageOutput& output(uint32_t index) override;
-
-  bool Prepare(UpdateCallback update_callback) override;
-
-  void Update(Engine* engine) override;
-
- private:
-  LpcmStageInput input_;
-  LpcmStageOutput output_;
-  LpcmTransformPtr transform_;
-};
-
-}  // namespace media
-}  // namespace mojo
-
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_TRANSFORM_STAGE_H_
diff --git a/services/media/framework/stages/multistream_source_stage.cc b/services/media/framework/stages/multistream_source_stage.cc
new file mode 100644
index 0000000..3d64ba0
--- /dev/null
+++ b/services/media/framework/stages/multistream_source_stage.cc
@@ -0,0 +1,124 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/stages/multistream_source_stage.h"
+
+namespace mojo {
+namespace media {
+
+MultistreamSourceStage::MultistreamSourceStage(
+    std::shared_ptr<MultistreamSource> source) :
+    source_(source),
+    cached_packet_output_index_(0), ended_streams_(0) {
+  DCHECK(source);
+  outputs_.resize(source->stream_count());
+}
+
+MultistreamSourceStage::~MultistreamSourceStage() {}
+
+size_t MultistreamSourceStage::input_count() const {
+  return 0;
+}
+
+Input& MultistreamSourceStage::input(size_t index) {
+  CHECK(false) << "input requested from source";
+  return *(static_cast<Input*>(nullptr));
+}
+
+size_t MultistreamSourceStage::output_count() const {
+  return outputs_.size();
+}
+
+Output& MultistreamSourceStage::output(size_t index) {
+  DCHECK(index < outputs_.size());
+  return outputs_[index];
+}
+
+PayloadAllocator* MultistreamSourceStage::PrepareInput(size_t index) {
+  CHECK(false) << "PrepareInput called on source";
+  return nullptr;
+}
+
+void MultistreamSourceStage::PrepareOutput(
+    size_t index,
+    PayloadAllocator* allocator,
+    const UpstreamCallback& callback) {
+  DCHECK(index < outputs_.size());
+
+  if (allocator != nullptr) {
+    // Currently, we don't support a source that uses provided allocators. If
+    // we're provided an allocator, the output must have it so supplied packets
+    // can be copied.
+    outputs_[index].SetCopyAllocator(allocator);
+  }
+}
+
+void MultistreamSourceStage::UnprepareOutput(
+    size_t index,
+    const UpstreamCallback& callback) {
+  DCHECK(index < outputs_.size());
+  outputs_[index].SetCopyAllocator(nullptr);
+}
+
+void MultistreamSourceStage::Update(Engine* engine) {
+  DCHECK(engine);
+
+  bool has_positive_demand = false;
+  for (Output& output : outputs_) {
+    if (output.demand() == Demand::kPositive) {
+      has_positive_demand = true;
+      break;
+    }
+  }
+
+  while (true) {
+    if (cached_packet_ && has_positive_demand) {
+      DCHECK(cached_packet_output_index_ < outputs_.size());
+      Output& output = outputs_[cached_packet_output_index_];
+
+      if (output.demand() != Demand::kNegative) {
+        // cached_packet_ is intended for an output which will accept packets.
+        output.SupplyPacket(std::move(cached_packet_), engine);
+      }
+    }
+
+    if (cached_packet_) {
+      // There's still a cached packet. We're done for now.
+      return;
+    }
+
+    if (ended_streams_ == outputs_.size()) {
+      // We've seen end-of-stream for all streams. All done.
+      return;
+    }
+
+    // Pull a packet from the source.
+    cached_packet_ = source_->PullPacket(&cached_packet_output_index_);
+    DCHECK(cached_packet_);
+    DCHECK(cached_packet_output_index_ < outputs_.size());
+
+    if (cached_packet_->end_of_stream()) {
+      ended_streams_++;
+    }
+  }
+}
+
+void MultistreamSourceStage::FlushInput(
+    size_t index,
+    const DownstreamCallback& callback) {
+  CHECK(false) << "FlushInput called on source";
+}
+
+void MultistreamSourceStage::FlushOutput(size_t index) {
+  DCHECK(index < outputs_.size());
+  DCHECK(source_);
+  outputs_[index].Flush();
+  source_->Flush();
+  cached_packet_.reset(nullptr);
+  cached_packet_output_index_ = 0;
+  ended_streams_ = 0;
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/multistream_source_stage.h b/services/media/framework/stages/multistream_source_stage.h
new file mode 100644
index 0000000..d7af7a2
--- /dev/null
+++ b/services/media/framework/stages/multistream_source_stage.h
@@ -0,0 +1,63 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_STAGES_MULTISTREAM_SOURCE_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_STAGES_MULTISTREAM_SOURCE_STAGE_H_
+
+#include <vector>
+
+#include "services/media/framework/models/multistream_source.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+// A stage that hosts a MultistreamSource.
+// TODO(dalesat): May need to grow the list of outputs dynamically.
+class MultistreamSourceStage : public Stage {
+ public:
+  MultistreamSourceStage(std::shared_ptr<MultistreamSource> source);
+
+  ~MultistreamSourceStage() override;
+
+  // Stage implementation.
+  size_t input_count() const override;
+
+  Input& input(size_t index) override;
+
+  size_t output_count() const override;
+
+  Output& output(size_t index) override;
+
+  PayloadAllocator* PrepareInput(size_t index) override;
+
+  void PrepareOutput(
+      size_t index,
+      PayloadAllocator* allocator,
+      const UpstreamCallback& callback) override;
+
+  void UnprepareOutput(
+      size_t index,
+      const UpstreamCallback& callback) override;
+  // Supplies the cached packet when possible and pulls more from the source.
+  void Update(Engine* engine) override;
+  // Must not be called; FlushInput on this stage fails a CHECK.
+  void FlushInput(
+      size_t index,
+      const DownstreamCallback& callback) override;
+  // Flushes the output and the source, and drops any cached packet.
+  void FlushOutput(size_t index) override;
+
+ private:
+  std::vector<Output> outputs_;
+  std::shared_ptr<MultistreamSource> source_;
+  PacketPtr cached_packet_;  // Pulled from source_, not yet supplied.
+  size_t cached_packet_output_index_;  // Index in outputs_ for cached_packet_.
+  size_t ended_streams_;  // End-of-stream packets pulled since last flush.
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_STAGES_MULTISTREAM_SOURCE_STAGE_H_
diff --git a/services/media/framework/stages/output.cc b/services/media/framework/stages/output.cc
new file mode 100644
index 0000000..e64d913
--- /dev/null
+++ b/services/media/framework/stages/output.cc
@@ -0,0 +1,94 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/engine.h"
+#include "services/media/framework/stages/output.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+Output::Output()
+    : demand_(Demand::kNegative),  // No demand until an input signals some.
+      copy_allocator_(nullptr) {}
+
+Output::~Output() {}  // copy_allocator_ is a raw pointer; not deleted here.
+
+void Output::Connect(const InputRef& input) {
+  DCHECK(input.valid());
+  DCHECK(!mate_);  // Connecting an already-connected output is an error.
+  mate_ = input;
+}
+
+Input& Output::actual_mate() const {
+  DCHECK(mate_.valid());  // Dereferencing an invalid ref is a caller error.
+  return mate_.actual();
+}
+
+void Output::SetCopyAllocator(PayloadAllocator* copy_allocator) {
+  DCHECK(connected());
+  copy_allocator_ = copy_allocator;  // nullptr turns payload copying off.
+}
+
+Demand Output::demand() const {
+  DCHECK(connected());
+
+  // Negative demand is reported whenever the mate is still holding a packet,
+  // whatever demand was signalled. demand_ is tested first so the mate is
+  // consulted only when the answer could change.
+  const bool blocked_by_mate =
+      demand_ != Demand::kNegative && actual_mate().packet_from_upstream();
+
+  return blocked_by_mate ? Demand::kNegative : demand_;
+}
+
+void Output::SupplyPacket(PacketPtr packet, Engine* engine) const {
+  DCHECK(packet);
+  DCHECK(engine);
+  DCHECK(connected());
+  // With a copy allocator set, the payload is re-allocated before delivery.
+  if (copy_allocator_ != nullptr) {
+    // Need to copy the packet due to an allocation conflict.
+    size_t size = packet->size();
+    void *buffer;
+    // An empty payload needs no buffer; otherwise allocate and copy.
+    if (size == 0) {
+      buffer = nullptr;
+    } else {
+      buffer = copy_allocator_->AllocatePayloadBuffer(size);
+      if (buffer == nullptr) {
+        LOG(WARNING) << "allocator starved copying output";
+        return;  // NOTE: the packet is not delivered to the mate in this case.
+      }
+      memcpy(buffer, packet->payload(), size);
+    }
+    // Rebuild the packet around the copy, keeping timing and end-of-stream.
+    packet = Packet::Create(
+        packet->presentation_time(),
+        packet->duration(),
+        packet->end_of_stream(),
+        size,
+        buffer,
+        copy_allocator_);
+  }
+  // The mate's return value decides whether its stage joins the supply backlog.
+  if (actual_mate().SupplyPacketFromOutput(std::move(packet))) {
+    engine->PushToSupplyBacklog(mate_.stage_);
+  }
+}
+
+bool Output::UpdateDemandFromInput(Demand demand) {
+  // Store the new demand and tell the caller whether that changed anything.
+  // Re-storing an equal value is harmless, so no early return is needed.
+  const bool changed = demand_ != demand;
+  demand_ = demand;
+  return changed;
+}
+
+void Output::Flush() {
+  demand_ = Demand::kNegative;  // Same value the constructor starts with.
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/output.h b/services/media/framework/stages/output.h
new file mode 100644
index 0000000..4cebeb5
--- /dev/null
+++ b/services/media/framework/stages/output.h
@@ -0,0 +1,68 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_STAGES_OUTPUT_H_
+#define SERVICES_MEDIA_FRAMEWORK_STAGES_OUTPUT_H_
+
+#include "services/media/framework/packet.h"
+#include "services/media/framework/payload_allocator.h"
+#include "services/media/framework/refs.h"
+
+namespace mojo {
+namespace media {
+
+class Stage;
+class Engine;
+class Input;
+
+// Represents a stage's connector to an adjacent downstream stage.
+class Output {
+ public:
+  Output();
+
+  ~Output();
+
+  // The input to which this output is connected.
+  const InputRef& mate() const { return mate_; }
+
+  // Establishes a connection. The output must not already be connected.
+  void Connect(const InputRef& input);
+
+  // Breaks a connection. Called only by the engine.
+  void Disconnect() { mate_ = nullptr; }
+
+  // Determines whether the output is connected to an input.
+  bool connected() const { return static_cast<bool>(mate_); }
+
+  // The connected input itself. The output must be connected.
+  Input& actual_mate() const;
+
+  // Sets the allocator the output must use to copy the payload of output
+  // packets. This is used when the connected input insists that a specific
+  // allocator be used, but the stage can't use it.
+  void SetCopyAllocator(PayloadAllocator* copy_allocator);
+
+  // Demand signalled from downstream, or kNegative if the downstream input
+  // is currently holding a packet.
+  Demand demand() const;
+
+  // Supplies a packet to mate. Called only by Stage::Update implementations.
+  void SupplyPacket(PacketPtr packet, Engine* engine) const;
+
+  // Updates demand; returns true if it changed. Called only by Input instances.
+  bool UpdateDemandFromInput(Demand demand);
+
+  // Flushes the output, which resets its demand to kNegative.
+  void Flush();
+
+ private:
+  InputRef mate_;  // Downstream input; tests false when unconnected.
+  Demand demand_;  // Last demand set by UpdateDemandFromInput or Flush.
+  PayloadAllocator* copy_allocator_;  // Non-null means payloads are copied.
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_STAGES_OUTPUT_H_
diff --git a/services/media/framework/stages/packet_transform_stage.cc b/services/media/framework/stages/packet_transform_stage.cc
deleted file mode 100644
index 2564f12..0000000
--- a/services/media/framework/stages/packet_transform_stage.cc
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "services/media/framework/stages/packet_transform_stage.h"
-
-namespace mojo {
-namespace media {
-
-PacketTransformStage::PacketTransformStage(
-    PacketTransformPtr transform) :
-    transform_(transform),
-    allocator_(nullptr),
-    input_packet_is_new_(true) {
-  DCHECK(transform_);
-}
-
-PacketTransformStage::~PacketTransformStage() {}
-
-uint32_t PacketTransformStage::input_count() const {
-  return 1;
-};
-
-StageInput& PacketTransformStage::input(uint32_t index) {
-  DCHECK_EQ(index, 0u);
-  return input_;
-}
-
-uint32_t PacketTransformStage::output_count() const {
-  return 1;
-}
-
-StageOutput& PacketTransformStage::output(uint32_t index) {
-  DCHECK_EQ(index, 0u);
-  return output_;
-}
-
-bool PacketTransformStage::Prepare(const UpdateCallback update_callback) {
-  allocator_ = output_.Prepare(true);
-  if (allocator_ == nullptr) {
-    allocator_ = Allocator::GetDefault();
-  }
-  input_.Prepare(nullptr, false);
-  return false;
-}
-
-void PacketTransformStage::Update(Engine* engine) {
-  DCHECK(engine);
-  DCHECK(allocator_);
-
-  if (input_.packet_from_upstream() && output_.demand() != Demand::kNegative) {
-    PacketPtr output_packet;
-    if (transform_->TransformPacket(
-        input_.packet_from_upstream(),
-        input_packet_is_new_,
-        allocator_,
-        &output_packet)) {
-      input_.packet_from_upstream().reset();
-      input_packet_is_new_ = true;
-    } else {
-      input_packet_is_new_ = false;
-    }
-
-    if (output_packet) {
-      output_.SupplyPacket(std::move(output_packet), engine);
-    }
-  }
-
-  input_.SetDemand(output_.demand(), engine);
-}
-
-}  // namespace media
-}  // namespace mojo
diff --git a/services/media/framework/stages/packet_transform_stage.h b/services/media/framework/stages/packet_transform_stage.h
deleted file mode 100644
index ab03e78..0000000
--- a/services/media/framework/stages/packet_transform_stage.h
+++ /dev/null
@@ -1,45 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_PACKET_TRANSFORM_STAGE_H_
-#define SERVICES_MEDIA_FRAMEWORK_ENGINE_PACKET_TRANSFORM_STAGE_H_
-
-#include "services/media/framework/models/packet_transform.h"
-#include "services/media/framework/stages/stage.h"
-
-namespace mojo {
-namespace media {
-
-// A stage that hosts a PacketTransform.
-class PacketTransformStage : public Stage {
- public:
-  PacketTransformStage(PacketTransformPtr transform);
-
-  ~PacketTransformStage() override;
-
-  // Stage implementation.
-  uint32_t input_count() const override;
-
-  StageInput& input(uint32_t index) override;
-
-  uint32_t output_count() const override;
-
-  StageOutput& output(uint32_t index) override;
-
-  bool Prepare(const UpdateCallback update_callback) override;
-
-  void Update(Engine* engine) override;
-
- private:
-  StageInput input_;
-  StageOutput output_;
-  PacketTransformPtr transform_;
-  Allocator* allocator_;
-  bool input_packet_is_new_;
-};
-
-}  // namespace media
-}  // namespace mojo
-
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_PACKET_TRANSFORM_STAGE_H_
diff --git a/services/media/framework/stages/stage.cc b/services/media/framework/stages/stage.cc
index e46eead..813da9b 100644
--- a/services/media/framework/stages/stage.cc
+++ b/services/media/framework/stages/stage.cc
@@ -9,15 +9,16 @@
 namespace media {
 
 Stage::Stage() :
-    prepared_(false),
     in_supply_backlog_(false),
     in_demand_backlog_(false) {}
 
 Stage::~Stage() {}
 
-bool Stage::Prepare(UpdateCallback update_callback) {
-  return false;
-}
+void Stage::UnprepareInput(size_t index) {}  // Default: does nothing.
+
+void Stage::UnprepareOutput(
+    size_t index,
+    const UpstreamCallback& callback) {}  // Default: callback is not invoked.
 
 void Stage::Prime() {}
 
diff --git a/services/media/framework/stages/stage.h b/services/media/framework/stages/stage.h
index 20b21c4..ce5299b 100644
--- a/services/media/framework/stages/stage.h
+++ b/services/media/framework/stages/stage.h
@@ -2,16 +2,15 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_H_
-#define SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_H_
+#ifndef SERVICES_MEDIA_FRAMEWORK_STAGES_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_STAGES_STAGE_H_
 
 #include <vector>
 
 #include "services/media/framework/packet.h"
-#include "services/media/framework/stages/lpcm_stage_input.h"
-#include "services/media/framework/stages/lpcm_stage_output.h"
-#include "services/media/framework/stages/stage_input.h"
-#include "services/media/framework/stages/stage_output.h"
+#include "services/media/framework/payload_allocator.h"
+#include "services/media/framework/stages/input.h"
+#include "services/media/framework/stages/output.h"
 
 namespace mojo {
 namespace media {
@@ -21,30 +20,53 @@
 // Host for a source, sink or transform.
 class Stage {
  public:
+  using UpstreamCallback = std::function<void(size_t input_index)>;
+  using DownstreamCallback = std::function<void(size_t output_index)>;
   using UpdateCallback = std::function<void(Stage* stage)>;
 
   Stage();
 
   virtual ~Stage();
 
+  void SetUpdateCallback(const UpdateCallback& update_callback) {
+    update_callback_ = update_callback;
+  }
+
   // Returns the number of input connections.
-  virtual uint32_t input_count() const = 0;
+  virtual size_t input_count() const = 0;
 
   // Returns the indicated input connection.
-  virtual StageInput& input(uint32_t index) = 0;
+  virtual Input& input(size_t index) = 0;
 
   // Returns the number of output connections.
-  virtual uint32_t output_count() const = 0;
+  virtual size_t output_count() const = 0;
 
   // Returns the indicated output connection.
-  virtual StageOutput& output(uint32_t index) = 0;
+  virtual Output& output(size_t index) = 0;
 
-  // Prepares the stage for operation, providing a callback used to signal the
-  // need to update this stage. Returns true if the stage will call the
-  // callback, false if not. The default implementation of this method returns
-  // false.
-  // TODO(dalesat): Should this be const UpdateCallback&?
-  virtual bool Prepare(UpdateCallback update_callback);
+  // Prepares the input for operation. Returns nullptr unless the connected
+  // output must use a specific allocator, in which case it returns that
+  // allocator.
+  virtual PayloadAllocator* PrepareInput(size_t index) = 0;
+
+  // Prepares the output for operation, passing an allocator that must be used
+  // by the output or nullptr if there is no such requirement. The callback is
+  // used to indicate what inputs are ready to be prepared as a consequence of
+  // preparing the output.
+  virtual void PrepareOutput(
+      size_t index,
+      PayloadAllocator* allocator,
+      const UpstreamCallback& callback) = 0;
+
+  // Unprepares the input. The default implementation does nothing.
+  virtual void UnprepareInput(size_t index);
+
+  // Unprepares the output. The default implementation does nothing. The
+  // callback is used to indicate what inputs are ready to be unprepared as a
+  // consequence of unpreparing the output.
+  virtual void UnprepareOutput(
+    size_t index,
+    const UpstreamCallback& callback);
 
   // Initiates demand. Called on sink stages after the graph is prepared. The
   // default implementation does nothing.
@@ -53,13 +75,23 @@
   // Performs processing.
   virtual void Update(Engine* engine) = 0;
 
-  // Returns a bool indicating whether the stage is prepared.
-  bool prepared() {
-    return prepared_;
+  // Flushes an input. The callback is used to indicate what outputs are ready
+  // to be flushed as a consequence of flushing the input.
+  virtual void FlushInput(
+      size_t index,
+      const DownstreamCallback& callback) = 0;
+
+  // Flushes an output.
+  virtual void FlushOutput(size_t index) = 0;
+
+ protected:
+  void RequestUpdate() {
+    DCHECK(update_callback_);
+    update_callback_(this);
   }
 
  private:
-  bool prepared_;
+  UpdateCallback update_callback_;
   bool in_supply_backlog_;
   bool in_demand_backlog_;
 
@@ -69,4 +101,4 @@
 }  // namespace media
 }  // namespace mojo
 
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_H_
+#endif  // SERVICES_MEDIA_FRAMEWORK_STAGES_STAGE_H_
diff --git a/services/media/framework/stages/stage_input.cc b/services/media/framework/stages/stage_input.cc
deleted file mode 100644
index 0f5312a..0000000
--- a/services/media/framework/stages/stage_input.cc
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "services/media/framework/engine.h"
-#include "services/media/framework/stages/stage.h"
-#include "services/media/framework/stages/stage_input.h"
-
-namespace mojo {
-namespace media {
-
-StageInput::StageInput() :
-    upstream_stage_(nullptr),
-    output_index_(0),
-    allocator_(nullptr),
-    must_allocate_(false) {}
-
-StageInput::~StageInput() {}
-
-void StageInput::connect(Stage* upstream_stage, uint32_t output_index) {
-  DCHECK(upstream_stage);
-  DCHECK(output_index < upstream_stage->output_count());
-  DCHECK(upstream_stage_ == nullptr);
-  upstream_stage_ = upstream_stage;
-  output_index_ = output_index;
-}
-
-StageOutput& StageInput::mate() const {
-  DCHECK(upstream_stage_);
-  DCHECK(output_index_ < upstream_stage_->output_count());
-  return upstream_stage_->output(output_index_);
-}
-
-void StageInput::Prepare(Allocator* allocator, bool must_allocate) {
-  DCHECK(allocator != nullptr || must_allocate == false);
-  allocator_ = allocator;
-  must_allocate_ = must_allocate;
-}
-
-Allocator* StageInput::allocator() const {
-  DCHECK(connected());
-  DCHECK(mate().downstream_stage()->prepared());
-  return allocator_;
-}
-
-bool StageInput::must_allocate() const {
-  DCHECK(connected());
-  DCHECK(mate().downstream_stage()->prepared());
-  return must_allocate_;
-}
-
-void StageInput::SetDemand(Demand demand, Engine* engine) const {
-  DCHECK(engine);
-  DCHECK(connected());
-
-  if (mate().UpdateDemand(demand)) {
-    engine->PushToDemandBacklogUnsafe(upstream_stage());
-  }
-}
-
-bool StageInput::SupplyPacketFromOutput(PacketPtr packet) {
-  DCHECK(packet);
-  DCHECK(!packet_from_upstream_);
-  packet_from_upstream_ = std::move(packet);
-  return true;
-}
-
-LpcmStageInput* StageInput::get_lpcm() {
-  return nullptr;
-}
-
-}  // namespace media
-}  // namespace mojo
diff --git a/services/media/framework/stages/stage_input.h b/services/media/framework/stages/stage_input.h
deleted file mode 100644
index 2fe61c6..0000000
--- a/services/media/framework/stages/stage_input.h
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_INPUT_H_
-#define SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_INPUT_H_
-
-#include "services/media/framework/models/demand.h"
-#include "services/media/framework/packet.h"
-
-namespace mojo {
-namespace media {
-
-class Stage;
-class Engine;
-class StageOutput;
-class LpcmStageInput;
-
-// Represents a stage's connector to an adjacent upstream stage.
-class StageInput {
- public:
-  StageInput();
-
-  ~StageInput();
-
-  // The stage to which this input is connected.
-  Stage* upstream_stage() const { return upstream_stage_; }
-
-  // The index of the output to which this input is connected.
-  uint32_t output_index() const { return output_index_; }
-
-  // Establishes a connection. Called only by the engine.
-  void connect(Stage* upstream_stage, uint32_t output_index);
-
-  // Breaks a connection. Called only by the engine.
-  void disconnect() {
-    upstream_stage_ = nullptr;
-    output_index_ = 0;
-  }
-
-  // Determines whether the input is connected to an output.
-  bool connected() const { return upstream_stage_ != nullptr; }
-
-  // The connected output.
-  StageOutput& mate() const;
-
-  // Prepare to move packets by providing an allocator for the upstream stage
-  // if we have one and indicating whether we require its use.
-  void Prepare(Allocator* allocator, bool must_allocate);
-
-  // Returns the allocator provided by this input.
-  Allocator* allocator() const;
-
-  // Determines whether the input requires the use of its allocator.
-  bool must_allocate() const;
-
-  // A packet supplied from upstream.
-  PacketPtr& packet_from_upstream() { return packet_from_upstream_; }
-
-  // Updates mate's demand. Called only by Stage::Update implementations.
-  void SetDemand(Demand demand, Engine* engine) const;
-
-  // Updates packet_from_upstream. Return value indicates whether the stage for
-  // this input should be added to the supply backlog. Called only by
-  // StageOutput instances.
-  virtual bool SupplyPacketFromOutput(PacketPtr packet);
-
-  // Returns the LPCM specialization if this instance is an LpcmStageInput,
-  // nullptr otherwise.
-  virtual LpcmStageInput* get_lpcm();
-
- private:
-  Stage* upstream_stage_;
-  uint32_t output_index_;
-  Allocator* allocator_;
-  bool must_allocate_;
-  PacketPtr packet_from_upstream_;
-};
-
-}  // namespace media
-}  // namespace mojo
-
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_INPUT_H_
diff --git a/services/media/framework/stages/stage_output.cc b/services/media/framework/stages/stage_output.cc
deleted file mode 100644
index 34d921f..0000000
--- a/services/media/framework/stages/stage_output.cc
+++ /dev/null
@@ -1,122 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "services/media/framework/engine.h"
-#include "services/media/framework/stages/stage.h"
-#include "services/media/framework/stages/stage_output.h"
-
-namespace mojo {
-namespace media {
-
-StageOutput::StageOutput() :
-    downstream_stage_(nullptr),
-    input_index_(0),
-    demand_(Demand::kNegative),
-    copy_allocator_(nullptr) {}
-
-StageOutput::~StageOutput() {}
-
-void StageOutput::connect(Stage* downstream_stage, uint32_t input_index) {
-  DCHECK(downstream_stage);
-  DCHECK(input_index < downstream_stage->input_count());
-  DCHECK(downstream_stage_ == nullptr);
-  downstream_stage_ = downstream_stage;
-  input_index_ = input_index;
-}
-
-StageInput& StageOutput::mate() const {
-  DCHECK(downstream_stage_);
-  DCHECK(input_index_ < downstream_stage_->input_count());
-  return downstream_stage_->input(input_index_);
-}
-
-Allocator* StageOutput::Prepare(bool can_accept_allocator) {
-  DCHECK(connected());
-  StageInput& mate = this->mate();
-  copy_allocator_ = nullptr;
-
-  if (can_accept_allocator) {
-    // We can use an allocator. Use our mate's allocator, if it has one.
-    return mate.allocator();
-  } else if (mate.must_allocate()) {
-    // We can't use an allocator, but our mate needs us to. We'll need to copy
-    // every packet.
-    copy_allocator_ = mate.allocator();
-    DCHECK(copy_allocator_);
-    return nullptr;
-  } else {
-    // We can't use an allocator, and our mate doesn't need us to.
-    return nullptr;
-  }
-}
-
-Demand StageOutput::demand() const {
-  DCHECK(connected());
-
-  // Return negative demand if mate() already has a packet.
-  // We check demand_ here to possibly avoid the second check.
-  if (demand_ == Demand::kNegative || mate().packet_from_upstream()) {
-    return Demand::kNegative;
-  }
-
-  return demand_;
-}
-
-void StageOutput::SupplyPacket(PacketPtr packet, Engine* engine) const {
-  DCHECK(packet);
-  DCHECK(engine);
-  DCHECK(connected());
-
-  if (copy_allocator_ != nullptr) {
-    // Need to copy the packet due to an allocation conflict.
-    uint64_t size = packet->size();
-    void *buffer;
-
-    if (size == 0) {
-      buffer = nullptr;
-    } else {
-      buffer = copy_allocator_->AllocatePayloadBuffer(size);
-      if (buffer == nullptr) {
-        // Starved for buffer space.
-        return;
-      }
-      memcpy(buffer, packet->payload(), size);
-    }
-
-    packet = Packet::Create(
-        packet->presentation_time(),
-        packet->duration(),
-        packet->end_of_stream(),
-        size,
-        buffer,
-        copy_allocator_);
-  }
-
-  SupplyPacketInternal(std::move(packet), engine);
-}
-
-bool StageOutput::UpdateDemand(Demand demand) {
-  if (demand_ == demand) {
-    return false;
-  }
-  demand_ = demand;
-  return true;
-}
-
-LpcmStageOutput* StageOutput::get_lpcm() {
-  return nullptr;
-}
-
-void StageOutput::SupplyPacketInternal(PacketPtr packet, Engine* engine)
-    const {
-  DCHECK(packet);
-  DCHECK(engine);
-  DCHECK(connected());
-  if (mate().SupplyPacketFromOutput(std::move(packet))) {
-    engine->PushToSupplyBacklogUnsafe(downstream_stage());
-  }
-}
-
-}  // namespace media
-}  // namespace mojo
diff --git a/services/media/framework/stages/stage_output.h b/services/media/framework/stages/stage_output.h
deleted file mode 100644
index 8cf72bd..0000000
--- a/services/media/framework/stages/stage_output.h
+++ /dev/null
@@ -1,81 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_OUTPUT_H_
-#define SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_OUTPUT_H_
-
-#include "services/media/framework/allocator.h"
-#include "services/media/framework/packet.h"
-
-namespace mojo {
-namespace media {
-
-class Stage;
-class Engine;
-class StageInput;
-class LpcmStageOutput;
-
-// Represents a stage's connector to an adjacent downstream stage.
-class StageOutput {
- public:
-  StageOutput();
-
-  ~StageOutput();
-
-  // The stage to which this output is connected.
-  Stage* downstream_stage() const { return downstream_stage_; }
-
-  // The index of the input to which this output is connected.
-  uint32_t input_index() const { return input_index_; }
-
-  // Establishes a connection. Called only by the engine.
-  void connect(Stage* downstream_stage, uint32_t input_index);
-
-  // Breaks a connection. Called only by the engine.
-  void disconnect() {
-    downstream_stage_ = nullptr;
-    input_index_ = 0;
-  }
-
-  // Determines whether the output is connected to an input.
-  bool connected() const {
-    return downstream_stage_ != nullptr;
-  }
-
-  // The connected input.
-  StageInput& mate() const;
-
-  // Gets ready to move packets by negotiating the use of allocators. If the
-  // downstream input provides an allocator, and we can use it, this method
-  // returns the provided allocator. Otherwise, it returns nullptr.
-  virtual Allocator* Prepare(bool can_accept_allocator);
-
-  // Demand signalled from downstream, or kNegative if the downstream input
-  // is currently holding a packet.
-  Demand demand() const;
-
-  // Supplies a packet to mate. Called only by Stage::Update implementations.
-  void SupplyPacket(PacketPtr packet, Engine* engine) const;
-
-  // Updates packet demand. Called only by StageInput instances.
-  bool UpdateDemand(Demand demand);
-
-  // Returns the LPCM specialization if this instance is an LpcmStageOutput,
-  // nullptr otherwise.
-  virtual LpcmStageOutput* get_lpcm();
-
- protected:
-  void SupplyPacketInternal(PacketPtr packet, Engine* engine) const;
-
- private:
-  Stage* downstream_stage_;
-  uint32_t input_index_;
-  Demand demand_;
-  Allocator* copy_allocator_;
-};
-
-}  // namespace media
-}  // namespace mojo
-
-#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_OUTPUT_H_
diff --git a/services/media/framework/stages/transform_stage.cc b/services/media/framework/stages/transform_stage.cc
new file mode 100644
index 0000000..dce8254
--- /dev/null
+++ b/services/media/framework/stages/transform_stage.cc
@@ -0,0 +1,104 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/stages/transform_stage.h"
+
+namespace mojo {
+namespace media {
+
+TransformStage::TransformStage(std::shared_ptr<Transform> transform)
+    : transform_(transform),
+      allocator_(nullptr),  // Assigned in PrepareOutput.
+      input_packet_is_new_(true) {
+  // A stage cannot operate without the transform it hosts.
+  DCHECK(transform_);
+}
+
+TransformStage::~TransformStage() {}
+
+size_t TransformStage::input_count() const {
+  return 1;  // The single input is input_.
+}
+
+Input& TransformStage::input(size_t index) {
+  DCHECK_EQ(index, 0u);  // input_ is the only input.
+  return input_;
+}
+
+size_t TransformStage::output_count() const {
+  return 1;  // The single output is output_.
+}
+
+Output& TransformStage::output(size_t index) {
+  DCHECK_EQ(index, 0u);  // output_ is the only output.
+  return output_;
+}
+
+PayloadAllocator* TransformStage::PrepareInput(size_t index) {
+  DCHECK_EQ(index, 0u);
+  return nullptr;  // No specific allocator is required of the upstream output.
+}
+
+void TransformStage::PrepareOutput(
+    size_t index,
+    PayloadAllocator* allocator,
+    const UpstreamCallback& callback) {
+  DCHECK_EQ(index, 0u);
+
+  // Fall back to the default allocator when no specific one is required.
+  allocator_ = allocator;
+  if (allocator_ == nullptr) allocator_ = PayloadAllocator::GetDefault();
+  callback(0);  // Input 0 may now be prepared.
+}
+
+void TransformStage::UnprepareOutput(size_t index,
+                                     const UpstreamCallback& callback) {
+  DCHECK_EQ(index, 0u);  // Output 0 is the only output.
+  allocator_ = nullptr;  // Forget the allocator chosen by PrepareOutput.
+  callback(0);  // Input 0 may now be unprepared.
+}
+
+void TransformStage::Update(Engine* engine) {
+  DCHECK(engine);
+  DCHECK(allocator_);  // Set by PrepareOutput.
+  // Transform only with an input packet and non-negative output demand.
+  if (input_.packet_from_upstream() && output_.demand() != Demand::kNegative) {
+    PacketPtr output_packet;
+    if (transform_->TransformPacket(
+        input_.packet_from_upstream(),
+        input_packet_is_new_,
+        allocator_,
+        &output_packet)) {
+      input_.packet_from_upstream().reset();  // Release the input packet.
+      input_packet_is_new_ = true;
+    } else {
+      input_packet_is_new_ = false;  // The input packet is kept for next time.
+    }
+    // The transform may or may not have produced an output packet.
+    if (output_packet) {
+      output_.SupplyPacket(std::move(output_packet), engine);
+    }
+  }
+  // Pass the output's current demand upstream through the input.
+  input_.SetDemand(output_.demand(), engine);
+}
+
+// Flushes the input, then reports output 0 as ready to be flushed.
+void TransformStage::FlushInput(size_t index,
+                                const DownstreamCallback& callback) {
+  DCHECK_EQ(index, 0u);
+  input_.Flush();
+  callback(0);
+}
+
+void TransformStage::FlushOutput(size_t index) {
+  DCHECK_EQ(index, 0u);
+  DCHECK(transform_);
+  output_.Flush();
+  transform_->Flush();
+  input_packet_is_new_ = true;  // The next packet transformed counts as new.
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/transform_stage.h b/services/media/framework/stages/transform_stage.h
new file mode 100644
index 0000000..07cec4a
--- /dev/null
+++ b/services/media/framework/stages/transform_stage.h
@@ -0,0 +1,60 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_STAGES_TRANSFORM_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_STAGES_TRANSFORM_STAGE_H_
+
+#include "services/media/framework/models/transform.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+// A stage that hosts a Transform. It has exactly one input and one output.
+class TransformStage : public Stage {
+ public:
+  TransformStage(std::shared_ptr<Transform> transform);
+
+  ~TransformStage() override;
+
+  // Stage implementation.
+  size_t input_count() const override;
+
+  Input& input(size_t index) override;
+
+  size_t output_count() const override;
+
+  Output& output(size_t index) override;
+
+  PayloadAllocator* PrepareInput(size_t index) override;
+
+  void PrepareOutput(
+      size_t index,
+      PayloadAllocator* allocator,
+      const UpstreamCallback& callback) override;
+
+  void UnprepareOutput(
+      size_t index,
+      const UpstreamCallback& callback) override;
+  // Transforms the pending input packet when output demand is not negative.
+  void Update(Engine* engine) override;
+
+  void FlushInput(
+      size_t index,
+      const DownstreamCallback& callback) override;
+
+  void FlushOutput(size_t index) override;
+
+ private:
+  Input input_;
+  Output output_;
+  std::shared_ptr<Transform> transform_;
+  PayloadAllocator* allocator_;  // Set by PrepareOutput; null when unprepared.
+  bool input_packet_is_new_;  // False after TransformPacket returns false.
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_STAGES_TRANSFORM_STAGE_H_
diff --git a/services/media/framework/stream_type.cc b/services/media/framework/stream_type.cc
index 1839f92..ae2f19a 100644
--- a/services/media/framework/stream_type.cc
+++ b/services/media/framework/stream_type.cc
@@ -3,13 +3,18 @@
 // found in the LICENSE file.
 
 #include "base/logging.h"
+#include "services/media/framework/safe_clone.h"
 #include "services/media/framework/stream_type.h"
 
 namespace mojo {
 namespace media {
 
-BytesPtr Bytes::Clone() const {
-  return BytesPtr(new Bytes(*this));
+Bytes::Bytes(size_t size) : storage_(size) {}
+
+Bytes::~Bytes() {}
+
+std::unique_ptr<Bytes> Bytes::Clone() const {
+  return std::unique_ptr<Bytes>(new Bytes(*this));
 }
 
 StreamType::StreamType(Scheme scheme) : scheme_(scheme) {}
@@ -36,30 +41,14 @@
   return nullptr;
 }
 
-StreamTypePtr StreamType::Clone() const {
+std::unique_ptr<StreamType> StreamType::Clone() const {
   return Create(scheme());
 }
 
-StreamTypesPtr StreamTypes::Clone() const {
-  StreamTypes* result = new StreamTypes(size());
-  for (const StreamTypePtr& stream_type : *this) {
-    result->push_back(stream_type.Clone());
-  }
-  return StreamTypesPtr(result);
-}
-
 StreamTypeSet::StreamTypeSet(StreamType::Scheme scheme) : scheme_(scheme) {}
 
 StreamTypeSet::~StreamTypeSet() {}
 
-StreamTypeSetsPtr StreamTypeSets::Clone() const {
-  StreamTypeSets* result = new StreamTypeSets(size());
-  for (const StreamTypeSetPtr& stream_type_set : *this) {
-    result->push_back(stream_type_set.Clone());
-  }
-  return StreamTypeSetsPtr(result);
-}
-
 const MultiplexedStreamTypeSet* StreamTypeSet::multiplexed() const {
   NOTREACHED();
   return nullptr;
@@ -80,13 +69,13 @@
   return nullptr;
 }
 
-StreamTypeSetPtr StreamTypeSet::Clone() const {
+std::unique_ptr<StreamTypeSet> StreamTypeSet::Clone() const {
   return Create(scheme());
 }
 
 MultiplexedStreamType::MultiplexedStreamType(
-    StreamTypePtr multiplex_type,
-    StreamTypesPtr substream_types) :
+    std::unique_ptr<StreamType> multiplex_type,
+    std::unique_ptr<std::vector<std::unique_ptr<StreamType>>> substream_types) :
     StreamType(Scheme::kMultiplexed),
     multiplex_type_(std::move(multiplex_type)),
     substream_types_(std::move(substream_types)) {}
@@ -97,13 +86,14 @@
   return this;
 }
 
-StreamTypePtr MultiplexedStreamType::Clone() const {
-  return Create(multiplex_type().Clone(), substream_types().Clone());
+std::unique_ptr<StreamType> MultiplexedStreamType::Clone() const {
+  return Create(SafeClone(multiplex_type()), SafeClone(substream_types()));
 }
 
 MultiplexedStreamTypeSet::MultiplexedStreamTypeSet(
-    StreamTypeSetPtr multiplex_type_set,
-    StreamTypeSetsPtr substream_type_sets) :
+    std::unique_ptr<StreamTypeSet> multiplex_type_set,
+    std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>
+        substream_type_sets) :
     StreamTypeSet(StreamType::Scheme::kMultiplexed),
     multiplex_type_set_(std::move(multiplex_type_set)),
     substream_type_sets_(std::move(substream_type_sets)) {}
@@ -114,8 +104,10 @@
   return this;
 }
 
-StreamTypeSetPtr MultiplexedStreamTypeSet::Clone() const {
-  return Create(multiplex_type_set().Clone(), substream_type_sets().Clone());
+std::unique_ptr<StreamTypeSet> MultiplexedStreamTypeSet::Clone() const {
+  return Create(
+      SafeClone(multiplex_type_set()),
+      SafeClone(substream_type_sets()));
 }
 
 LpcmStreamType::LpcmStreamType(
@@ -165,7 +157,7 @@
   return 0;
 }
 
-StreamTypePtr LpcmStreamType::Clone() const {
+std::unique_ptr<StreamType> LpcmStreamType::Clone() const {
   return Create(sample_format(), channels(), frames_per_second());
 }
 
@@ -203,7 +195,7 @@
       frames_per_second().contains(type.frames_per_second());
 }
 
-StreamTypeSetPtr LpcmStreamTypeSet::Clone() const {
+std::unique_ptr<StreamTypeSet> LpcmStreamTypeSet::Clone() const {
   return Create(sample_format(), channels(), frames_per_second());
 }
 
@@ -212,7 +204,7 @@
   SampleFormat sample_format,
   uint32_t channels,
   uint32_t frames_per_second,
-  BytesPtr encoding_details) :
+  std::unique_ptr<Bytes> encoding_details) :
   LpcmStreamType(
       Scheme::kCompressedAudio,
       sample_format,
@@ -228,13 +220,13 @@
   return this;
 }
 
-StreamTypePtr CompressedAudioStreamType::Clone() const {
+std::unique_ptr<StreamType> CompressedAudioStreamType::Clone() const {
   return Create(
       encoding(),
       sample_format(),
       channels(),
       frames_per_second(),
-      encoding_details().Clone());
+      SafeClone(encoding_details()));
 }
 
 CompressedAudioStreamTypeSet::CompressedAudioStreamTypeSet(
@@ -267,7 +259,7 @@
       frames_per_second().contains(type.frames_per_second());
 }
 
-StreamTypeSetPtr CompressedAudioStreamTypeSet::Clone() const {
+std::unique_ptr<StreamTypeSet> CompressedAudioStreamTypeSet::Clone() const {
   return Create(
       encoding(),
       sample_format(),
@@ -284,7 +276,7 @@
   uint32_t height,
   uint32_t coded_width,
   uint32_t coded_height,
-  BytesPtr encoding_details) :
+  std::unique_ptr<Bytes> encoding_details) :
   StreamType(StreamType::Scheme::kVideo),
   encoding_(encoding),
   profile_(profile),
@@ -302,7 +294,7 @@
   return this;
 }
 
-StreamTypePtr VideoStreamType::Clone() const {
+std::unique_ptr<StreamType> VideoStreamType::Clone() const {
   return Create(
       encoding(),
       profile(),
@@ -312,7 +304,7 @@
       height(),
       coded_width(),
       coded_height(),
-      encoding_details().Clone());
+      SafeClone(encoding_details()));
 }
 
 VideoStreamTypeSet::VideoStreamTypeSet(
@@ -330,7 +322,7 @@
   return this;
 }
 
-StreamTypeSetPtr VideoStreamTypeSet::Clone() const {
+std::unique_ptr<StreamTypeSet> VideoStreamTypeSet::Clone() const {
   return Create(
       encoding(),
       width(),
diff --git a/services/media/framework/stream_type.h b/services/media/framework/stream_type.h
index 84d4456..aaa3136 100644
--- a/services/media/framework/stream_type.h
+++ b/services/media/framework/stream_type.h
@@ -6,49 +6,29 @@
 #define SERVICES_MEDIA_FRAMEWORK_STREAM_TYPE_H_
 
 #include <cstring>
+#include <memory>
 #include <string>
 #include <vector>
 
 #include "base/logging.h"
-#include "services/media/framework/ptr.h"
 
 namespace mojo {
 namespace media {
 
 class StreamType;
-class StreamTypes;
 class MultiplexedStreamType;
 class LpcmStreamType;
 class CompressedAudioStreamType;
 class VideoStreamType;
 
-typedef UniquePtr<StreamType> StreamTypePtr;
-typedef UniquePtr<StreamTypes> StreamTypesPtr;
-
-// TODO(dalesat): Get rid of this class.
-class StreamTypes : public std::vector<StreamTypePtr> {
+class Bytes {
  public:
-  static StreamTypesPtr Create(size_t size) {
-    return StreamTypesPtr(new StreamTypes(size));
+  static std::unique_ptr<Bytes> Create(size_t size) {
+    return std::unique_ptr<Bytes>(new Bytes(size));
   }
 
-  explicit StreamTypes(size_t size) : std::vector<StreamTypePtr>(size) {}
-
-  StreamTypesPtr Clone() const;
-};
-
-class Bytes;
-typedef UniquePtr<Bytes> BytesPtr;
-
-// TODO(dalesat): Get rid of this class.
-class Bytes : public std::vector<uint8_t> {
- public:
-  static BytesPtr Create(size_t size) {
-    return BytesPtr(new Bytes(size));
-  }
-
-  static BytesPtr Create(uint8_t* data, size_t size) {
-    BytesPtr result = Create(size);
+  static std::unique_ptr<Bytes> Create(const uint8_t* data, size_t size) {
+    std::unique_ptr<Bytes> result = Create(size);
     if (size != 0) {
       DCHECK(result->data());
       DCHECK(data);
@@ -57,11 +37,20 @@
     return result;
   }
 
-  explicit Bytes(size_t size) : std::vector<uint8_t>(size) {}
+  ~Bytes();
 
-  explicit Bytes(const Bytes& copy_from) : std::vector<uint8_t>(copy_from) {}
+  std::unique_ptr<Bytes> Clone() const;
 
-  BytesPtr Clone() const;
+  uint8_t* data() { return storage_.data(); }
+
+  const uint8_t* data() const { return storage_.data(); }
+
+  size_t size() const { return storage_.size(); }
+
+ private:
+  explicit Bytes(size_t size);
+
+  std::vector<uint8_t> storage_;
 };
 
 // Describes the type of a stream.
@@ -84,8 +73,8 @@
     kVideo
   };
 
-  static StreamTypePtr Create(Scheme scheme) {
-    return StreamTypePtr(new StreamType(scheme));
+  static std::unique_ptr<StreamType> Create(Scheme scheme) {
+    return std::unique_ptr<StreamType>(new StreamType(scheme));
   }
 
   explicit StreamType(Scheme scheme);
@@ -101,7 +90,7 @@
   virtual const CompressedAudioStreamType* compressed_audio() const;
   virtual const VideoStreamType* video() const;
 
-  virtual StreamTypePtr Clone() const;
+  virtual std::unique_ptr<StreamType> Clone() const;
 
  private:
   Scheme scheme_;
@@ -122,32 +111,16 @@
 };
 
 class StreamTypeSet;
-class StreamTypeSets;
 class MultiplexedStreamTypeSet;
 class LpcmStreamTypeSet;
 class CompressedAudioStreamTypeSet;
 class VideoStreamTypeSet;
 
-typedef UniquePtr<StreamTypeSet> StreamTypeSetPtr;
-typedef UniquePtr<StreamTypeSets> StreamTypeSetsPtr;
-
-// TODO(dalesat): Get rid of this class.
-class StreamTypeSets : public std::vector<StreamTypeSetPtr> {
- public:
-  static StreamTypeSetsPtr Create(size_t size) {
-    return StreamTypeSetsPtr(new StreamTypeSets(size));
-  }
-
-  StreamTypeSets(size_t size) : std::vector<StreamTypeSetPtr>(size) {}
-
-  StreamTypeSetsPtr Clone() const;
-};
-
 // Describes a set of possible stream types.
 class StreamTypeSet {
  public:
-  static StreamTypeSetPtr Create(StreamType::Scheme scheme) {
-    return StreamTypeSetPtr(new StreamTypeSet(scheme));
+  static std::unique_ptr<StreamTypeSet> Create(StreamType::Scheme scheme) {
+    return std::unique_ptr<StreamTypeSet>(new StreamTypeSet(scheme));
   }
 
   StreamTypeSet(StreamType::Scheme scheme);
@@ -163,7 +136,7 @@
   virtual const CompressedAudioStreamTypeSet* compressed_audio() const;
   virtual const VideoStreamTypeSet* video() const;
 
-  virtual StreamTypeSetPtr Clone() const;
+  virtual std::unique_ptr<StreamTypeSet> Clone() const;
 
  private:
   StreamType::Scheme scheme_;
@@ -172,71 +145,78 @@
 // Describes the type of a multiplexed stream.
 class MultiplexedStreamType : public StreamType {
  public:
-  static StreamTypePtr Create(
-      StreamTypePtr multiplex_type,
-      StreamTypesPtr substream_types) {
-    return StreamTypePtr(
+  static std::unique_ptr<StreamType> Create(
+      std::unique_ptr<StreamType> multiplex_type,
+      std::unique_ptr<std::vector<std::unique_ptr<StreamType>>>
+          substream_types) {
+    return std::unique_ptr<StreamType>(
         new MultiplexedStreamType(
             std::move(multiplex_type),
             std::move(substream_types)));
   }
 
   MultiplexedStreamType(
-      StreamTypePtr multiplex_type,
-      StreamTypesPtr substream_types);
+      std::unique_ptr<StreamType> multiplex_type,
+      std::unique_ptr<std::vector<std::unique_ptr<StreamType>>>
+          substream_types);
 
   ~MultiplexedStreamType() override;
 
   const MultiplexedStreamType* multiplexed() const override;
 
-  const StreamTypePtr& multiplex_type() const {
+  const std::unique_ptr<StreamType>& multiplex_type() const {
     return multiplex_type_;
   }
 
-  const StreamTypesPtr& substream_types() const {
+  const std::unique_ptr<std::vector<std::unique_ptr<StreamType>>>&
+      substream_types() const {
     return substream_types_;
   }
 
-  StreamTypePtr Clone() const override;
+  std::unique_ptr<StreamType> Clone() const override;
 
 private:
-  StreamTypePtr multiplex_type_;
-  StreamTypesPtr substream_types_;
+  std::unique_ptr<StreamType> multiplex_type_;
+  std::unique_ptr<std::vector<std::unique_ptr<StreamType>>> substream_types_;
 };
 
-// Describes the type of a multiplexed stream.
+// Describes a set of multiplexed stream types.
 class MultiplexedStreamTypeSet : public StreamTypeSet {
 public:
-  static StreamTypeSetPtr Create(
-      StreamTypeSetPtr multiplex_type_set,
-      StreamTypeSetsPtr substream_type_sets) {
-    return StreamTypeSetPtr(
+  static std::unique_ptr<StreamTypeSet> Create(
+      std::unique_ptr<StreamTypeSet> multiplex_type_set,
+      std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>
+          substream_type_sets) {
+    return std::unique_ptr<StreamTypeSet>(
         new MultiplexedStreamTypeSet(
             std::move(multiplex_type_set),
             std::move(substream_type_sets)));
   }
 
   MultiplexedStreamTypeSet(
-      StreamTypeSetPtr multiplex_type_set,
-      StreamTypeSetsPtr substream_type_sets);
+      std::unique_ptr<StreamTypeSet> multiplex_type_set,
+      std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>
+          substream_type_sets);
 
   ~MultiplexedStreamTypeSet() override;
 
   const MultiplexedStreamTypeSet* multiplexed() const override;
 
-  const StreamTypeSetPtr& multiplex_type_set() const {
+  const std::unique_ptr<StreamTypeSet>& multiplex_type_set() const {
     return multiplex_type_set_;
   }
 
-  const StreamTypeSetsPtr& substream_type_sets() const {
+  const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>&
+      substream_type_sets() const {
     return substream_type_sets_;
   }
 
-  StreamTypeSetPtr Clone() const override;
+  std::unique_ptr<StreamTypeSet> Clone() const override;
 
 private:
-  StreamTypeSetPtr multiplex_type_set_;
-  StreamTypeSetsPtr substream_type_sets_;
+  std::unique_ptr<StreamTypeSet> multiplex_type_set_;
+  std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>
+      substream_type_sets_;
 };
 
 // Describes the type of an LPCM stream.
@@ -251,11 +231,11 @@
     kFloat
   };
 
-  static StreamTypePtr Create(
+  static std::unique_ptr<StreamType> Create(
       SampleFormat sample_format,
       uint32_t channels,
       uint32_t frames_per_second) {
-    return StreamTypePtr(new LpcmStreamType(
+    return std::unique_ptr<StreamType>(new LpcmStreamType(
         sample_format,
         channels,
         frames_per_second));
@@ -296,7 +276,7 @@
 
   static uint32_t SampleSizeFromFormat(SampleFormat sample_format);
 
-  StreamTypePtr Clone() const override;
+  std::unique_ptr<StreamType> Clone() const override;
 
  protected:
   LpcmStreamType(
@@ -315,11 +295,11 @@
 // Describes a set of LPCM stream types.
 class LpcmStreamTypeSet : public StreamTypeSet {
  public:
-  static StreamTypeSetPtr Create(
+  static std::unique_ptr<StreamTypeSet> Create(
       LpcmStreamType::SampleFormat sample_format,
       Range<uint32_t> channels,
       Range<uint32_t> frames_per_second) {
-    return StreamTypeSetPtr(new LpcmStreamTypeSet(
+    return std::unique_ptr<StreamTypeSet>(new LpcmStreamTypeSet(
         sample_format,
         channels,
         frames_per_second));
@@ -348,7 +328,7 @@
 
   bool contains(const LpcmStreamType& type) const;
 
-  StreamTypeSetPtr Clone() const override;
+  std::unique_ptr<StreamTypeSet> Clone() const override;
 
  protected:
   LpcmStreamTypeSet(
@@ -372,13 +352,13 @@
     kVorbis
   };
 
-  static StreamTypePtr Create(
+  static std::unique_ptr<StreamType> Create(
       AudioEncoding encoding,
       SampleFormat sample_format,
       uint32_t channels,
       uint32_t frames_per_second,
-      BytesPtr encoding_details) {
-    return StreamTypePtr(new CompressedAudioStreamType(
+      std::unique_ptr<Bytes> encoding_details) {
+    return std::unique_ptr<StreamType>(new CompressedAudioStreamType(
         encoding,
         sample_format,
         channels,
@@ -391,7 +371,7 @@
     SampleFormat sample_format,
     uint32_t channels,
     uint32_t frames_per_second,
-    BytesPtr encoding_details);
+    std::unique_ptr<Bytes> encoding_details);
 
   ~CompressedAudioStreamType() override;
 
@@ -401,26 +381,26 @@
     return encoding_;
   }
 
-  const BytesPtr& encoding_details() const {
+  const std::unique_ptr<Bytes>& encoding_details() const {
     return encoding_details_;
   }
 
-  StreamTypePtr Clone() const override;
+  std::unique_ptr<StreamType> Clone() const override;
 
  private:
   AudioEncoding encoding_;
-  BytesPtr encoding_details_;
+  std::unique_ptr<Bytes> encoding_details_;
 };
 
 // Describes a set of compressed audio stream types.
 class CompressedAudioStreamTypeSet : public LpcmStreamTypeSet {
  public:
-  static StreamTypeSetPtr Create(
+  static std::unique_ptr<StreamTypeSet> Create(
       CompressedAudioStreamType::AudioEncoding encoding,
       CompressedAudioStreamType::SampleFormat sample_format,
       Range<uint32_t> channels,
       Range<uint32_t> frames_per_second) {
-    return StreamTypeSetPtr(new CompressedAudioStreamTypeSet(
+    return std::unique_ptr<StreamTypeSet>(new CompressedAudioStreamTypeSet(
         encoding,
         sample_format,
         channels,
@@ -443,7 +423,7 @@
 
   bool contains(const CompressedAudioStreamType& type) const;
 
-  StreamTypeSetPtr Clone() const override;
+  std::unique_ptr<StreamTypeSet> Clone() const override;
 
  private:
   CompressedAudioStreamType::AudioEncoding encoding_;
@@ -502,7 +482,7 @@
     kSdRec601
   };
 
-  static StreamTypePtr Create(
+  static std::unique_ptr<StreamType> Create(
       VideoEncoding encoding,
       VideoProfile profile,
       PixelFormat pixel_format,
@@ -511,8 +491,8 @@
       uint32_t height,
       uint32_t coded_width,
       uint32_t coded_height,
-      BytesPtr encoding_details) {
-    return StreamTypePtr(new VideoStreamType(
+      std::unique_ptr<Bytes> encoding_details) {
+    return std::unique_ptr<StreamType>(new VideoStreamType(
         encoding,
         profile,
         pixel_format,
@@ -533,7 +513,7 @@
     uint32_t height,
     uint32_t coded_width,
     uint32_t coded_height,
-    BytesPtr encoding_details);
+    std::unique_ptr<Bytes> encoding_details);
 
   ~VideoStreamType() override;
 
@@ -571,11 +551,11 @@
     return coded_height_;
   }
 
-  const BytesPtr& encoding_details() const {
+  const std::unique_ptr<Bytes>& encoding_details() const {
     return encoding_details_;
   }
 
-  StreamTypePtr Clone() const override;
+  std::unique_ptr<StreamType> Clone() const override;
 
  private:
   VideoEncoding encoding_;
@@ -586,17 +566,17 @@
   uint32_t height_;
   uint32_t coded_width_;
   uint32_t coded_height_;
-  BytesPtr encoding_details_;
+  std::unique_ptr<Bytes> encoding_details_;
 };
 
 // Describes a set of video stream types.
 class VideoStreamTypeSet : public StreamTypeSet {
  public:
-  static StreamTypeSetPtr Create(
+  static std::unique_ptr<StreamTypeSet> Create(
       VideoStreamType::VideoEncoding encoding,
       Range<uint32_t> width,
       Range<uint32_t> height) {
-    return StreamTypeSetPtr(new VideoStreamTypeSet(
+    return std::unique_ptr<StreamTypeSet>(new VideoStreamTypeSet(
         encoding,
         width,
         height));
@@ -623,7 +603,7 @@
     return height_;
   }
 
-  StreamTypeSetPtr Clone() const override;
+  std::unique_ptr<StreamTypeSet> Clone() const override;
 
  private:
   VideoStreamType::VideoEncoding encoding_;
diff --git a/services/media/framework_create/decoder.cc b/services/media/framework_create/decoder.cc
index ec37d59..c88dc58 100644
--- a/services/media/framework_create/decoder.cc
+++ b/services/media/framework_create/decoder.cc
@@ -8,8 +8,8 @@
 namespace media {
 
 Result Decoder::Create(
-    const StreamTypePtr& stream_type,
-    DecoderPtr* decoder_out) {
+    const std::unique_ptr<StreamType>& stream_type,  // Unused here.
+    std::shared_ptr<Decoder>* decoder_out) {  // Never assigned here.
   return Result::kUnsupportedOperation;
 }
 
diff --git a/services/media/framework_create/demux.cc b/services/media/framework_create/demux.cc
index fb48b69..c763c1b 100644
--- a/services/media/framework_create/demux.cc
+++ b/services/media/framework_create/demux.cc
@@ -7,7 +7,9 @@
 namespace mojo {
 namespace media {
 
-Result Demux::Create(ReaderPtr reader, DemuxPtr* demux_out) {
+Result Demux::Create(
+    std::shared_ptr<Reader> reader,  // Unused here.
+    std::shared_ptr<Demux>* demux_out) {  // Never assigned here.
   return Result::kUnsupportedOperation;
 }