Motown in-proc streaming framework used to implement media services.
Includes the engine that hosts the various 'parts' that make up in-proc
streaming graphs, as well as the parts that aren't dependendent on
ffmpeg or mojo. framework_create is where ffmpeg and other technologies
will be integrated. For now, the 'Create' functions simply fail.

R=johngro@google.com

Review URL: https://codereview.chromium.org/1577953002 .
diff --git a/services/media/BUILD.gn b/services/media/BUILD.gn
index 042c359..fa73662 100644
--- a/services/media/BUILD.gn
+++ b/services/media/BUILD.gn
@@ -6,5 +6,7 @@
   deps = [
     "//services/media/audio",
     "//services/media/common",
+    "//services/media/framework",
+    "//services/media/framework_create",
   ]
 }
diff --git a/services/media/framework/BUILD.gn b/services/media/framework/BUILD.gn
new file mode 100644
index 0000000..29a97c9
--- /dev/null
+++ b/services/media/framework/BUILD.gn
@@ -0,0 +1,72 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import("//build/module_args/mojo.gni")
+import("$mojo_sdk_root/mojo/public/mojo_sdk.gni")
+
+source_set("framework") {
+  sources = [
+    "allocator.cc",
+    "allocator.h",
+    "conversion_pipeline_builder.cc",
+    "conversion_pipeline_builder.h",
+    "engine.cc",
+    "engine.h",
+    "formatting.cc",
+    "formatting.h",
+    "lpcm_util.cc",
+    "lpcm_util.h",
+    "metadata.cc",
+    "metadata.h",
+    "models/active_sink.h",
+    "models/active_source.h",
+    "models/demand.h",
+    "models/lpcm_frame_buffer.cc",
+    "models/lpcm_frame_buffer.h",
+    "models/lpcm_transform.h",
+    "models/multistream_packet_source.h",
+    "models/packet_transform.h",
+    "packet.cc",
+    "packet.h",
+    "parts/decoder.h",
+    "parts/demux.h",
+    "parts/file_reader.cc",
+    "parts/file_reader.h",
+    "parts/lpcm_reformatter.cc",
+    "parts/lpcm_reformatter.h",
+    "parts/null_sink.cc",
+    "parts/null_sink.h",
+    "parts/reader.cc",
+    "parts/reader.h",
+    "ptr.h",
+    "result.h",
+    "stages/active_sink_stage.cc",
+    "stages/active_sink_stage.h",
+    "stages/active_source_stage.cc",
+    "stages/active_source_stage.h",
+    "stages/distributor_stage.cc",
+    "stages/distributor_stage.h",
+    "stages/lpcm_stage_input.cc",
+    "stages/lpcm_stage_input.h",
+    "stages/lpcm_stage_output.cc",
+    "stages/lpcm_stage_output.h",
+    "stages/lpcm_transform_stage.cc",
+    "stages/lpcm_transform_stage.h",
+    "stages/packet_transform_stage.cc",
+    "stages/packet_transform_stage.h",
+    "stages/stage.cc",
+    "stages/stage.h",
+    "stages/stage_input.cc",
+    "stages/stage_input.h",
+    "stages/stage_output.cc",
+    "stages/stage_output.h",
+    "stream_type.cc",
+    "stream_type.h",
+  ]
+
+  deps = [
+    "//base",
+    "//url",
+  ]
+}
diff --git a/services/media/framework/allocator.cc b/services/media/framework/allocator.cc
new file mode 100644
index 0000000..e066599
--- /dev/null
+++ b/services/media/framework/allocator.cc
@@ -0,0 +1,46 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cstdlib>
+
+#include "base/logging.h"
+#include "services/media/framework/allocator.h"
+
+namespace mojo {
+namespace media {
+
+namespace {
+
+class DefaultAllocator : public Allocator {
+ public:
+  constexpr DefaultAllocator() {}
+
+  // Allocator implementation.
+  void* AllocatePayloadBuffer(uint64_t size) override;
+
+  void ReleasePayloadBuffer(uint64_t size, void* buffer) override;
+};
+
+void* DefaultAllocator::AllocatePayloadBuffer(uint64_t size) {
+  DCHECK(size > 0);
+  return std::malloc(static_cast<size_t>(size));
+}
+
+void DefaultAllocator::ReleasePayloadBuffer(uint64_t size, void* buffer) {
+  DCHECK(size > 0);
+  DCHECK(buffer);
+  std::free(buffer);
+}
+
+static constexpr DefaultAllocator default_allocator;
+
+} // namespace
+
+// static
+Allocator* Allocator::GetDefault() {
+  return const_cast<DefaultAllocator*>(&default_allocator);
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework/allocator.h b/services/media/framework/allocator.h
new file mode 100644
index 0000000..2bd564e
--- /dev/null
+++ b/services/media/framework/allocator.h
@@ -0,0 +1,31 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ALLOCATOR_H_
+#define SERVICES_MEDIA_FRAMEWORK_ALLOCATOR_H_
+
+#include <cstdint>
+
+namespace mojo {
+namespace media {
+
+// Abstract base class for objects that allocate buffers for packets.
+class Allocator {
+ public:
+  // Gets the default allocator, which allocates vanilla memory from the heap.
+  static Allocator* GetDefault();
+
+  // Allocates and returns a buffer of the indicated size or returns nullptr
+  // if the allocation fails.
+  // TODO(dalesat): Use size_t for sizes in units of bytes framework-wide.
+  virtual void* AllocatePayloadBuffer(uint64_t size) = 0;
+
+  // Releases a buffer previously allocated via AllocatePayloadBuffer.
+  virtual void ReleasePayloadBuffer(uint64_t size, void* buffer) = 0;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_ALLOCATOR_H_
diff --git a/services/media/framework/conversion_pipeline_builder.cc b/services/media/framework/conversion_pipeline_builder.cc
new file mode 100644
index 0000000..c1af544
--- /dev/null
+++ b/services/media/framework/conversion_pipeline_builder.cc
@@ -0,0 +1,341 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/conversion_pipeline_builder.h"
+#include "services/media/framework/formatting.h"
+#include "services/media/framework/parts/decoder.h"
+#include "services/media/framework/parts/lpcm_reformatter.h"
+
+namespace mojo {
+namespace media {
+
+namespace {
+
+enum class AddResult {
+  kFailed, // Can't convert.
+  kProgressed, // Added a conversion transform.
+  kFinished // Done adding conversion transforms.
+};
+
+// Produces a score for in_type with respect to out_type_set. The score
+// is used to compare type sets to see which represents the best goal for
+// conversion. Higher scores are preferred. A score of zero indicates that
+// in_type is incompatible with out_type_set.
+int Score(
+    const LpcmStreamType& in_type,
+    const LpcmStreamTypeSet& out_type_set) {
+  // TODO(dalesat): Plenty of room for more subtlety here. Maybe actually
+  // measure conversion costs (cpu, quality, etc) and reflect them here.
+
+  int score = 1; // We can convert anything, so 1 is the minimum score.
+
+  if (in_type.sample_format() == out_type_set.sample_format() ||
+      out_type_set.sample_format() == LpcmStreamType::SampleFormat::kAny) {
+    // Prefer not to convert sample format.
+    score += 10;
+  } else {
+    // Prefer higher-quality formats.
+    switch (out_type_set.sample_format()) {
+      case LpcmStreamType::SampleFormat::kUnsigned8:
+        break;
+      case LpcmStreamType::SampleFormat::kSigned16:
+        score += 1;
+        break;
+      case LpcmStreamType::SampleFormat::kSigned24In32:
+        score += 2;
+        break;
+      case LpcmStreamType::SampleFormat::kFloat:
+        score += 3;
+        break;
+      default:
+        NOTREACHED() << "unsupported sample format "
+            << out_type_set.sample_format();
+    }
+  }
+
+  if (out_type_set.channels().contains(in_type.channels())) {
+    // Prefer not to mixdown/up.
+    score += 10;
+  } else {
+    return 0; // TODO(dalesat): Remove when we have mixdown/up.
+  }
+
+  if (out_type_set.frames_per_second().
+      contains(in_type.frames_per_second())) {
+    // Very much prefer not to resample.
+    score += 50;
+  } else {
+    return 0; // TODO(dalesat): Remove when we have resamplers.
+  }
+
+  return score;
+}
+
+// Finds the media type set that best matches in_type.
+const StreamTypeSetPtr* FindBestLpcm(
+    const LpcmStreamType& in_type,
+    const StreamTypeSetsPtr& out_type_sets) {
+  const StreamTypeSetPtr* best = nullptr;
+  int best_score = 0;
+  for (const StreamTypeSetPtr& out_type_set : *out_type_sets) {
+    switch (out_type_set->scheme()) {
+      case StreamType::Scheme::kAnyElementary:
+      case StreamType::Scheme::kAnyAudio:
+      case StreamType::Scheme::kAny:
+        // Wildcard scheme allows any type without conversion.
+        return &out_type_set;
+      case StreamType::Scheme::kLpcm: {
+        int score = Score(in_type, *out_type_set->lpcm());
+        if (best_score < score) {
+          best_score = score;
+          best = &out_type_set;
+        }
+        break;
+      }
+      default:
+        break;
+    }
+  }
+  return best;
+}
+
+// Attempts to add transforms to the pipeline given an input compressed audio
+// stream type with (in_type) and the set of output types we need to convert to
+// (out_type_sets). If the call succeeds, *out_type is set to the new output
+// type. Otherwise, *out_type is set to nullptr.
+AddResult AddTransformsForCompressedAudio(
+    const CompressedAudioStreamType& in_type,
+    const StreamTypePtr& in_type_ptr,
+    const StreamTypeSetsPtr& out_type_sets,
+    Engine* engine,
+    Engine::Output* output,
+    StreamTypePtr* out_type) {
+  DCHECK(out_type);
+  DCHECK(engine);
+
+  // See if we have a matching COMPRESSED_AUDIO type.
+  for (const StreamTypeSetPtr& out_type_set : *out_type_sets) {
+    switch (out_type_set->scheme()) {
+      case StreamType::Scheme::kAnyElementary:
+      case StreamType::Scheme::kAnyAudio:
+      case StreamType::Scheme::kAny:
+        // Wildcard scheme allows any type without conversion.
+        *out_type = in_type.Clone();
+        return AddResult::kFinished;
+      case StreamType::Scheme::kCompressedAudio: {
+        if (out_type_set->compressed_audio()->contains(in_type)) {
+          // No transform needed.
+          *out_type = in_type.Clone();
+          return AddResult::kFinished;
+        }
+        break;
+      }
+      default:
+        break;
+    }
+    // TODO(dalesat): Support a different compressed output type by transcoding.
+  }
+
+  // Find the best LPCM output type.
+  const StreamTypeSetPtr* best = FindBestLpcm(in_type, out_type_sets);
+  if (best == nullptr) {
+    // No candidates found.
+    *out_type = nullptr;
+    return AddResult::kFailed;
+  }
+
+  DCHECK_EQ((*best)->scheme(), StreamType::Scheme::kLpcm);
+
+  // Need to decode. Create a decoder and go from there.
+  DecoderPtr decoder;
+  Result result = Decoder::Create(in_type_ptr, &decoder);
+  if (result !=  Result::kOk) {
+    // No decoder found.
+    *out_type = nullptr;
+    return AddResult::kFailed;
+  }
+
+  *output = engine->ConnectOutputToPart(*output, engine->Add(decoder)).output();
+  *out_type = decoder->output_stream_type();
+
+  return AddResult::kProgressed;
+}
+
+// Attempts to add transforms to the pipeline given an input LPCM stream type
+// (in_type) and the output lpcm stream type set for the type we need to convert
+// to (out_type_set). If the call succeeds, *out_type is set to the new output
+// type. Otherwise, *out_type is set to nullptr.
+AddResult AddTransformsForLpcm(
+    const LpcmStreamType& in_type,
+    const LpcmStreamTypeSet& out_type_set,
+    Engine* engine,
+    Engine::Output* output,
+    StreamTypePtr* out_type) {
+  DCHECK(engine);
+  DCHECK(out_type);
+
+  // TODO(dalesat): Room for more intelligence here wrt transform ordering and
+  // transforms that handle more than one conversion.
+  if (in_type.sample_format() != out_type_set.sample_format() &&
+      out_type_set.sample_format() != LpcmStreamType::SampleFormat::kAny) {
+    // The reformatter will fix interleave conversion.
+    *output = engine->ConnectOutputToPart(
+        *output,
+        engine->Add(LpcmReformatter::Create(in_type, out_type_set))).output();
+  }
+
+  if (!out_type_set.channels().contains(in_type.channels())) {
+    // TODO(dalesat): Insert mixdown/up transform.
+    NOTREACHED() << "conversion requires mixdown/up - not supported";
+    *out_type = nullptr;
+    return AddResult::kFailed;
+  }
+
+  if (!out_type_set.frames_per_second().contains(in_type.frames_per_second())) {
+    // TODO(dalesat): Insert resampler.
+    NOTREACHED() << "conversion requires resampling - not supported";
+    *out_type = nullptr;
+    return AddResult::kFailed;
+  }
+
+  // Build the resulting media type.
+  *out_type = LpcmStreamType::Create(
+      out_type_set.sample_format() == LpcmStreamType::SampleFormat::kAny ?
+          in_type.sample_format() :
+          out_type_set.sample_format(),
+      in_type.channels(),
+      in_type.frames_per_second());
+
+  return AddResult::kFinished;
+}
+
+// Attempts to add transforms to the pipeline given an input media type with
+// scheme LPCM (in_type) and the set of output types we need to convert to
+// (out_type_sets). If the call succeeds, *out_type is set to the new output
+// type. Otherwise, *out_type is set to nullptr.
+AddResult AddTransformsForLpcm(
+    const LpcmStreamType& in_type,
+    const StreamTypeSetsPtr& out_type_sets,
+    Engine* engine,
+    Engine::Output* output,
+    StreamTypePtr* out_type) {
+  DCHECK(engine);
+  DCHECK(out_type);
+
+  const StreamTypeSetPtr* best = FindBestLpcm(in_type, out_type_sets);
+  if (best == nullptr) {
+    // TODO(dalesat): Support a compressed output type by encoding.
+    NOTREACHED() << "conversion using encoder not supported";
+    *out_type = nullptr;
+    return AddResult::kFailed;
+  }
+
+  switch ((*best)->scheme()) {
+    case StreamType::Scheme::kAnyElementary:
+    case StreamType::Scheme::kAnyAudio:
+    case StreamType::Scheme::kAny:
+      // Wildcard scheme allows any type without conversion.
+      *out_type = in_type.Clone();
+      return AddResult::kFinished;
+    case StreamType::Scheme::kLpcm:
+      return AddTransformsForLpcm(
+          in_type,
+          *(*best)->lpcm(),
+          engine,
+          output,
+          out_type);
+    default:
+      NOTREACHED() << "FindBestLpcm produced unexpected type set scheme"
+          << (*best)->scheme();
+      return AddResult::kFailed;
+  }
+}
+
+// Attempts to add transforms to the pipeline given an input media type of any
+// scheme (in_type) and the set of output types we need to convert to
+// (out_type_sets). If the call succeeds, *out_type is set to the new output
+// type. Otherwise, *out_type is set to nullptr.
+AddResult AddTransforms(
+    const StreamTypePtr& in_type,
+    const StreamTypeSetsPtr& out_type_sets,
+    Engine* engine,
+    Engine::Output* output,
+    StreamTypePtr* out_type) {
+  DCHECK(in_type);
+  DCHECK(engine);
+  DCHECK(out_type);
+
+  switch (in_type->scheme()) {
+    case StreamType::Scheme::kLpcm:
+      return AddTransformsForLpcm(
+          *in_type->lpcm(),
+          out_type_sets,
+          engine,
+          output,
+          out_type);
+    case StreamType::Scheme::kCompressedAudio:
+      return AddTransformsForCompressedAudio(
+          *in_type->compressed_audio(),
+          in_type,
+          out_type_sets,
+          engine,
+          output,
+          out_type);
+    default:
+      NOTREACHED() << "conversion not supported for scheme"
+          << in_type->scheme();
+      *out_type = nullptr;
+      return AddResult::kFailed;
+  }
+}
+
+}  // namespace
+
+bool BuildConversionPipeline(
+    const StreamTypePtr& in_type,
+    const StreamTypeSetsPtr& out_type_sets,
+    Engine* engine,
+    Engine::Output* output,
+    StreamTypePtr* out_type) {
+  DCHECK(in_type);
+  DCHECK(out_type_sets);
+  DCHECK(engine);
+  DCHECK(output);
+  DCHECK(out_type);
+
+  Engine::Output out = *output;
+
+  const StreamTypePtr* type_to_convert = &in_type;
+  StreamTypePtr next_in_type;
+  while (true) {
+    StreamTypePtr converted_type;
+    switch (AddTransforms(
+        *type_to_convert,
+        out_type_sets,
+        engine,
+        &out,
+        &converted_type)) {
+      case AddResult::kFailed:
+        // Failed to find a suitable conversion. Return the pipeline to its
+        // original state.
+        engine->RemovePartsConnectedToOutput(*output);
+        *out_type = nullptr;
+        return false;
+      case AddResult::kProgressed:
+        // Made progress. Continue.
+        break;
+      case AddResult::kFinished:
+        // No further conversion required.
+        *output = out;
+        *out_type = std::move(converted_type);
+        return true;
+    }
+
+    next_in_type = std::move(converted_type);
+    type_to_convert = &next_in_type;
+  }
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/conversion_pipeline_builder.h b/services/media/framework/conversion_pipeline_builder.h
new file mode 100644
index 0000000..988eb5f
--- /dev/null
+++ b/services/media/framework/conversion_pipeline_builder.h
@@ -0,0 +1,29 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_CONVERSION_PIPELINE_BUILDER_H_
+#define SERVICES_MEDIA_FRAMEWORK_CONVERSION_PIPELINE_BUILDER_H_
+
+#include "services/media/framework/engine.h"
+#include "services/media/framework/packet.h"
+#include "services/media/framework/stream_type.h"
+
+namespace mojo {
+namespace media {
+
+// Attempts to add transforms to the given pipeline to convert in_type to a
+// type compatible with out_type_sets. If it succeeds, returns true, updates
+// *output and delivers the resulting output type via *out_type. If it fails,
+// returns false, sets *out_type to nullptr and leaves *output unchanged.
+bool BuildConversionPipeline(
+    const StreamTypePtr& in_type,
+    const StreamTypeSetsPtr& out_type_sets,
+    Engine* engine,
+    Engine::Output* output,
+    StreamTypePtr* out_type);
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_CONVERSION_PIPELINE_BUILDER_H_
diff --git a/services/media/framework/engine.cc b/services/media/framework/engine.cc
new file mode 100644
index 0000000..980440c
--- /dev/null
+++ b/services/media/framework/engine.cc
@@ -0,0 +1,434 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/engine.h"
+
+namespace mojo {
+namespace media {
+
+uint32_t Engine::Part::input_count() {
+  DCHECK(stage_ != nullptr);
+  return stage_->input_count();
+}
+
+Engine::Input Engine::Part::input(uint32_t index) {
+  DCHECK(stage_ != nullptr && index < stage_->input_count());
+  return Input(stage_, index);
+}
+
+Engine::Input Engine::Part::input() {
+  DCHECK(stage_ != nullptr && stage_->input_count() == 1);
+  return Input(stage_, 0);
+}
+
+uint32_t Engine::Part::output_count() {
+  DCHECK(stage_ != nullptr);
+  return stage_->output_count();
+}
+
+Engine::Output Engine::Part::output(uint32_t index) {
+  DCHECK(stage_ != nullptr && index < stage_->output_count());
+  return Output(stage_, index);
+}
+
+Engine::Output Engine::Part::output() {
+  DCHECK(stage_ != nullptr && stage_->output_count() == 1);
+  return Output(stage_, 0);
+}
+
+Engine::Part Engine::Part::upstream_part(uint32_t index) {
+  DCHECK(stage_ != nullptr && index < stage_->input_count());
+  return Part(stage_->input(index).upstream_stage());
+}
+
+Engine::Part Engine::Part::upstream_part() {
+  DCHECK(stage_ != nullptr && stage_->input_count() == 1);
+  return Part(stage_->input(0).upstream_stage());
+}
+
+Engine::Part Engine::Part::downstream_part(uint32_t index) {
+  DCHECK(stage_ != nullptr && index < stage_->output_count());
+  return Part(stage_->output(index).downstream_stage());
+}
+
+Engine::Part Engine::Part::downstream_part() {
+  DCHECK(stage_ != nullptr && stage_->output_count() == 1);
+  return Part(stage_->output(0).downstream_stage());
+}
+
+Engine::Engine() {
+  update_function_ = [this](Stage* stage) {
+    DCHECK(stage);
+    base::AutoLock lock(lock_);
+    UpdateUnsafe(stage);
+    UpdateUnsafe();
+  };
+}
+
+Engine::~Engine() {
+  Reset();
+}
+
+void Engine::RemovePart(Part part) {
+  DCHECK(part);
+  base::AutoLock lock(lock_);
+  RemoveUnsafe(part.stage_);
+}
+
+Engine::Part Engine::Connect(Output output, Input input) {
+  DCHECK(output);
+  DCHECK(input);
+
+  base::AutoLock lock(lock_);
+
+  if (output.connected()) {
+    DisconnectOutputUnsafe(output.stage_, output.index_);
+  }
+  if (input.connected()) {
+    DisconnectInputUnsafe(input.stage_, input.index_);
+  }
+
+  output.stage_output().connect(input.stage_, input.index_);
+  input.stage_input().connect(output.stage_, output.index_);
+
+  return input.part();
+}
+
+Engine::Part Engine::ConnectParts(Part upstream_part, Part downstream_part) {
+  DCHECK(upstream_part);
+  DCHECK(downstream_part);
+  Connect(upstream_part.output(), downstream_part.input());
+  return downstream_part;
+}
+
+Engine::Part Engine::ConnectOutputToPart(
+    Output output,
+    Part downstream_part) {
+  DCHECK(output);
+  DCHECK(downstream_part);
+  Connect(output, downstream_part.input());
+  return downstream_part;
+}
+
+Engine::Part Engine::ConnectPartToInput(Part upstream_part, Input input) {
+  DCHECK(upstream_part);
+  DCHECK(input);
+  Connect(upstream_part.output(), input);
+  return input.part();
+}
+
+void Engine::DisconnectOutput(Output output) {
+  DCHECK(output);
+
+  base::AutoLock lock(lock_);
+  DisconnectOutputUnsafe(output.stage_, output.index_);
+}
+
+void Engine::DisconnectInput(Input input) {
+  DCHECK(input);
+
+  base::AutoLock lock(lock_);
+  DisconnectInputUnsafe(input.stage_, input.index_);
+}
+
+void Engine::RemovePartsConnectedToPart(Part part) {
+  DCHECK(part);
+
+  base::AutoLock lock(lock_);
+
+  std::deque<Part> to_remove { part };
+
+  while (!to_remove.empty()) {
+    Part part = to_remove.front();
+    to_remove.pop_front();
+
+    for (uint32_t i = 0; i < part.input_count(); ++i) {
+      to_remove.push_back(part.upstream_part(i));
+    }
+
+    for (uint32_t i = 0; i < part.output_count(); ++i) {
+      to_remove.push_back(part.downstream_part(i));
+    }
+
+    RemoveUnsafe(part.stage_);
+  }
+}
+
+void Engine::RemovePartsConnectedToOutput(Output output) {
+  DCHECK(output);
+
+  if (!output.connected()) {
+    return;
+  }
+
+  Part downstream_part = output.downstream_part();
+  DisconnectOutput(output);
+  RemovePartsConnectedToPart(downstream_part);
+}
+
+void Engine::RemovePartsConnectedToInput(Input input) {
+  DCHECK(input);
+
+  if (!input.connected()) {
+    return;
+  }
+
+  Part upstream_part = input.upstream_part();
+  DisconnectInput(input);
+  RemovePartsConnectedToPart(upstream_part);
+}
+
+void Engine::Prepare() {
+  base::AutoLock lock(lock_);
+  for (Stage* sink : sinks_) {
+    sink->Prepare(update_function_);
+    sink->prepared_ = true;
+    uint32_t input_count = sink->input_count();
+    for (uint32_t input_index = 0; input_index < input_count; input_index++) {
+      MaybePrepareUnsafe(sink->input(input_index).upstream_stage());
+    }
+  }
+}
+
+void Engine::Prepare(Part part) {
+  DCHECK(part);
+  base::AutoLock lock(lock_);
+  MaybePrepareUnsafe(part.stage_);
+}
+
+void Engine::PrimeSinks() {
+  lock_.Acquire();
+  std::list<Stage*> sinks(sinks_);
+  lock_.Release();
+
+  // TODO(dalesat): Threading issue: these sinks may go away during priming.
+  for (Stage* sink : sinks) {
+    sink->Prime();
+  }
+}
+
+void Engine::Reset() {
+  base::AutoLock lock(lock_);
+  while (!supply_backlog_.empty()) {
+    supply_backlog_.pop();
+  }
+  while (!demand_backlog_.empty()) {
+    demand_backlog_.pop();
+  }
+  sources_.clear();
+  sinks_.clear();
+  while (!stages_.empty()) {
+    Stage* stage = stages_.front();
+    stages_.pop_front();
+    delete stage;
+  }
+}
+
+void Engine::PushToSupplyBacklogUnsafe(Stage* stage) {
+  lock_.AssertAcquired();
+
+  DCHECK(stage);
+  packets_produced_ = true;
+  if (!stage->in_supply_backlog_) {
+    supply_backlog_.push(stage);
+    stage->in_supply_backlog_ = true;
+  }
+}
+
+void Engine::PushToDemandBacklogUnsafe(Stage* stage) {
+  lock_.AssertAcquired();
+
+  DCHECK(stage);
+  if (!stage->in_demand_backlog_) {
+    demand_backlog_.push(stage);
+    stage->in_demand_backlog_ = true;
+  }
+}
+
+Engine::Part Engine::Add(Stage* stage) {
+  base::AutoLock lock(lock_);
+
+  stages_.push_back(stage);
+  if (stage->input_count() == 0) {
+    sources_.push_back(stage);
+  }
+  if (stage->output_count() == 0) {
+    sinks_.push_back(stage);
+  }
+  return Part(stage);
+}
+
+void Engine::DisconnectOutputUnsafe(Stage* stage, uint32_t index) {
+  DCHECK(stage);
+  DCHECK(index < stage->output_count());
+
+  lock_.AssertAcquired();
+
+  StageOutput& stage_output = stage->output(index);
+
+  if (stage_output.downstream_stage() == nullptr) {
+    return;
+  }
+
+  stage_output.mate().disconnect();
+  stage_output.disconnect();
+}
+
+void Engine::DisconnectInputUnsafe(Stage* stage, uint32_t index) {
+  DCHECK(stage);
+  DCHECK(index < stage->input_count());
+
+  lock_.AssertAcquired();
+
+  StageInput& stage_input = stage->input(index);
+
+  if (stage_input.upstream_stage() == nullptr) {
+    return;
+  }
+
+  stage_input.mate().disconnect();
+  stage_input.disconnect();
+}
+
+void Engine::RemoveUnsafe(Stage* stage) {
+  DCHECK(stage);
+
+  lock_.AssertAcquired();
+
+  uint32_t input_count = stage->input_count();
+  for (uint32_t input_index = 0; input_index < input_count; input_index++) {
+    if (stage->input(input_index).connected()) {
+      DisconnectInputUnsafe(stage, input_index);
+    }
+  }
+
+  uint32_t output_count = stage->output_count();
+  for (uint32_t output_index = 0; output_index < output_count; output_index++) {
+    if (stage->output(output_index).connected()) {
+      DisconnectOutputUnsafe(stage, output_index);
+    }
+  }
+
+  sources_.remove(stage);
+  sinks_.remove(stage);
+  stages_.remove(stage);
+  delete stage;
+}
+
+// static
+Stage* Engine::CreateStage(MultiStreamPacketSourcePtr source) {
+  return new DistributorStage(source);
+}
+
+// static
+Stage* Engine::CreateStage(PacketTransformPtr transform) {
+  return new PacketTransformStage(transform);
+}
+
+// static
+Stage* Engine::CreateStage(ActiveSourcePtr source) {
+  return new ActiveSourceStage(source);
+}
+
+// static
+Stage* Engine::CreateStage(ActiveSinkPtr sink) {
+  return new ActiveSinkStage(sink);
+}
+
+// static
+Stage* Engine::CreateStage(LpcmTransformPtr transform) {
+  return new LpcmTransformStage(transform);
+}
+
+void Engine::MaybePrepareUnsafe(Stage* stage) {
+  lock_.AssertAcquired();
+
+  if (stage == nullptr || stage->prepared_) {
+    return;
+  }
+
+  // Make sure all downstream stages have been prepared.
+  uint32_t output_count = stage->output_count();
+  for (uint32_t output_index = 0; output_index < output_count; output_index++) {
+    StageOutput& output = stage->output(output_index);
+    if (output.connected() && !output.downstream_stage()->prepared()) {
+      return;
+    }
+  }
+
+  stage->Prepare(update_function_);
+  stage->prepared_ = true;
+
+  // Prepare all upstream stages.
+  uint32_t input_count = stage->input_count();
+  for (uint32_t input_index = 0; input_index < input_count; input_index++) {
+    MaybePrepareUnsafe(stage->input(input_index).upstream_stage());
+  }
+}
+
+void Engine::UpdateUnsafe() {
+  lock_.AssertAcquired();
+
+  while (true) {
+    Stage* stage = PopFromSupplyBacklogUnsafe();
+    if (stage != nullptr) {
+      UpdateUnsafe(stage);
+      continue;
+    }
+
+    stage = PopFromDemandBacklogUnsafe();
+    if (stage != nullptr) {
+      UpdateUnsafe(stage);
+      continue;
+    }
+
+    break;
+  }
+}
+
+void Engine::UpdateUnsafe(Stage *stage) {
+  lock_.AssertAcquired();
+
+  DCHECK(stage);
+
+  packets_produced_ = false;
+
+  stage->Update(this);
+
+  // If the stage produced packets, it may need to reevaluate demand later.
+  if (packets_produced_) {
+    PushToDemandBacklogUnsafe(stage);
+  }
+}
+
+Stage* Engine::PopFromSupplyBacklogUnsafe() {
+  lock_.AssertAcquired();
+
+  if (supply_backlog_.empty()) {
+    return nullptr;
+  }
+
+  Stage* stage = supply_backlog_.front();
+  supply_backlog_.pop();
+  DCHECK(stage->in_supply_backlog_);
+  stage->in_supply_backlog_ = false;
+  return stage;
+}
+
+Stage* Engine::PopFromDemandBacklogUnsafe() {
+  lock_.AssertAcquired();
+
+  if (demand_backlog_.empty()) {
+    return nullptr;
+  }
+
+  Stage* stage = demand_backlog_.top();
+  demand_backlog_.pop();
+  DCHECK(stage->in_demand_backlog_);
+  stage->in_demand_backlog_ = false;
+  return stage;
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/engine.h b/services/media/framework/engine.h
new file mode 100644
index 0000000..2ceb0cf
--- /dev/null
+++ b/services/media/framework/engine.h
@@ -0,0 +1,432 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_H_
+
+#include <list>
+#include <queue>
+#include <stack>
+
+#include "base/synchronization/lock.h"
+#include "services/media/framework/stages/active_sink_stage.h"
+#include "services/media/framework/stages/active_source_stage.h"
+#include "services/media/framework/stages/distributor_stage.h"
+#include "services/media/framework/stages/lpcm_transform_stage.h"
+#include "services/media/framework/stages/packet_transform_stage.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+//
+// USAGE
+//
+// TODO(dalesat): Consider adding a suffix to Engine::Part/Input/Output to
+// indicate that they're references.
+// TODO(dalesat): Consider folding PrimeSinks into Prepare.
+//
+// Engine is a container for sources, sinks and transforms ('parts') connected
+// in a graph. Engine::Part, Engine::Input and Engine::Output are all opaque
+// references to parts and their inputs and outputs. Engine provides a variety
+// of methods for adding and removing parts and for connecting inputs and
+// outputs to form a graph.
+//
+// In addition to containing parts and representing their interconnection,
+// Engine manages the coordinated operation of its constituent parts and
+// transports media from part to part. The Prepare method prepares the graph
+// for operation, and the PrimeSinks method tells the sinks in the graph to
+// prime themselves. Any additional actions required to make the graph operate
+// (such as manipulating a rate control interface) is out of scope.
+//
+// Parts added to the engine are referenced using shared pointers. The engine
+// holds pointers to the parts it contains, and the application, in many cases,
+// also holds pointers to the parts so it can call methods that are outside the
+// engine's scope. When a part is added the Engine returns an Engine::Part
+// object, which can be used to reference the part when the graph is modified.
+// Engine::Part objects can be interrogated to retrieve inputs (as Engine::Input
+// objects) and outputs (as Engine::Output objects).
+//
+// Some support is provided for modifying graphs that are operating. This
+// capability isn't fully developed at the moment. Prepare(Part) is an example
+// of a method provided for this purpose.
+//
+// Parts come in various flavors, defined by 'model' abstract classes. The
+// current list of supported models is:
+//
+//  ActiveSink              - a sink that consumes packets asynchronously
+//  ActiveSource            - a source that produces packets asynchronously
+//  LpcmMixer               - a transform that mixes LPCM frames from multiple
+//                            inputs and produces a single stream of LPCM frames
+//                            via one output
+//  LpcmSource              - a source that produces LPCM frames synchronously
+//  LpcmTransform           - a synchronous transform with one LPCM input and
+//                            one LPCM output
+//  MultiStreamPacketSource - a source that produces multiple streams of packets
+//                            synchronously
+//  PacketTransform         - a synchronous transform that consumes and produces
+//                            packets via one input and one output
+//
+// Other models will be defined in the future as needed.
+//
+
+//
+// DESIGN
+//
+// The Engine is implemented as a system of cooperating objects. Of those
+// objects, only the engine itself is of relevance to code that uses Engine and
+// to part implementations. The other objects are:
+//
+// Stage
+// A stage hosts a single part. There are many subclasses of Stage, one for
+// each supported part model. The stage's job is to implement the contract
+// represented by the model so the parts that conform to the model can
+// participate in the operation of the engine. Stages are uniform with respect
+// to how they interact with engine. Engine::Part references a stage.
+//
+// StageInput
+// A stage possesses zero or more StageInput instances. StageInput objects
+// implement the supply of media into the stage and demand for media signalled
+// upstream. StageInputs receive media from StageOutputs in the form of packets
+// (type Packet). LpcmStageInput is a subclass of StageInput that interoperates
+// with LpcmStageInputs in a way that provides optimizations relavant to LPCM
+// audio media. Engine::Input references a StageInput.
+//
+// StageOutput
+// A stage possesses zero or more StageOutput instances. StageOutput objects
+// implement the supply of media output of the stage to a downstream input and
+// demand for media signalled from that input. LpcmStageOutput implements
+// optimized LPCM flow. Engine::Output references a StageOutput.
+//
+// Engine uses a 'work list' algorithm to operate the contained graph. The
+// engine has a backlog of stages that need to be updated. To advance the
+// operation of the graph, the engine removes a stage from the backlog and calls
+// the stage's Update method. The Stage::Update may cause stages to be added
+// synchronously to the the backlog. This procedure continues until the backlog
+// is empty.
+//
+// Stage::Update is the stage's opportunity to react to the supply of new media
+// via its inputs and the signalling of new demand via its outputs. During
+// Update, the stage does whatever work it can with the current supply and
+// demand, possibly supplying media downstream through its outputs and/or
+// signalling new demand via its inputs. When a stage supplies media through
+// an output, the downstream stage is added to the backlog. When a stage updates
+// its demand through an input, the upstream stage is added to the backlog.
+//
+// The process starts when a stage invokes an update callback supplied by the
+// engine. Stages that implement synchronous models never do this. Other stages
+// do this as directed by the parts they host in accordance with their
+// respective models. When a stage is ready to supply media or update demand
+// due to external events, it calls the update callback. The engine responds by
+// adding the stage to the backlog and then burning down the backlog. The stage
+// that called back is updated first, and then all the work that can be done
+// synchronously as a result of the external event is completed. In this way,
+// the operation of the graph is driven by external events signalled through
+// update callbacks.
+//
+// Currently, Engine uses an opportunistic threading model that only allows
+// one thread to drive the backlog processing at any given time. The engine
+// runs the processing on whatever thread enters it via an update callback.
+// An engine employs a single lock that protects manipulation of the graph and
+// processing of the backlog. Stage update methods are invoked with that lock
+// taken. This arrangement implies the following constraints:
+//
+// 1) An update callback cannot be called synchronously with a Stage::Update
+//    call, because the lock is taken for the duration of Update, and the
+//    callback will take the lock. Update callbacks may occur during Engine::
+//    PrimeSinks, and they generally will.
+// 2) A stage cannot update supply/demand on its inputs/outputs except during
+//    Update. When an external event occurs, the stage and/or its hosted part
+//    should update its internal state as required and invoke the callback.
+//    During the subsequent Update, the stage and/or part can then update
+//    supply and/or demand.
+// 3) Threads used to call update callbacks must be suitable for operating the
+//    engine. There is currently no affordance for processing other tasks on
+//    a thread while the callback is running. A callback may run for a long
+//    time, depending on how much work needs to be done.
+// 4) Parts cannot rely on being called back on the same thread on which they
+//    invoke update callbacks. This may require additional synchronization and
+//    thread transitions inside the part.
+// 5) If a part takes a lock of its own during Update, it should not also hold
+//    that lock when calling the update callback. Doing so will result in
+//    deadlock.
+//
+// NOTE: Allocators, not otherwise discussed here, are required to be thread-
+// safe so that packets may be cleaned up on any thread.
+//
+// In the future, the threading model will be enhanced. Intended features
+// include:
+// 1) Support for multiple threads.
+// 2) Marshalling update callbacks to a different thread.
+//
+
+// Host for a source, sink or transform.
+class Engine {
+ public:
+  class Input;
+  class Output;
+
+  // Opaque Stage pointer used for graph building.
+  class Part {
+   public:
+    Part() : stage_(nullptr) {}
+
+    uint32_t input_count();
+    Input input(uint32_t index);
+    Input input();
+    uint32_t output_count();
+    Output output(uint32_t index);
+    Output output();
+    Part upstream_part(uint32_t index);
+    Part upstream_part();
+    Part downstream_part(uint32_t index);
+    Part downstream_part();
+
+   private:
+    explicit Part(Stage* stage) : stage_(stage) {}
+
+    explicit operator bool() const { return stage_ != nullptr; }
+
+    Stage* stage_;
+
+    friend Engine;
+    friend Input;
+    friend Output;
+  };
+
+  // Opaque StageInput pointer used for graph building.
+  class Input {
+   public:
+    Input() : stage_(nullptr), index_(0) {}
+
+    explicit operator bool() const { return stage_ != nullptr; }
+
+    Part part() { return Part(stage_); }
+
+    bool connected() {
+      DCHECK(stage_);
+      return stage_input().upstream_stage() != nullptr;
+    }
+
+    Part upstream_part() {
+      DCHECK(connected());
+      return Part(stage_input().upstream_stage());
+    }
+
+   private:
+    Input(Stage* stage, uint32_t index) :
+        stage_(stage), index_(index) {
+      DCHECK(stage_);
+      DCHECK(index_ < stage_->input_count());
+    }
+
+    StageInput& stage_input() {
+      DCHECK(stage_);
+      return stage_->input(index_);
+    }
+
+    Stage* stage_;
+    uint32_t index_;
+
+    friend Engine;
+    friend Part;
+    friend Output;
+  };
+
+  // Opaque StageOutput pointer used for graph building.
+  class Output {
+   public:
+    Output() : stage_(nullptr), index_(0) {}
+
+    explicit operator bool() const { return stage_ != nullptr; }
+
+    Part part() { return Part(stage_); }
+
+    bool connected() {
+      DCHECK(stage_);
+      return stage_output().downstream_stage() != nullptr;
+    }
+
+    Part downstream_part() {
+      DCHECK(connected());
+      return Part(stage_output().downstream_stage());
+    }
+
+   private:
+    Output(Stage* stage, uint32_t index) :
+        stage_(stage), index_(index) {
+      DCHECK(stage_);
+      DCHECK(index_ < stage_->output_count());
+    }
+
+    StageOutput& stage_output() {
+      DCHECK(stage_);
+      return stage_->output(index_);
+    }
+
+    Stage* stage_;
+    uint32_t index_;
+
+    friend Engine;
+    friend Part;
+    friend Input;
+  };
+
+  Engine();
+
+  ~Engine();
+
+  // Adds a part to the engine.
+  template<typename T, typename TBase>
+  Part Add(SharedPtr<T, TBase> t) {
+    DCHECK(t);
+    return Add(CreateStage(std::shared_ptr<TBase>(t)));
+  }
+
+  // Removes a part from the engine after disconnecting it from other parts.
+  void RemovePart(Part part);
+
+  // Connects an output connector to an input connector. Returns the dowstream
+  // part.
+  Part Connect(Output output, Input input);
+
+  // Connects a part with exactly one output to a part with exactly one input.
+  // Returns the downstream part.
+  Part ConnectParts(Part upstream_part, Part downstream_part);
+
+  // Connects an output connector to a part that has exactly one input. Returns
+  // the downstream part.
+  Part ConnectOutputToPart(Output output, Part downstream_part);
+
+  // Connects a part with exactly one output to an input connector. Returns the
+  // downstream part.
+  Part ConnectPartToInput(Part upstream_part, Input input);
+
+  // Disconnects an output connector and the input connector to which it's
+  // connected.
+  void DisconnectOutput(Output output);
+
+  // Disconnects an input connector and the output connector to which it's
+  // connected.
+  void DisconnectInput(Input input);
+
+  // Disconnects and removes part and everything connected to it.
+  void RemovePartsConnectedToPart(Part part);
+
+  // Disconnects and removes everything connected to output.
+  void RemovePartsConnectedToOutput(Output output);
+
+  // Disconnects and removes everything connected to input.
+  void RemovePartsConnectedToInput(Input input);
+
+  // Adds all the parts in t (which must all have one input and one output) and
+  // connects them in sequence to the output connector. Returns the output
+  // connector of the last part or the output parameter if it is empty.
+  template<typename T>
+  Output AddAndConnectAll(
+      Output output,
+      const T& t) {
+    for (auto& element : t) {
+      Part part = Add(CreateStage(element));
+      Connect(output, part.input());
+      output = part.output();
+    }
+    return output;
+  }
+
+  // Prepares the engine.
+  void Prepare();
+
+  // Prepares the part and everything upstream of it. This method is used to
+  // prepare subgraphs added when the rest of the graph is already prepared.
+  void Prepare(Part part);
+
+  // Primes all the sinks in the graph.
+  void PrimeSinks();
+
+  // Removes all parts from the engine.
+  void Reset();
+
+ private:
+  // Adds a stage to the engine.
+  Part Add(Stage* stage);
+
+  // Disconnects an output.
+  void DisconnectOutputUnsafe(Stage* stage, uint32_t index);
+
+  // Disconnects an input.
+  void DisconnectInputUnsafe(Stage* stage, uint32_t index);
+
+  // Removes a stage.
+  void RemoveUnsafe(Stage* stage);
+
+  // Creates a stage from a source, sink or transform. A specialization of this
+  // template is defined for each type of source, sink or transform that can be
+  // added to the engine.
+  template<typename T>
+  static Stage* CreateStage(std::shared_ptr<T> t);
+
+  // CreateStage template specialization for MultiStreamPacketSource.
+  static Stage* CreateStage(MultiStreamPacketSourcePtr source);
+
+  // CreateStage template specialization for PacketTransform.
+  static Stage* CreateStage(PacketTransformPtr transform);
+
+  // CreateStage template specialization for ActiveSource.
+  static Stage* CreateStage(ActiveSourcePtr source);
+
+  // CreateStage template specialization for ActiveSink.
+  static Stage* CreateStage(ActiveSinkPtr sink);
+
+  // CreateStage template specialization for LpcmTransform.
+  static Stage* CreateStage(LpcmTransformPtr transform);
+
+  // Prepares a stage if all its downstream stages are prepared.
+  void MaybePrepareUnsafe(Stage* stage);
+
+  // Processes the entire backlog.
+  void UpdateUnsafe();
+
+  // Performs processing for a single stage, updating the backlog accordingly.
+  void UpdateUnsafe(Stage *stage);
+
+  // Pushes the stage to the supply backlog if it isn't already there.
+  void PushToSupplyBacklogUnsafe(Stage* stage);
+
+  // Pushes the stage to the demand backlog if it isn't already there.
+  void PushToDemandBacklogUnsafe(Stage* stage);
+
+  // Pops a stage from the supply backlog and returns it or returns nullptr if
+  // the supply backlog is empty.
+  Stage* PopFromSupplyBacklogUnsafe();
+
+  // Pops a stage from the demand backlog and returns it or returns nullptr if
+  // the demand backlog is empty.
+  Stage* PopFromDemandBacklogUnsafe();
+
+  mutable base::Lock lock_;
+  std::list<Stage*> stages_;
+  std::list<Stage*> sources_;
+  std::list<Stage*> sinks_;
+  // supply_backlog_ contains pointers to all the stages that have been supplied
+  // (packets or frames) but have not been updated since. demand_backlog_ does
+  // the same for demand. The use of queue vs stack here is a guess as to what
+  // will yield the best results. It's possible that only a single backlog is
+  // required.
+  // TODO(dalesat): Determine the best ordering and implement it.
+  std::queue<Stage*> supply_backlog_;
+  std::stack<Stage*> demand_backlog_;
+  Stage::UpdateCallback update_function_;
+  bool packets_produced_;
+
+  friend class StageInput;
+  friend class StageOutput;
+  friend class LpcmStageInput;
+  friend class LpcmStageOutput;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_ENGINE_H_
diff --git a/services/media/framework/formatting.cc b/services/media/framework/formatting.cc
new file mode 100644
index 0000000..cf128ab
--- /dev/null
+++ b/services/media/framework/formatting.cc
@@ -0,0 +1,385 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <iostream>
+
+#include "services/media/framework/formatting.h"
+
+namespace mojo {
+namespace media {
+
+int ostream_indent_index() {
+  static int i = std::ios_base::xalloc();
+  return i;
+}
+
+std::ostream& operator<<(std::ostream& os, Result value) {
+  switch (value) {
+    case Result::kOk:
+      return os << "kOk";
+    case Result::kUnknownError:
+      return os << "kUnknownError";
+    case Result::kInternalError:
+      return os << "kInternalError";
+    case Result::kUnsupportedOperation:
+      return os << "kUnsupportedOperation";
+    case Result::kInvalidArgument:
+      return os << "kInvalidArgument";
+    case Result::kNotFound:
+      return os << "kNotFound";
+  }
+}
+
+std::ostream& operator<<(std::ostream& os, Demand value) {
+  switch (value) {
+    case Demand::kNegative:
+      return os << "kNegative";
+    case Demand::kNeutral:
+      return os << "kNeutral";
+    case Demand::kPositive:
+      return os << "kPositive";
+  }
+}
+
+std::ostream& operator<<(std::ostream& os, const PacketPtr& value) {
+  if (!value) {
+    return os << "<nullptr>";
+  }
+
+  os << "&" << std::hex << uint64_t(value.get()) << std::dec;
+  os << "/pts:" << value->presentation_time();
+  os << "/dur:" << value->duration();
+  os << "/eos:" << (value->end_of_stream() ? "t" : "f");
+  os << "/size:" << value->size();
+  os << "/payload:" << std::hex << uint64_t(value->payload()) << std::dec;
+  return os;
+}
+
+std::ostream& operator<<(std::ostream& os, const StreamTypePtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "Scheme scheme(): " << value->scheme() << std::endl;
+  switch (value->scheme()) {
+    case StreamType::Scheme::kMultiplexed:
+      os << begl << "StreamTypePtr multiplex_type: "
+          << value->multiplexed()->multiplex_type();
+      os << begl << "StreamTypesPtr substream_types: "
+          << value->multiplexed()->substream_types();
+      break;
+    case StreamType::Scheme::kLpcm:
+      os << begl << "SampleFormat sample_format: "
+          << value->lpcm()->sample_format() << std::endl;
+      os << begl << "uint32_t channels: "
+          << value->lpcm()->channels() << std::endl;
+      os << begl << "uint32_t frames_per_second: "
+          << value->lpcm()->frames_per_second() << std::endl;
+      break;
+    case StreamType::Scheme::kCompressedAudio:
+      os << begl << "AudioEncoding encoding: "
+          << value->compressed_audio()->encoding() << std::endl;
+      os << begl << "SampleFormat sample_format: "
+          << value->compressed_audio()->sample_format() << std::endl;
+      os << begl << "uint32_t channels: "
+          << value->compressed_audio()->channels() << std::endl;
+      os << begl << "uint32_t frames_per_second: "
+          << value->compressed_audio()->frames_per_second() << std::endl;
+      os << begl << "BytesPtr encoding_details: "
+          << value->compressed_audio()->encoding_details() << std::endl;
+      break;
+    case StreamType::Scheme::kVideo:
+      os << begl << "VideoEncoding encoding: "
+          << value->video()->encoding() << std::endl;
+      os << begl << "VideoProfile profile: "
+          << value->video()->profile() << std::endl;
+      os << begl << "PixelFormat pixel_format: "
+          << value->video()->pixel_format() << std::endl;
+      os << begl << "ColorSpace color_space: "
+          << value->video()->color_space() << std::endl;
+      os << begl << "uint32_t width: "
+          << value->video()->width() << std::endl;
+      os << begl << "uint32_t height: "
+          << value->video()->height() << std::endl;
+      os << begl << "uint32_t coded_width: "
+          << value->video()->coded_width() << std::endl;
+      os << begl << "uint32_t coded_height: "
+          << value->video()->coded_height() << std::endl;
+      os << begl << "BytesPtr encoding_details: "
+          << value->video()->encoding_details() << std::endl;
+      break;
+    default:
+      break;
+  }
+
+  return os << outdent;
+}
+
+std::ostream& operator<<(std::ostream& os, const StreamTypeSetPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "Scheme scheme(): " << value->scheme() << std::endl;
+  switch (value->scheme()) {
+    case StreamType::Scheme::kMultiplexed:
+      os << begl << "StreamTypeSetPtr multiplex_type_set: "
+          << value->multiplexed()->multiplex_type_set();
+      os << begl << "StreamTypeSetsPtr substream_type_sets: "
+          << value->multiplexed()->substream_type_sets();
+      break;
+    case StreamType::Scheme::kLpcm:
+      os << begl << "SampleFormat sample_format: "
+          << value->lpcm()->sample_format() << std::endl;
+      os << begl << "Range<uint32_t> channels: "
+          << value->lpcm()->channels() << std::endl;
+      os << begl << "Range<uint32_t> frames_per_second: "
+          << value->lpcm()->frames_per_second() << std::endl;
+      break;
+    case StreamType::Scheme::kCompressedAudio:
+      os << begl << "AudioEncoding encoding: "
+          << value->compressed_audio()->encoding() << std::endl;
+      os << begl << "SampleFormat sample_format: "
+          << value->compressed_audio()->sample_format() << std::endl;
+      os << begl << "Range<uint32_t> channels: "
+          << value->compressed_audio()->channels() << std::endl;
+      os << begl << "Range<uint32_t> frames_per_second: "
+          << value->compressed_audio()->frames_per_second() << std::endl;
+      break;
+    case StreamType::Scheme::kVideo:
+      os << begl << "VideoEncoding encoding: "
+          << value->video()->encoding() << std::endl;
+      os << begl << "Range<uint32_t> width: "
+          << value->video()->width() << std::endl;
+      os << begl << "Range<uint32_t> height: "
+          << value->video()->height() << std::endl;
+      break;
+    default:
+      break;
+  }
+
+  return os << outdent;
+}
+
+std::ostream& operator<<(std::ostream& os, const StreamTypesPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else if (value->size() == 0) {
+    return os << "<empty>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  int index = 0;
+  for (const StreamTypePtr& element : *value) {
+    os << "[" << index++ << "]: " << element;
+  }
+
+  return os;
+}
+
+std::ostream& operator<<(std::ostream& os, const StreamTypeSetsPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else if (value->size() == 0) {
+    return os << "<empty>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  int index = 0;
+  for (const StreamTypeSetPtr& element : *value) {
+    os << "[" << index++ << "]: " << element;
+  }
+
+  return os;
+}
+
+std::ostream& operator<<(std::ostream& os, StreamType::Scheme value) {
+  switch (value) {
+    case StreamType::Scheme::kUnknown:
+      return os << "kUnknown";
+    case StreamType::Scheme::kNone:
+      return os << "kNone";
+    case StreamType::Scheme::kAnyElementary:
+      return os << "kAnyElementary";
+    case StreamType::Scheme::kAnyAudio:
+      return os << "kAnyAudio";
+    case StreamType::Scheme::kAnyVideo:
+      return os << "kAnyVideo";
+    case StreamType::Scheme::kAnySubpicture:
+      return os << "kAnySubpicture";
+    case StreamType::Scheme::kAnyText:
+      return os << "kAnyText";
+    case StreamType::Scheme::kAnyMultiplexed:
+      return os << "kAnyMultiplexed";
+    case StreamType::Scheme::kAny:
+      return os << "kAny";
+    case StreamType::Scheme::kMultiplexed:
+      return os << "kMultiplexed";
+    case StreamType::Scheme::kLpcm:
+      return os << "kLpcm";
+    case StreamType::Scheme::kCompressedAudio:
+      return os << "kCompressedAudio";
+    case StreamType::Scheme::kVideo:
+      return os << "kVideo";
+  }
+}
+
+std::ostream& operator<<(std::ostream& os, LpcmStreamType::SampleFormat value) {
+  switch (value) {
+    case LpcmStreamType::SampleFormat::kUnknown:
+      return os << "kUnknown";
+    case LpcmStreamType::SampleFormat::kAny:
+      return os << "kAny";
+    case LpcmStreamType::SampleFormat::kUnsigned8:
+      return os << "kUnsigned8";
+    case LpcmStreamType::SampleFormat::kSigned16:
+      return os << "kSigned16";
+    case LpcmStreamType::SampleFormat::kSigned24In32:
+      return os << "kSigned24In32";
+    case LpcmStreamType::SampleFormat::kFloat:
+      return os << "kFloat";
+  }
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    CompressedAudioStreamType::AudioEncoding value) {
+  switch (value) {
+    case CompressedAudioStreamType::AudioEncoding::kUnknown:
+      return os << "kUnknown";
+    case CompressedAudioStreamType::AudioEncoding::kAny:
+      return os << "kAny";
+    case CompressedAudioStreamType::AudioEncoding::kVorbis:
+      return os << "kVorbis";
+  }
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    VideoStreamType::VideoEncoding value) {
+  switch (value) {
+    case VideoStreamType::VideoEncoding::kUnknown:
+      return os << "kUnknown";
+    case VideoStreamType::VideoEncoding::kAny:
+      return os << "kAny";
+    case VideoStreamType::VideoEncoding::kTheora:
+      return os << "kTheora";
+    case VideoStreamType::VideoEncoding::kVp8:
+      return os << "kVp8";
+  }
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    VideoStreamType::VideoProfile value) {
+  switch (value) {
+    case VideoStreamType::VideoProfile::kUnknown:
+      return os << "kUnknown";
+    case VideoStreamType::VideoProfile::kNotApplicable:
+      return os << "kNotApplicable";
+    case VideoStreamType::VideoProfile::kH264Baseline:
+      return os << "kH264Baseline";
+    case VideoStreamType::VideoProfile::kH264Main:
+      return os << "kH264Main";
+    case VideoStreamType::VideoProfile::kH264Extended:
+      return os << "kH264Extended";
+    case VideoStreamType::VideoProfile::kH264High:
+      return os << "kH264High";
+    case VideoStreamType::VideoProfile::kH264High10:
+      return os << "kH264High10";
+    case VideoStreamType::VideoProfile::kH264High422:
+      return os << "kH264High422";
+    case VideoStreamType::VideoProfile::kH264High444Predictive:
+      return os << "kH264High444Predictive";
+    case VideoStreamType::VideoProfile::kH264ScalableBaseline:
+      return os << "kH264ScalableBaseline";
+    case VideoStreamType::VideoProfile::kH264ScalableHigh:
+      return os << "kH264ScalableHigh";
+    case VideoStreamType::VideoProfile::kH264StereoHigh:
+      return os << "kH264StereoHigh";
+    case VideoStreamType::VideoProfile::kH264MultiviewHigh:
+      return os << "kH264MultiviewHigh";
+  }
+}
+
+std::ostream& operator<<(std::ostream& os, VideoStreamType::PixelFormat value) {
+  switch (value) {
+    case VideoStreamType::PixelFormat::kUnknown:
+      return os << "kUnknown";
+    case VideoStreamType::PixelFormat::kI420:
+      return os << "kI420";
+    case VideoStreamType::PixelFormat::kYv12:
+      return os << "kYv12";
+    case VideoStreamType::PixelFormat::kYv16:
+      return os << "kYv16";
+    case VideoStreamType::PixelFormat::kYv12A:
+      return os << "kYv12A";
+    case VideoStreamType::PixelFormat::kYv24:
+      return os << "kYv24";
+    case VideoStreamType::PixelFormat::kNv12:
+      return os << "kNv12";
+    case VideoStreamType::PixelFormat::kNv21:
+      return os << "kNv21";
+    case VideoStreamType::PixelFormat::kUyvy:
+      return os << "kUyvy";
+    case VideoStreamType::PixelFormat::kYuy2:
+      return os << "kYuy2";
+    case VideoStreamType::PixelFormat::kArgb:
+      return os << "kArgb";
+    case VideoStreamType::PixelFormat::kXrgb:
+      return os << "kXrgb";
+    case VideoStreamType::PixelFormat::kRgb24:
+      return os << "kRgb24";
+    case VideoStreamType::PixelFormat::kRgb32:
+      return os << "kRgb24";
+    case VideoStreamType::PixelFormat::kMjpeg:
+      return os << "kRgb24";
+    case VideoStreamType::PixelFormat::kMt21:
+      return os << "kRgb24";
+  }
+}
+
+std::ostream& operator<<(std::ostream& os, VideoStreamType::ColorSpace value) {
+  switch (value) {
+    case VideoStreamType::ColorSpace::kUnknown:
+      return os << "kUnknown";
+    case VideoStreamType::ColorSpace::kNotApplicable:
+      return os << "kNotApplicable";
+    case VideoStreamType::ColorSpace::kJpeg:
+      return os << "kJpeg";
+    case VideoStreamType::ColorSpace::kHdRec709:
+      return os << "kHdRec709";
+    case VideoStreamType::ColorSpace::kSdRec601:
+      return os << "kSdRec601";
+  }
+}
+
+std::ostream& operator<<(std::ostream& os, const BytesPtr& value) {
+  if (value == nullptr) {
+    return os << "<nullptr>";
+  } else {
+    return os << value->size() << " bytes";
+  }
+}
+
+std::ostream& operator<<(std::ostream& os, Range<bool> value) {
+  if (value.min) {
+    return os << "true";
+  } else if (value.max) {
+    return os << "false..true";
+  }  else {
+    return os << "false";
+  }
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework/formatting.h b/services/media/framework/formatting.h
new file mode 100644
index 0000000..a455ae1
--- /dev/null
+++ b/services/media/framework/formatting.h
@@ -0,0 +1,88 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_FORMATTING_H_
+#define SERVICES_MEDIA_FRAMEWORK_FORMATTING_H_
+
+#include <ostream>
+
+#include "services/media/framework/models/demand.h"
+#include "services/media/framework/packet.h"
+#include "services/media/framework/result.h"
+#include "services/media/framework/stream_type.h"
+
+//
+// This file declares a bunch of << operator overloads for dumping media stuff.
+// Unless you want to add new operators, it's sufficient to know that you can
+// just use the operators as expected, except that some of the overloads can
+// produce multiple lines and therefore provide their own newlines.
+//
+// These operators are intended to be called after a label has been added to
+// the stream with a trailing space. If the text generated by an operator is
+// sufficiently short, the operator may add that text with no preamble and
+// terminate it with std::endl. If the text has to be multiline, the operator
+// first adds std::endl, then the multiline text with std::endl termination.
+// Each line starts with begl in order to apply the appropriate indentation.
+// The Indenter class is provided to adjust the identation level. Operators
+// that take pointers need to handle nullptr.
+//
+
+namespace mojo {
+namespace media {
+
+int ostream_indent_index();
+
+std::ostream& begl(std::ostream& os) {
+  for (int i = 0; i < os.iword(ostream_indent_index()); i++) {
+    os << "    ";
+  }
+  return os;
+}
+
+inline std::ostream& indent(std::ostream& os) {
+  ++os.iword(ostream_indent_index());
+  return os;
+}
+
+inline std::ostream& outdent(std::ostream& os) {
+  --os.iword(ostream_indent_index());
+  return os;
+}
+
+// The following overloads don't add newlines.
+
+std::ostream& operator<<(std::ostream& os, Result value);
+std::ostream& operator<<(std::ostream& os, Demand value);
+std::ostream& operator<<(std::ostream& os, const PacketPtr& value);
+std::ostream& operator<<(std::ostream& os, StreamType::Scheme value);
+std::ostream& operator<<(std::ostream& os, LpcmStreamType::SampleFormat value);
+std::ostream& operator<<(
+    std::ostream& os,
+    CompressedAudioStreamType::AudioEncoding value);
+std::ostream& operator<<(
+    std::ostream& os,
+    VideoStreamType::VideoEncoding value);
+std::ostream& operator<<(std::ostream& os, VideoStreamType::VideoProfile value);
+std::ostream& operator<<(std::ostream& os, VideoStreamType::PixelFormat value);
+std::ostream& operator<<(std::ostream& os, VideoStreamType::ColorSpace value);
+std::ostream& operator<<(std::ostream& os, const BytesPtr& value);
+
+template<typename T>
+std::ostream& operator<<(std::ostream& os, Range<T> value) {
+  return os << value.min << ".." << value.max;
+}
+
+std::ostream& operator<<(std::ostream& os, Range<bool> value);
+
+// The following overloads add newlines.
+
+std::ostream& operator<<(std::ostream& os, const StreamTypePtr& value);
+std::ostream& operator<<(std::ostream& os, const StreamTypeSetPtr& value);
+std::ostream& operator<<(std::ostream& os, const StreamTypesPtr& value);
+std::ostream& operator<<(std::ostream& os, const StreamTypeSetsPtr& value);
+
+} // namespace media
+} // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_FORMATTING_H_
diff --git a/services/media/framework/lpcm_util.cc b/services/media/framework/lpcm_util.cc
new file mode 100644
index 0000000..ca20807
--- /dev/null
+++ b/services/media/framework/lpcm_util.cc
@@ -0,0 +1,169 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/logging.h"
+#include "services/media/framework/formatting.h"
+#include "services/media/framework/lpcm_util.h"
+
+namespace mojo {
+namespace media {
+
+// LpcmUtil implementation that processes samples of type T.
+template<typename T>
+class LpcmUtilImpl : public LpcmUtil {
+ public:
+  ~LpcmUtilImpl();
+
+  void Silence(void* buffer, uint64_t frame_count) const override;
+
+  void Copy(const void* in, void* out, uint64_t frame_count) const override;
+
+  void Mix(const void* in, void* out, uint64_t frame_count) const override;
+
+  void Interleave(
+      const void* in,
+      uint64_t in_byte_count,
+      void* out,
+      uint64_t frame_count) const override;
+
+ private:
+  LpcmUtilImpl(const LpcmStreamType& stream_type);
+
+  LpcmStreamType stream_type_;
+
+  friend class LpcmUtil;
+};
+
+std::unique_ptr<LpcmUtil> LpcmUtil::Create(const LpcmStreamType& stream_type) {
+  LpcmUtil* result;
+  switch (stream_type.sample_format()) {
+    case LpcmStreamType::SampleFormat::kUnsigned8:
+    case LpcmStreamType::SampleFormat::kAny:
+      result = new LpcmUtilImpl<uint8_t>(stream_type);
+      break;
+    case LpcmStreamType::SampleFormat::kSigned16:
+      result = new LpcmUtilImpl<int16_t>(stream_type);
+      break;
+    case LpcmStreamType::SampleFormat::kSigned24In32:
+      result = new LpcmUtilImpl<int32_t>(stream_type);
+      break;
+    case LpcmStreamType::SampleFormat::kFloat:
+      result = new LpcmUtilImpl<float>(stream_type);
+      break;
+    default:
+      NOTREACHED()
+          << "unsupported sample format " << stream_type.sample_format();
+      result = nullptr;
+      break;
+  }
+
+  return std::unique_ptr<LpcmUtil>(result);
+}
+
+template<typename T>
+LpcmUtilImpl<T>::LpcmUtilImpl(const LpcmStreamType& stream_type) :
+    stream_type_(stream_type) {}
+
+template<typename T>
+LpcmUtilImpl<T>::~LpcmUtilImpl() {}
+
+template<typename T>
+void LpcmUtilImpl<T>::Silence(void* buffer, uint64_t frame_count) const {
+  T* sample = reinterpret_cast<T*>(buffer);
+  for (
+      uint64_t sample_countdown = frame_count * stream_type_.channels();
+      sample_countdown != 0;
+      --sample_countdown) {
+    *sample = 0;
+    sample++;
+  }
+}
+
+template<>
+void LpcmUtilImpl<uint8_t>::Silence(void* buffer, uint64_t frame_count) const {
+  std::memset(buffer, 0x80, frame_count * stream_type_.bytes_per_frame());
+}
+
+template<>
+void LpcmUtilImpl<int16_t>::Silence(void* buffer, uint64_t frame_count) const {
+  std::memset(buffer, 0, frame_count * stream_type_.bytes_per_frame());
+}
+
+template<>
+void LpcmUtilImpl<int32_t>::Silence(void* buffer, uint64_t frame_count) const {
+  std::memset(buffer, 0, frame_count * stream_type_.bytes_per_frame());
+}
+
+template<typename T>
+void LpcmUtilImpl<T>::Copy(const void* in, void* out, uint64_t frame_count)
+    const {
+  std::memcpy(out, in, stream_type_.min_buffer_size(frame_count));
+}
+
+template<typename T>
+void LpcmUtilImpl<T>::Mix(const void* in, void* out, uint64_t frame_count)
+    const {
+  const T* in_sample = reinterpret_cast<const T*>(in);
+  T* out_sample = reinterpret_cast<T*>(out);
+  for (
+      uint64_t sample_countdown = frame_count * stream_type_.channels();
+      sample_countdown != 0;
+      --sample_countdown) {
+    *out_sample += *in_sample; // TODO(dalesat): Limit.
+    out_sample++;
+    in_sample++;
+  }
+}
+
+template<>
+void LpcmUtilImpl<uint8_t>::Mix(const void* in, void* out, uint64_t frame_count)
+    const {
+  const uint8_t* in_sample = reinterpret_cast<const uint8_t*>(in);
+  uint8_t* out_sample = reinterpret_cast<uint8_t*>(out);
+  for (
+      uint64_t sample_countdown = frame_count * stream_type_.channels();
+      sample_countdown != 0;
+      --sample_countdown) {
+    *out_sample = uint8_t(uint16_t(*out_sample) + uint16_t(*in_sample) - 0x80);
+    // TODO(dalesat): Limit.
+    out_sample++;
+    in_sample++;
+  }
+}
+
+template<typename T>
+void LpcmUtilImpl<T>::Interleave(
+    const void* in,
+    uint64_t in_byte_count,
+    void* out,
+    uint64_t frame_count) const {
+  DCHECK(in);
+  DCHECK(in_byte_count);
+  DCHECK(out);
+  DCHECK(frame_count);
+
+  uint32_t channels = stream_type_.channels();
+  DCHECK(channels);
+  DCHECK(in_byte_count % stream_type_.bytes_per_frame() == 0);
+  DCHECK(in_byte_count >= frame_count * stream_type_.bytes_per_frame());
+  uint64_t in_channel_stride = in_byte_count / stream_type_.bytes_per_frame();
+
+  const T* in_channel = reinterpret_cast<const T*>(in);
+  T* out_channel = reinterpret_cast<T*>(out);
+
+  for (uint32_t channel = channels; channel != 0; --channel) {
+    const T* in_sample = in_channel;
+    T* out_sample = out_channel;
+    for (uint64_t frame = frame_count; frame != 0; --frame) {
+      *out_sample = *in_sample;
+      ++in_sample;
+      out_sample += channels;
+    }
+    in_channel += in_channel_stride;
+    ++out_channel;
+  }
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/lpcm_util.h b/services/media/framework/lpcm_util.h
new file mode 100644
index 0000000..6758472
--- /dev/null
+++ b/services/media/framework/lpcm_util.h
@@ -0,0 +1,46 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_LPCM_UTIL_H_
+#define SERVICES_MEDIA_FRAMEWORK_LPCM_UTIL_H_
+
+#include <memory>
+
+#include "services/media/framework/stream_type.h"
+
+namespace mojo {
+namespace media {
+
+// Helper class that performs various LPCM processing functions.
+class LpcmUtil {
+ public:
+  static std::unique_ptr<LpcmUtil> Create(const LpcmStreamType& stream_type);
+
+  virtual ~LpcmUtil() {}
+
+  // Fills the buffer with silence.
+  virtual void Silence(void* buffer, uint64_t frame_count) const = 0;
+
+  // Copies samples.
+  virtual void Copy(const void* in, void* out, uint64_t frame_count) const = 0;
+
+  // Mixes samples.
+  virtual void Mix(const void* in, void* out, uint64_t frame_count) const = 0;
+
+  // Interleaves non-interleaved samples. This assumes ffmpeg non-interleaved
+  // ("planar") layout, in which the buffer (in) is divided evenly into a
+  // channel buffer per channel. The samples for each channel are contiguous
+  // in the respective channel buffer with possible empty space at the end
+  // (hence the in_type_count and the frame_count).
+  virtual void Interleave(
+      const void* in,
+      uint64_t in_byte_count,
+      void* out,
+      uint64_t frame_count) const = 0;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_LPCM_UTIL_H_
diff --git a/services/media/framework/metadata.cc b/services/media/framework/metadata.cc
new file mode 100644
index 0000000..d62b4b7
--- /dev/null
+++ b/services/media/framework/metadata.cc
@@ -0,0 +1,48 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/metadata.h"
+
+namespace mojo {
+namespace media {
+
+// static
+MetadataPtr Metadata::Create(
+    uint64_t duration_ns,
+    const std::string& title,
+    const std::string& artist,
+    const std::string& album,
+    const std::string& publisher,
+    const std::string& genre,
+    const std::string& composer) {
+  return MetadataPtr(new Metadata(
+      duration_ns,
+      title,
+      artist,
+      album,
+      publisher,
+      genre,
+      composer));
+}
+
+Metadata::Metadata(
+    uint64_t duration_ns,
+    const std::string& title,
+    const std::string& artist,
+    const std::string& album,
+    const std::string& publisher,
+    const std::string& genre,
+    const std::string& composer) :
+    duration_ns_(duration_ns),
+    title_(title),
+    artist_(artist),
+    album_(album),
+    publisher_(publisher),
+    genre_(genre),
+    composer_(composer) {}
+
+Metadata::~Metadata() {}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/metadata.h b/services/media/framework/metadata.h
new file mode 100644
index 0000000..ee44240
--- /dev/null
+++ b/services/media/framework/metadata.h
@@ -0,0 +1,87 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_METADATA_H_
+#define SERVICES_MEDIA_FRAMEWORK_METADATA_H_
+
+#include <memory>
+#include <string>
+
+#include "base/macros.h"
+#include "services/media/framework/ptr.h"
+
+namespace mojo {
+namespace media {
+
+class Metadata;
+
+// TODO(dalesat): Get rid of typedefs like these.
+typedef UniquePtr<Metadata> MetadataPtr;
+
+// Container for content metadata.
+// TODO(dalesat): Probably needs to be extensible. Consider using map-like.
+class Metadata {
+ public:
+  // TODO(dalesat): Rename methods like this 'Create'.
+  static MetadataPtr Create(
+      uint64_t duration_ns,
+      const std::string& title,
+      const std::string& artist,
+      const std::string& album,
+      const std::string& publisher,
+      const std::string& genre,
+      const std::string& composer);
+
+  ~Metadata();
+
+  uint64_t duration_ns() const { return duration_ns_; }
+
+  const std::string& title() const { return title_; }
+
+  const std::string& artist() const { return artist_; }
+
+  const std::string& album() const { return album_; }
+
+  const std::string& publisher() const { return publisher_; }
+
+  const std::string& genre() const { return genre_; }
+
+  const std::string& composer() const { return composer_; }
+
+  MetadataPtr Clone() const {
+    return Create(
+        duration_ns_,
+        title_,
+        artist_,
+        album_,
+        publisher_,
+        genre_,
+        composer_);
+  }
+
+ private:
+  Metadata(
+      uint64_t duration_ns,
+      const std::string& title,
+      const std::string& artist,
+      const std::string& album,
+      const std::string& publisher,
+      const std::string& genre,
+      const std::string& composer);
+
+  uint64_t duration_ns_;
+  std::string title_;
+  std::string artist_;
+  std::string album_;
+  std::string publisher_;
+  std::string genre_;
+  std::string composer_;
+
+  DISALLOW_COPY_AND_ASSIGN(Metadata);
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_METADATA_H_
diff --git a/services/media/framework/models/active_sink.h b/services/media/framework/models/active_sink.h
new file mode 100644
index 0000000..955fe06
--- /dev/null
+++ b/services/media/framework/models/active_sink.h
@@ -0,0 +1,46 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_MEDIA_MODELS_ACTIVE_SINK_H_
+#define MOJO_MEDIA_MODELS_ACTIVE_SINK_H_
+
+#include <memory>
+
+#include "services/media/framework/allocator.h"
+#include "services/media/framework/models/demand.h"
+#include "services/media/framework/packet.h"
+
+namespace mojo {
+namespace media {
+
+// Sink that consumes packets asynchronously.
+class ActiveSink {
+ public:
+  using DemandCallback = std::function<void(Demand demand)>;
+
+  virtual ~ActiveSink() {}
+
+  // Indicates whether the sink must allocate.
+  virtual bool must_allocate() const = 0;
+
+  // The consumer's allocator. Can return nullptr, in which case the default
+  // allocator should be used.
+  virtual Allocator* allocator() = 0;
+
+  // Sets the callback that signals demand asynchronously.
+  virtual void SetDemandCallback(DemandCallback demand_callback) = 0;
+
+  // Initiates demand.
+  virtual void Prime() = 0;
+
+  // Supplies a packet to the sink.
+  virtual Demand SupplyPacket(PacketPtr packet) = 0;
+};
+
+typedef std::shared_ptr<ActiveSink> ActiveSinkPtr;
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // MOJO_MEDIA_MODELS_ACTIVE_SINK_H_
diff --git a/services/media/framework/models/active_source.h b/services/media/framework/models/active_source.h
new file mode 100644
index 0000000..a1b7b82
--- /dev/null
+++ b/services/media/framework/models/active_source.h
@@ -0,0 +1,42 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_MEDIA_MODELS_ACTIVE_SOURCE_H_
+#define MOJO_MEDIA_MODELS_ACTIVE_SOURCE_H_
+
+#include <memory>
+
+#include "services/media/framework/allocator.h"
+#include "services/media/framework/models/demand.h"
+#include "services/media/framework/packet.h"
+
+namespace mojo {
+namespace media {
+
+// Source that produces packets asynchronously.
+class ActiveSource {
+ public:
+  using SupplyCallback = std::function<void(PacketPtr packet)>;
+
+  virtual ~ActiveSource() {}
+
+  // Whether the source can accept an allocator.
+  virtual bool can_accept_allocator() const = 0;
+
+  // Sets the allocator for the source.
+  virtual void set_allocator(Allocator* allocator) = 0;
+
+  // Sets the callback that supplies a packet asynchronously.
+  virtual void SetSupplyCallback(SupplyCallback supply_callback) = 0;
+
+  // Sets the demand signalled from downstream.
+  virtual void SetDownstreamDemand(Demand demand) = 0;
+};
+
+typedef std::shared_ptr<ActiveSource> ActiveSourcePtr;
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // MOJO_MEDIA_MODELS_ACTIVE_SOURCE_H_
diff --git a/services/media/framework/models/demand.h b/services/media/framework/models/demand.h
new file mode 100644
index 0000000..843923d
--- /dev/null
+++ b/services/media/framework/models/demand.h
@@ -0,0 +1,28 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_MEDIA_MODELS_DEMAND_H_
+#define MOJO_MEDIA_MODELS_DEMAND_H_
+
+namespace mojo {
+namespace media {
+
+// Expresses packet demand for signalling upstream in a graph.
+enum class Demand {
+  // Ordered such that (kNegative < kNeutral < kPositive).
+
+  // No packet can currently be accepted.
+  kNegative = -1,
+
+  // A packet can be accepted but is not required to meet timing constraints.
+  kNeutral = 0,
+
+  // A packet is required to meet timing constraints.
+  kPositive = 1
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // MOJO_MEDIA_MODELS_DEMAND_H_
diff --git a/services/media/framework/models/lpcm_frame_buffer.cc b/services/media/framework/models/lpcm_frame_buffer.cc
new file mode 100644
index 0000000..a99696c
--- /dev/null
+++ b/services/media/framework/models/lpcm_frame_buffer.cc
@@ -0,0 +1,19 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/models/lpcm_frame_buffer.h"
+
+namespace mojo {
+namespace media {
+
+LpcmFrameBuffer::LpcmFrameBuffer() :
+    bytes_per_frame_(0),
+    remaining_buffer_(nullptr),
+    remaining_frame_count_(0),
+    exhausted_callback_(nullptr) {}
+
+LpcmFrameBuffer::~LpcmFrameBuffer() {}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/models/lpcm_frame_buffer.h b/services/media/framework/models/lpcm_frame_buffer.h
new file mode 100644
index 0000000..e9db867
--- /dev/null
+++ b/services/media/framework/models/lpcm_frame_buffer.h
@@ -0,0 +1,83 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_MEDIA_MODELS_LPCM_FRAME_BUFFER_H_
+#define MOJO_MEDIA_MODELS_LPCM_FRAME_BUFFER_H_
+
+#include <cstdint>
+#include <functional>
+
+#include "base/logging.h"
+
+namespace mojo {
+namespace media {
+
+// References an LPCM frame buffer and implements advancement through it.
+class LpcmFrameBuffer {
+ public:
+  using ExhaustedCallback = std::function<void()>;
+
+  LpcmFrameBuffer();
+
+  ~LpcmFrameBuffer();
+
+  void set_bytes_per_frame(uint32_t bytes_per_frame) {
+    bytes_per_frame_ = bytes_per_frame;
+  }
+
+  uint32_t bytes_per_frame() const {
+    return bytes_per_frame_;
+  }
+
+  // The remaining frame buffer.
+  void* buffer() const {
+    return remaining_buffer_;
+  }
+
+  // The remaining number of frames accommodated by the frame buffer.
+  uint64_t frame_count() const {
+    return remaining_frame_count_;
+  }
+
+  // Resets the buffer and frame.
+  void Reset() {
+    remaining_buffer_ = nullptr;
+    remaining_frame_count_ = 0;
+    exhausted_callback_ = nullptr;
+  }
+
+  // Sets the buffer and frame count.
+  void Set(
+      void* buffer,
+      uint64_t frame_count,
+      ExhaustedCallback exhausted_callback = nullptr) {
+    remaining_buffer_ = buffer;
+    remaining_frame_count_ = frame_count;
+    exhausted_callback_ = exhausted_callback;
+  }
+
+  // Updates buffer and frame_count to reflect use of the buffer.
+  void Advance(uint64_t frame_count) {
+    DCHECK(remaining_buffer_);
+    DCHECK(frame_count <= remaining_frame_count_);
+    remaining_buffer_ = reinterpret_cast<uint8_t*>(remaining_buffer_) +
+        (frame_count * bytes_per_frame_);
+    remaining_frame_count_ -= frame_count;
+    if (remaining_frame_count_ == 0 && exhausted_callback_ != nullptr) {
+      exhausted_callback_();
+      exhausted_callback_ = nullptr;
+    }
+  }
+
+ private:
+  uint32_t bytes_per_frame_;
+  void* remaining_buffer_;
+  uint64_t remaining_frame_count_;
+  ExhaustedCallback exhausted_callback_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // MOJO_MEDIA_MODELS_LPCM_FRAME_BUFFER_H_
diff --git a/services/media/framework/models/lpcm_transform.h b/services/media/framework/models/lpcm_transform.h
new file mode 100644
index 0000000..e1ec0c1
--- /dev/null
+++ b/services/media/framework/models/lpcm_transform.h
@@ -0,0 +1,39 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_MEDIA_MODELS_LPCM_TRANSFORM_H_
+#define MOJO_MEDIA_MODELS_LPCM_TRANSFORM_H_
+
+#include <memory>
+
+#include "services/media/framework/models/lpcm_frame_buffer.h"
+#include "services/media/framework/stream_type.h"
+
+namespace mojo {
+namespace media {
+
+// Synchronous lpcm transform.
+class LpcmTransform {
+ public:
+  virtual ~LpcmTransform() {}
+
+  // Gets the input stream type.
+  virtual const LpcmStreamType& input_stream_type() const = 0;
+
+  // Gets the output stream type.
+  virtual const LpcmStreamType& output_stream_type() const = 0;
+
+  // Processes frames.
+  virtual void TransformFrames(
+      LpcmFrameBuffer* source,
+      LpcmFrameBuffer* dest,
+      bool mix) = 0;
+};
+
+typedef std::shared_ptr<LpcmTransform> LpcmTransformPtr;
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // MOJO_MEDIA_MODELS_LPCM_TRANSFORM_H_
diff --git a/services/media/framework/models/multistream_packet_source.h b/services/media/framework/models/multistream_packet_source.h
new file mode 100644
index 0000000..b5072f1
--- /dev/null
+++ b/services/media/framework/models/multistream_packet_source.h
@@ -0,0 +1,35 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_MEDIA_MODELS_MULTISTREAM_PACKET_SOURCE_H_
+#define MOJO_MEDIA_MODELS_MULTISTREAM_PACKET_SOURCE_H_
+
+#include <memory>
+
+#include "services/media/framework/packet.h"
+
+namespace mojo {
+namespace media {
+
+// Synchronous source of packets for multiple streams. This is currently used
+// by Demux, though it would be better if Demux were asynchronous.
+class MultiStreamPacketSource {
+ public:
+  virtual ~MultiStreamPacketSource() {}
+
+  // Returns the number of streams the source produces.
+  virtual uint32_t stream_count() const = 0;
+
+  // Gets a packet for the stream indicated via stream_index_out. This call
+  // should always produce a packet until end-of-stream. The caller is
+  // responsible for releasing the packet.
+  virtual PacketPtr PullPacket(uint32_t *stream_index_out) = 0;
+};
+
+typedef std::shared_ptr<MultiStreamPacketSource> MultiStreamPacketSourcePtr;
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // MOJO_MEDIA_MODELS_MULTISTREAM_PACKET_SOURCE_H_
diff --git a/services/media/framework/models/packet_transform.h b/services/media/framework/models/packet_transform.h
new file mode 100644
index 0000000..b25069b
--- /dev/null
+++ b/services/media/framework/models/packet_transform.h
@@ -0,0 +1,38 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_MEDIA_MODELS_PACKET_TRANSFORM_H_
+#define MOJO_MEDIA_MODELS_PACKET_TRANSFORM_H_
+
+#include <memory>
+
+#include "services/media/framework/allocator.h"
+#include "services/media/framework/packet.h"
+
+namespace mojo {
+namespace media {
+
+// Synchronous packet transform.
+class PacketTransform {
+ public:
+  virtual ~PacketTransform() {}
+
+  // Processes a packet. Returns true to indicate the transform is done
+  // processing the input packet. Returns false to indicate the input
+  // packet should be processed again. new_input indicates whether the input
+  // packet is new (true) or is being processed again (false). An output packet
+  // may or may not be generated for any given invocation of this method.
+  virtual bool TransformPacket(
+      const PacketPtr& input,
+      bool new_input,
+      Allocator* allocator,
+      PacketPtr* output) = 0;
+};
+
+typedef std::shared_ptr<PacketTransform> PacketTransformPtr;
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // MOJO_MEDIA_MODELS_PACKET_TRANSFORM_H_
diff --git a/services/media/framework/packet.cc b/services/media/framework/packet.cc
new file mode 100644
index 0000000..9b02493
--- /dev/null
+++ b/services/media/framework/packet.cc
@@ -0,0 +1,106 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/logging.h"
+#include "services/media/framework/allocator.h"
+#include "services/media/framework/packet.h"
+
+namespace mojo {
+namespace media {
+
+class PacketImpl : public Packet {
+ public:
+  PacketImpl(
+      int64_t presentation_time,
+      uint64_t duration,
+      bool end_of_stream,
+      uint64_t size,
+      void* payload,
+      Allocator* allocator) :
+      presentation_time_(presentation_time),
+      duration_(duration),
+      end_of_stream_(end_of_stream),
+      size_(size),
+      payload_(payload),
+      allocator_(allocator) {
+    DCHECK((size == 0) == (payload == nullptr));
+  }
+
+  ~PacketImpl() override {};
+
+  int64_t presentation_time() const override { return presentation_time_; }
+
+  uint64_t duration() const override { return duration_; }
+
+  bool end_of_stream() const override { return end_of_stream_; }
+
+  uint64_t size() const override { return size_; }
+
+  void* payload() const override { return payload_; }
+
+ protected:
+  void Release() override {
+    if (payload_ != nullptr && allocator_ != nullptr) {
+      DCHECK(allocator_);
+      allocator_->ReleasePayloadBuffer(size_, payload_);
+    }
+    delete this;
+  }
+
+ private:
+  int64_t presentation_time_;
+  uint64_t duration_;
+  bool end_of_stream_;
+  uint64_t size_;
+  void* payload_;
+  Allocator* allocator_;
+};
+
+// static
+PacketPtr Packet::Create(
+    int64_t presentation_time,
+    uint64_t duration,
+    bool end_of_stream,
+    uint64_t size,
+    void* payload,
+    Allocator* allocator) {
+  DCHECK(payload == nullptr || allocator != nullptr);
+  return PacketPtr(new PacketImpl(
+      presentation_time,
+      duration,
+      end_of_stream,
+      size,
+      payload,
+      allocator));
+}
+
+// static
+PacketPtr Packet::CreateNoAllocator(
+    int64_t presentation_time,
+    uint64_t duration,
+    bool end_of_stream,
+    uint64_t size,
+    void* payload) {
+  return PacketPtr(new PacketImpl(
+      presentation_time,
+      duration,
+      end_of_stream,
+      size,
+      payload,
+      nullptr));
+}
+
+// static
+PacketPtr Packet::CreateEndOfStream(int64_t presentation_time) {
+  return PacketPtr(new PacketImpl(
+      presentation_time,
+      0, // duration
+      true, // end_of_stream
+      0, // size
+      nullptr, // payload
+      nullptr)); // allocator
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/packet.h b/services/media/framework/packet.h
new file mode 100644
index 0000000..bbc0125
--- /dev/null
+++ b/services/media/framework/packet.h
@@ -0,0 +1,84 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_PACKET_H_
+#define SERVICES_MEDIA_FRAMEWORK_PACKET_H_
+
+#include <memory>
+
+#include "base/logging.h"
+#include "services/media/framework/allocator.h"
+#include "services/media/framework/ptr.h"
+
+namespace mojo {
+namespace media {
+
+class Packet;
+
+// Used for PacketPtr.
+struct PacketDeleter {
+  void operator()(Packet* ptr) const;
+};
+
+// Unique pointer for packets.
+typedef UniquePtr<Packet, PacketDeleter> PacketPtr;
+
+// Media packet abstract base class. Subclasses may be defined as needed.
+// Packet::Create and Packet::CreateEndOfStream use an implementation with
+// no special behavior.
+// TODO(dalesat): Revisit this definition:
+// 1) We probably need an extensible way to add metadata to packets.
+// 2) The relationship to the allocator could be clearer.
+class Packet {
+ public:
+  // Creates a packet. If size is 0, payload must be nullptr and vice-versa.
+  // If payload is not nullptr, an allocator must be provided.
+  static PacketPtr Create(
+      int64_t presentation_time,
+      uint64_t duration,
+      bool end_of_stream,
+      uint64_t size,
+      void* payload,
+      Allocator* allocator);
+
+  // Creates a packet. If size is 0, payload must be nullptr and vice-versa.
+  // No allocator is provided, and the payload will not be released when the
+  // packet is released.
+  static PacketPtr CreateNoAllocator(
+      int64_t presentation_time,
+      uint64_t duration,
+      bool end_of_stream,
+      uint64_t size,
+      void* payload);
+
+  // Creates an end-of-stream packet with no payload.
+  static PacketPtr CreateEndOfStream(int64_t presentation_time);
+
+  virtual int64_t presentation_time() const = 0;
+
+  virtual uint64_t duration() const = 0;
+
+  virtual bool end_of_stream() const = 0;
+
+  virtual uint64_t size() const = 0;
+
+  virtual void* payload() const = 0;
+
+ protected:
+  virtual ~Packet() {}
+
+  virtual void Release() = 0;
+
+  friend PacketDeleter;
+};
+
+inline void PacketDeleter::operator()(Packet* ptr) const {
+  DCHECK(ptr);
+  ptr->Release();
+}
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_PACKET_H_
diff --git a/services/media/framework/parts/decoder.h b/services/media/framework/parts/decoder.h
new file mode 100644
index 0000000..1818dc6
--- /dev/null
+++ b/services/media/framework/parts/decoder.h
@@ -0,0 +1,42 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_PARTS_DECODER_H_
+#define SERVICES_MEDIA_FRAMEWORK_PARTS_DECODER_H_
+
+#include "services/media/framework/allocator.h"
+#include "services/media/framework/models/packet_transform.h"
+#include "services/media/framework/packet.h"
+#include "services/media/framework/result.h"
+#include "services/media/framework/stream_type.h"
+
+namespace mojo {
+namespace media {
+
+class Decoder;
+
+typedef SharedPtr<Decoder, PacketTransform> DecoderPtr;
+
+// Abstract base class for transforms that decode compressed media.
+class Decoder : public PacketTransform {
+ public:
+  // Creates a Decoder object for a given stream type.
+  static Result Create(
+      const StreamTypePtr& stream_type,
+      DecoderPtr* decoder_out);
+
+  ~Decoder() override {}
+
+  // Returns the type of the stream the decoder will produce.
+  virtual StreamTypePtr output_stream_type() = 0;
+
+ protected:
+  // Initializes the decoder. Called by Decoder::Create.
+  virtual Result Init(const StreamTypePtr& stream_type) = 0;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_PARTS_DECODER_H_
diff --git a/services/media/framework/parts/demux.h b/services/media/framework/parts/demux.h
new file mode 100644
index 0000000..a522cda
--- /dev/null
+++ b/services/media/framework/parts/demux.h
@@ -0,0 +1,77 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_PARTS_DEMUX_H_
+#define SERVICES_MEDIA_FRAMEWORK_PARTS_DEMUX_H_
+
+#include <memory>
+#include <vector>
+
+#include "services/media/framework/metadata.h"
+#include "services/media/framework/models/multistream_packet_source.h"
+#include "services/media/framework/packet.h"
+#include "services/media/framework/parts/reader.h"
+#include "services/media/framework/result.h"
+#include "services/media/framework/stream_type.h"
+
+namespace mojo {
+namespace media {
+
+class Demux;
+
+typedef SharedPtr<Demux, MultiStreamPacketSource> DemuxPtr;
+
+// Abstract base class for sources that parse input from a reader and
+// produce one or more output streams.
+class Demux : public MultiStreamPacketSource {
+ public:
+  // Represents a stream produced by the demux.
+  class DemuxStream {
+    // TODO(dalesat): Replace this class with stream_type_, unless more stuff
+    // needs to be added.
+   public:
+    virtual ~DemuxStream() {}
+
+    virtual uint32_t index() const = 0;
+
+    virtual StreamTypePtr stream_type() const = 0;
+  };
+
+  // Creates a Demux object for a given reader.
+  static Result Create(ReaderPtr reader, DemuxPtr* demux_out);
+
+  ~Demux() override {}
+
+  // TODO(dalesat): Don't let the demux talk to the reader. We're doing it this
+  // way now because of ffmpeg's synchronous read call. Ideally, the demux
+  // would tell its owner how much data it needs from the reader, and the
+  // owner would later hand over the data and possibly get a packet back.
+
+  // TODO(dalesat): Implement seek, etc.
+
+  // TODO(dalesat): Make the demux use an allocator. Ffmpeg demuxes don't
+  // support this.
+
+  // Initializes the demux.
+  virtual Result Init(ReaderPtr reader) = 0;
+
+  // Gets the current metadata.
+  virtual MetadataPtr metadata() const = 0;
+
+  // Gets the stream collection.
+  virtual const std::vector<DemuxStream*>& streams() const = 0;
+
+  // MultiStreamProducer implementation (deferred to subclasses).
+  // bool can_accept_allocator(uint32_t stream_index) const override;
+  // void set_allocator(uint32_t stream_index, Allocator* allocator) override;
+
+  // MultiStreamPacketSource implementation (deferred to subclasses).
+  // uint32_t stream_count() const override;
+  // PacketPtr PullPacket(uint32_t* stream_index_out) override;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_PARTS_DEMUX_H_
diff --git a/services/media/framework/parts/file_reader.cc b/services/media/framework/parts/file_reader.cc
new file mode 100644
index 0000000..6c68d05
--- /dev/null
+++ b/services/media/framework/parts/file_reader.cc
@@ -0,0 +1,67 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cstdio>
+
+#include "base/files/file_util.h"
+#include "services/media/framework/parts/file_reader.h"
+#include "url/gurl.h"
+
+namespace mojo {
+namespace media {
+
+FileReader::~FileReader() {
+  if (file_ != nullptr) {
+    fclose(file_);
+    file_ = nullptr;
+  }
+}
+
+Result FileReader::Init(const GURL& gurl) {
+  // TODO(dalesat): Assumes the authority is localhost.
+  std::string path = gurl.path();
+  file_ = base::OpenFile(base::FilePath(path), "rb");
+  if (file_ == nullptr) {
+    return Result::kNotFound;
+  }
+
+  if (fseek(file_, 0, SEEK_END) == 0) {
+    size_ = ftell(file_);
+    if (fseek(file_, 0, SEEK_SET) < 0) {
+      fclose(file_);
+      file_ = nullptr;
+      return Result::kUnsupportedOperation;
+    }
+  } else {
+    size_ = -1;
+  }
+
+  return Result::kOk;
+}
+
+size_t FileReader::Read(uint8* buffer, int bytes_to_read) {
+  return fread(buffer, 1, bytes_to_read, file_);
+}
+
+int64_t FileReader::GetPosition() const {
+  return ftell(file_);
+}
+
+int64_t FileReader::SetPosition(int64 position) {
+  if (fseek(file_, position, SEEK_SET) < 0) {
+    return -1;
+  }
+  return position;
+}
+
+size_t FileReader::GetSize() const {
+  return size_;
+}
+
+bool FileReader::CanSeek() const {
+  return true;
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework/parts/file_reader.h b/services/media/framework/parts/file_reader.h
new file mode 100644
index 0000000..ba7c588
--- /dev/null
+++ b/services/media/framework/parts/file_reader.h
@@ -0,0 +1,45 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_PARTS_FILE_READER_H_
+#define SERVICES_MEDIA_FRAMEWORK_PARTS_FILE_READER_H_
+
+#include "services/media/framework/parts/reader.h"
+
+namespace mojo {
+namespace media {
+
+// Reads raw data from a file.
+class FileReader : public Reader {
+ public:
+  static ReaderPtr Create() {
+    return ReaderPtr(new FileReader());
+  }
+
+  ~FileReader() override;
+
+  // Reader implementation.
+  Result Init(const GURL& gurl) override;
+
+  size_t Read(uint8* buffer, int bytes_to_read) override;
+
+  int64_t GetPosition() const override;
+
+  int64_t SetPosition(int64 position) override;
+
+  size_t GetSize() const override;
+
+  bool CanSeek() const override;
+
+ private:
+  FileReader() {}
+
+  FILE* file_;
+  int64 size_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_PARTS_FILE_READER_H_
diff --git a/services/media/framework/parts/lpcm_reformatter.cc b/services/media/framework/parts/lpcm_reformatter.cc
new file mode 100644
index 0000000..03beb25
--- /dev/null
+++ b/services/media/framework/parts/lpcm_reformatter.cc
@@ -0,0 +1,348 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/logging.h"
+#include "services/media/framework/parts/lpcm_reformatter.h"
+
+namespace mojo {
+namespace media {
+
+// LpcmReformatter implementation that accepts samples of type TIn and
+// produces samples of type TOut.
+template<typename TIn, typename TOut>
+class LpcmReformatterImpl : public LpcmReformatter {
+ public:
+  LpcmReformatterImpl(
+      const LpcmStreamType& in_type,
+      const LpcmStreamTypeSet& out_type);
+
+  ~LpcmReformatterImpl() override;
+
+  // LpcmTransform implementation.
+  const LpcmStreamType& input_stream_type() const override;
+
+  const LpcmStreamType& output_stream_type() const override;
+
+  void TransformFrames(
+      LpcmFrameBuffer* source,
+      LpcmFrameBuffer* dest,
+      bool mix) override;
+
+ private:
+  LpcmStreamType in_type_;
+  LpcmStreamType out_type_;
+};
+
+LpcmReformatterPtr LpcmReformatter::Create(
+    const LpcmStreamType& in_type,
+    const LpcmStreamTypeSet& out_type) {
+  LpcmReformatter* result = nullptr;
+
+  switch (in_type.sample_format()) {
+    case LpcmStreamType::SampleFormat::kUnsigned8:
+      switch (out_type.sample_format()) {
+        case LpcmStreamType::SampleFormat::kUnsigned8:
+        case LpcmStreamType::SampleFormat::kAny:
+          result = new LpcmReformatterImpl<uint8_t, uint8_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kSigned16:
+          result = new LpcmReformatterImpl<uint8_t, int16_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kSigned24In32:
+          result = new LpcmReformatterImpl<uint8_t, int32_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kFloat:
+          result = new LpcmReformatterImpl<uint8_t, float>(
+              in_type, out_type);
+          break;
+        default:
+          NOTREACHED() << "unsupported sample format";
+          result = nullptr;
+          break;
+      }
+      break;
+    case LpcmStreamType::SampleFormat::kSigned16:
+      switch (out_type.sample_format()) {
+        case LpcmStreamType::SampleFormat::kUnsigned8:
+          result = new LpcmReformatterImpl<int16_t, uint8_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kSigned16:
+        case LpcmStreamType::SampleFormat::kAny:
+          result = new LpcmReformatterImpl<int16_t, int16_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kSigned24In32:
+          result = new LpcmReformatterImpl<int16_t, int32_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kFloat:
+          result = new LpcmReformatterImpl<int16_t, float>(
+              in_type, out_type);
+          break;
+        default:
+          NOTREACHED() << "unsupported sample format";
+          result = nullptr;
+          break;
+      }
+      break;
+    case LpcmStreamType::SampleFormat::kSigned24In32:
+      switch (out_type.sample_format()) {
+        case LpcmStreamType::SampleFormat::kUnsigned8:
+          result = new LpcmReformatterImpl<int32_t, uint8_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kSigned16:
+          result = new LpcmReformatterImpl<int32_t, int16_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kSigned24In32:
+        case LpcmStreamType::SampleFormat::kAny:
+          result = new LpcmReformatterImpl<int32_t, int32_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kFloat:
+          result = new LpcmReformatterImpl<int32_t, float>(
+              in_type, out_type);
+          break;
+        default:
+          NOTREACHED() << "unsupported sample format";
+          result = nullptr;
+          break;
+      }
+      break;
+    case LpcmStreamType::SampleFormat::kFloat:
+      switch (out_type.sample_format()) {
+        case LpcmStreamType::SampleFormat::kUnsigned8:
+          result = new LpcmReformatterImpl<float, uint8_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kSigned16:
+          result = new LpcmReformatterImpl<float, int16_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kSigned24In32:
+          result = new LpcmReformatterImpl<float, int32_t>(
+              in_type, out_type);
+          break;
+        case LpcmStreamType::SampleFormat::kFloat:
+        case LpcmStreamType::SampleFormat::kAny:
+          result = new LpcmReformatterImpl<float, float>(in_type, out_type);
+          break;
+        default:
+          NOTREACHED() << "unsupported sample format";
+          result = nullptr;
+          break;
+      }
+      break;
+    default:
+      NOTREACHED() << "unsupported sample format";
+      result = nullptr;
+      break;
+  }
+
+  return LpcmReformatterPtr(result);
+}
+
+template<typename TIn, typename TOut>
+LpcmReformatterImpl<TIn, TOut>::LpcmReformatterImpl(
+    const LpcmStreamType& in_type,
+    const LpcmStreamTypeSet& out_type) :
+    in_type_(in_type),
+    out_type_(
+        out_type.sample_format() == LpcmStreamType::SampleFormat::kAny ?
+            in_type.sample_format() :
+            out_type.sample_format(),
+        in_type.channels(),
+        in_type.frames_per_second()) {}
+
+template<typename TIn, typename TOut>
+LpcmReformatterImpl<TIn, TOut>::~LpcmReformatterImpl() {}
+
+namespace {
+
+template <typename T>
+inline constexpr T Clamp(T val, T min, T max) {
+  return (val > max) ? max : ((val < min) ? min : val);
+}
+
+template <typename T>
+inline constexpr T Clamp(T val);
+
+template <>
+inline constexpr float Clamp(float val) {
+  return Clamp(val, -1.0f, 1.0f);
+}
+
+template <>
+inline constexpr int32_t Clamp(int32_t val) {
+  return Clamp(val, 1 << 23, -(1 << 23));
+}
+
+template<typename TIn, typename TOut>
+inline void CopySample(TOut* dest, TIn* source) {
+  *dest = static_cast<TOut>(*source);
+}
+
+inline void CopySample(uint8_t* dest, int16_t* source) {
+  *dest = static_cast<uint8_t>((*source >> 8) ^ 0x80);
+}
+
+inline void CopySample(uint8_t* dest, int32_t* source) {
+  *dest = static_cast<uint8_t>((Clamp(*source) >> 16) ^ 0x80);
+}
+
+inline void CopySample(uint8_t* dest, float* source) {
+  *dest = static_cast<uint8_t>((Clamp(*source) * 0x7f) + 128);
+}
+
+inline void CopySample(int16_t* dest, uint8_t* source) {
+  *dest = static_cast<int16_t>(*source ^ 0x80) << 8;
+}
+
+inline void CopySample(int16_t* dest, int32_t* source) {
+  *dest = static_cast<int16_t>(Clamp(*source) >> 8);
+}
+
+inline void CopySample(int16_t* dest, float* source) {
+  *dest = static_cast<int16_t>(Clamp(*source) * 0x7fff);
+}
+
+inline void CopySample(int32_t* dest, uint8_t* source) {
+  *dest = static_cast<int32_t>(*source ^ 0x80) << 16;
+}
+
+inline void CopySample(int32_t* dest, int16_t* source) {
+  *dest = static_cast<int32_t>(*source << 8);
+}
+
+inline void CopySample(int32_t* dest, float* source) {
+  *dest = static_cast<int32_t>(Clamp(*source) * 0x7fffff);
+}
+
+inline void CopySample(float* dest, uint8_t* source) {
+  *dest = static_cast<float>(*source ^ 0x80) / 0x80;
+}
+
+inline void CopySample(float* dest, int16_t* source) {
+  *dest = static_cast<float>(*source) / 0x8000;
+}
+
+inline void CopySample(float* dest, int32_t* source) {
+  *dest = static_cast<float>(Clamp(*source)) / 0x800000;
+}
+
+template<typename TIn, typename TOut>
+inline void MixSample(TOut* dest, TIn* source) {
+  *dest += static_cast<TOut>(*source);
+}
+
+inline void MixSample(uint8_t* dest, int16_t* source) {
+  *dest += static_cast<uint8_t>((*source >> 8) ^ 0x80);
+}
+
+inline void MixSample(uint8_t* dest, int32_t* source) {
+  *dest += static_cast<uint8_t>((Clamp(*source) >> 16) ^ 0x80);
+}
+
+inline void MixSample(uint8_t* dest, float* source) {
+  *dest += static_cast<uint8_t>((Clamp(*source) * 0x7f) + 128);
+}
+
+inline void MixSample(int16_t* dest, uint8_t* source) {
+  *dest += static_cast<int16_t>(*source ^ 0x80) << 8;
+}
+
+inline void MixSample(int16_t* dest, int32_t* source) {
+  *dest += static_cast<int16_t>(Clamp(*source) >> 8);
+}
+
+inline void MixSample(int16_t* dest, float* source) {
+  *dest += static_cast<int16_t>(Clamp(*source) * 0x7fff);
+}
+
+inline void MixSample(int32_t* dest, uint8_t* source) {
+  *dest += static_cast<int32_t>(*source ^ 0x80) << 16;
+}
+
+inline void MixSample(int32_t* dest, int16_t* source) {
+  *dest += static_cast<int32_t>(*source << 8);
+}
+
+inline void MixSample(int32_t* dest, float* source) {
+  *dest += static_cast<int32_t>(Clamp(*source) * 0x7fffff);
+}
+
+inline void MixSample(float* dest, uint8_t* source) {
+  *dest += static_cast<float>(*source ^ 0x80) / 0x80;
+}
+
+inline void MixSample(float* dest, int16_t* source) {
+  *dest += static_cast<float>(*source) / 0x8000;
+}
+
+inline void MixSample(float* dest, int32_t* source) {
+  *dest += static_cast<float>(Clamp(*source)) / 0x800000;
+}
+
+} // namespace
+
+template<typename TIn, typename TOut>
+const LpcmStreamType& LpcmReformatterImpl<TIn, TOut>::input_stream_type()
+    const {
+  return in_type_;
+}
+
+template<typename TIn, typename TOut>
+const LpcmStreamType& LpcmReformatterImpl<TIn, TOut>::output_stream_type()
+    const {
+  return out_type_;
+}
+
+template<typename TIn, typename TOut>
+void LpcmReformatterImpl<TIn, TOut>::TransformFrames(
+    LpcmFrameBuffer* source,
+    LpcmFrameBuffer* dest,
+    bool mix) {
+  DCHECK(source);
+  DCHECK(dest);
+  DCHECK(source->buffer());
+  DCHECK(source->frame_count());
+  DCHECK(dest->buffer());
+  DCHECK(dest->frame_count());
+
+  uint64_t frame_count = std::min(source->frame_count(), dest->frame_count());
+
+  uint8_t* in_channel = static_cast<uint8_t*>(source->buffer());
+  uint8_t* out_channel = static_cast<uint8_t*>(dest->buffer());
+
+  for (uint32_t channel = 0; channel < in_type_.channels(); channel++) {
+    TIn* in_sample = reinterpret_cast<TIn*>(in_channel);
+    TOut* out_sample = reinterpret_cast<TOut*>(out_channel);
+    if (mix) {
+      for (uint64_t sample = 0; sample < frame_count; sample++) {
+        MixSample(out_sample, in_sample);
+        in_sample += in_type_.channels();
+        out_sample += out_type_.channels();
+      }
+    } else {
+      for (uint64_t sample = 0; sample < frame_count; sample++) {
+        CopySample(out_sample, in_sample);
+        in_sample += in_type_.channels();
+        out_sample += out_type_.channels();
+      }
+    }
+    in_channel += in_type_.sample_size();
+    out_channel += out_type_.sample_size();
+  }
+
+  source->Advance(frame_count);
+  dest->Advance(frame_count);
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/parts/lpcm_reformatter.h b/services/media/framework/parts/lpcm_reformatter.h
new file mode 100644
index 0000000..b275303
--- /dev/null
+++ b/services/media/framework/parts/lpcm_reformatter.h
@@ -0,0 +1,30 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_PARTS_LPCM_REFORMATTER_H_
+#define SERVICES_MEDIA_FRAMEWORK_PARTS_LPCM_REFORMATTER_H_
+
+#include "services/media/framework/models/lpcm_transform.h"
+#include "services/media/framework/stream_type.h"
+
+namespace mojo {
+namespace media {
+
+class LpcmReformatter;
+
+typedef SharedPtr<LpcmReformatter, LpcmTransform> LpcmReformatterPtr;
+
+// A transform that reformats samples.
+// TODO(dalesat): Some variations on this could be InPlacePacketTransforms.
+class LpcmReformatter : public LpcmTransform {
+ public:
+  static LpcmReformatterPtr Create(
+      const LpcmStreamType& in_type,
+      const LpcmStreamTypeSet& out_type);
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_PARTS_LPCM_REFORMATTER_H_
diff --git a/services/media/framework/parts/null_sink.cc b/services/media/framework/parts/null_sink.cc
new file mode 100644
index 0000000..9c9b29c
--- /dev/null
+++ b/services/media/framework/parts/null_sink.cc
@@ -0,0 +1,36 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/parts/null_sink.h"
+
+namespace mojo {
+namespace media {
+
+NullSink::NullSink() {}
+
+NullSink::~NullSink() {}
+
+bool NullSink::must_allocate() const {
+  return false;
+}
+
+Allocator* NullSink::allocator() {
+  return nullptr;
+}
+
+void NullSink::SetDemandCallback(DemandCallback demand_callback) {
+  demand_callback_ = demand_callback;
+}
+
+void NullSink::Prime() {
+  DCHECK(demand_callback_);
+  demand_callback_(Demand::kNeutral);
+}
+
+Demand NullSink::SupplyPacket(PacketPtr packet) {
+  return Demand::kNeutral;
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework/parts/null_sink.h b/services/media/framework/parts/null_sink.h
new file mode 100644
index 0000000..41846b5
--- /dev/null
+++ b/services/media/framework/parts/null_sink.h
@@ -0,0 +1,44 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_PARTS_NULL_SINK_H_
+#define SERVICES_MEDIA_FRAMEWORK_PARTS_NULL_SINK_H_
+
+#include "services/media/framework/models/active_sink.h"
+
+namespace mojo {
+namespace media {
+
+class NullSink;
+
+typedef SharedPtr<NullSink, ActiveSink> NullSinkPtr;
+
+// Sink that throws packets away.
+class NullSink : public ActiveSink {
+ public:
+  static NullSinkPtr Create() { return NullSinkPtr(new NullSink()); }
+
+  ~NullSink() override;
+
+  // ActiveSink implementation.
+  bool must_allocate() const override;
+
+  Allocator* allocator() override;
+
+  void SetDemandCallback(DemandCallback demand_callback) override;
+
+  void Prime() override;
+
+  Demand SupplyPacket(PacketPtr packet) override;
+
+ private:
+  NullSink();
+
+  DemandCallback demand_callback_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_PARTS_NULL_SINK_H_
diff --git a/services/media/framework/parts/reader.cc b/services/media/framework/parts/reader.cc
new file mode 100644
index 0000000..31da7fc
--- /dev/null
+++ b/services/media/framework/parts/reader.cc
@@ -0,0 +1,37 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/logging.h"
+#include "services/media/framework/parts/file_reader.h"
+#include "services/media/framework/parts/reader.h"
+#include "url/gurl.h"
+
+namespace mojo {
+namespace media {
+
+Result Reader::Create(const GURL& gurl, ReaderPtr* reader_out) {
+  if (!gurl.is_valid()) {
+    return Result::kInvalidArgument;
+  }
+
+  ReaderPtr reader = nullptr;
+  if (gurl.SchemeIsFile()) {
+    reader = FileReader::Create();
+  }
+  // TODO(dalesat): More schemes.
+
+  if (reader == nullptr) {
+    return Result::kUnsupportedOperation;
+  }
+
+  Result result = reader->Init(gurl);
+  if (result == Result::kOk) {
+    *reader_out = reader;
+  }
+
+  return result;
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework/parts/reader.h b/services/media/framework/parts/reader.h
new file mode 100644
index 0000000..4aaa982
--- /dev/null
+++ b/services/media/framework/parts/reader.h
@@ -0,0 +1,54 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_PARTS_READER_H_
+#define SERVICES_MEDIA_FRAMEWORK_PARTS_READER_H_
+
+#include <memory>
+
+#include "services/media/framework/result.h"
+#include "url/gurl.h"
+
+namespace mojo {
+namespace media {
+
+class Reader;
+
+typedef std::shared_ptr<Reader> ReaderPtr;
+
+// Abstract base class for objects that read raw data on behalf of demuxes.
+// This model is synchronous, because that's how ffmpeg works.
+class Reader {
+ public:
+  // Creates a Reader object for a given url.
+  static Result Create(const GURL& gurl, ReaderPtr* reader_out);
+
+  virtual ~Reader() {}
+
+  // Initializes the reader.
+  virtual Result Init(const GURL& gurl) = 0;
+
+  // Reads the given number of bytes into the buffer and returns the number of
+  // bytes read. Returns -1 if the operation fails.
+  virtual size_t Read(uint8* buffer, int bytes_to_read) = 0;
+
+  // Gets the current position or -1 if the operation fails.
+  virtual int64_t GetPosition() const = 0;
+
+  // Seeks to the given position and returns it. Returns -1 if the operation
+  // fails.
+  virtual int64_t SetPosition(int64 position) = 0;
+
+  // Returns the file size. Returns -1 if the operation fails or the size isn't
+  // known.
+  virtual size_t GetSize() const = 0;
+
+  // Returns true if this object supports seeking, false otherwise.
+  virtual bool CanSeek() const = 0;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // MOJO_SERVICES_MEDIA_READER_H_
diff --git a/services/media/framework/ptr.h b/services/media/framework/ptr.h
new file mode 100644
index 0000000..824f1a7
--- /dev/null
+++ b/services/media/framework/ptr.h
@@ -0,0 +1,64 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_PTR_H_
+#define SERVICES_MEDIA_FRAMEWORK_PTR_H_
+
+#include <memory>
+
+namespace mojo {
+namespace media {
+
+// unique_ptr with Clone.
+// TODO(dalesat): Remove in favor of unique_ptr and a Clone template function.
+template<class T, class Deleter = std::default_delete<T>>
+class UniquePtr : public std::unique_ptr<T, Deleter> {
+ public:
+  UniquePtr() : std::unique_ptr<T, Deleter>() {}
+
+  UniquePtr(std::nullptr_t) : std::unique_ptr<T, Deleter>() {}
+
+  explicit UniquePtr(T* ptr) : std::unique_ptr<T, Deleter>(ptr) {}
+
+  UniquePtr(UniquePtr&& other) :
+      std::unique_ptr<T, Deleter>(std::move(other)) {}
+
+  UniquePtr& operator=(std::nullptr_t) {
+    this->reset();
+    return *this;
+  }
+
+  UniquePtr& operator=(UniquePtr&& other) {
+    *static_cast<std::unique_ptr<T, Deleter>*>(this) = std::move(other);
+    return *this;
+  }
+
+  UniquePtr Clone() const { return *this ? this->get()->Clone() : UniquePtr(); }
+};
+
+// shared_ptr with upcast to TBase.
+// TODO(dalesat): Remove in favor of shared_ptr.
+template<class T, typename TBase>
+class SharedPtr : public std::shared_ptr<T> {
+ public:
+  SharedPtr() : std::shared_ptr<T>() {}
+
+  SharedPtr(std::nullptr_t) : std::shared_ptr<T>() {}
+
+  explicit SharedPtr(T* ptr) : std::shared_ptr<T>(ptr) {}
+
+  SharedPtr& operator=(std::nullptr_t) {
+    this->reset();
+    return *this;
+  }
+
+  operator std::shared_ptr<TBase>() const {
+    return std::shared_ptr<TBase>(*this, this->get());
+  }
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_PTR_H_
diff --git a/services/media/framework/result.h b/services/media/framework/result.h
new file mode 100644
index 0000000..ae6fc6d
--- /dev/null
+++ b/services/media/framework/result.h
@@ -0,0 +1,26 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_RESULT_H_
+#define SERVICES_MEDIA_FRAMEWORK_RESULT_H_
+
+#include <cstdint>
+
+namespace mojo {
+namespace media {
+
+// Possible result values indicating success or type of failure.
+enum class Result {
+  kOk,
+  kUnknownError,
+  kInternalError,
+  kUnsupportedOperation,
+  kInvalidArgument,
+  kNotFound
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_RESULT_H_
diff --git a/services/media/framework/stages/active_sink_stage.cc b/services/media/framework/stages/active_sink_stage.cc
new file mode 100644
index 0000000..c49b55e
--- /dev/null
+++ b/services/media/framework/stages/active_sink_stage.cc
@@ -0,0 +1,67 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/stages/active_sink_stage.h"
+
+namespace mojo {
+namespace media {
+
+ActiveSinkStage::ActiveSinkStage(ActiveSinkPtr sink) : sink_(sink) {
+  DCHECK(sink_);
+
+  demand_function_ = [this](Demand demand) {
+    DCHECK(update_callback_);
+    if (sink_demand_ != demand) {
+      sink_demand_ = demand;
+      update_callback_(this);
+    }
+  };
+
+  sink_->SetDemandCallback(demand_function_);
+}
+
+ActiveSinkStage::~ActiveSinkStage() {}
+
+uint32_t ActiveSinkStage::input_count() const {
+  return 1;
+};
+
+StageInput& ActiveSinkStage::input(uint32_t index) {
+  DCHECK_EQ(index, 0u);
+  return input_;
+}
+
+uint32_t ActiveSinkStage::output_count() const {
+  return 0;
+}
+
+StageOutput& ActiveSinkStage::output(uint32_t index) {
+  NOTREACHED();
+  static StageOutput result;
+  return result;
+}
+
+bool ActiveSinkStage::Prepare(UpdateCallback update_callback) {
+  input_.Prepare(sink_->allocator(), sink_->must_allocate());
+  update_callback_ = update_callback;
+  return true;
+}
+
+void ActiveSinkStage::Prime() {
+  sink_->Prime();
+}
+
+void ActiveSinkStage::Update(Engine* engine) {
+  DCHECK(engine);
+
+  if (input_.packet_from_upstream()) {
+    sink_demand_ =
+        sink_->SupplyPacket(std::move(input_.packet_from_upstream()));
+  }
+
+  input_.SetDemand(sink_demand_, engine);
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/active_sink_stage.h b/services/media/framework/stages/active_sink_stage.h
new file mode 100644
index 0000000..039698a
--- /dev/null
+++ b/services/media/framework/stages/active_sink_stage.h
@@ -0,0 +1,49 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SINK_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SINK_STAGE_H_
+
+#include <deque>
+
+#include "services/media/framework/models/active_sink.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+// A stage that hosts an ActiveSink.
+class ActiveSinkStage : public Stage {
+ public:
+  ActiveSinkStage(ActiveSinkPtr source);
+
+  ~ActiveSinkStage() override;
+
+  // Stage implementation.
+  uint32_t input_count() const override;
+
+  StageInput& input(uint32_t index) override;
+
+  uint32_t output_count() const override;
+
+  StageOutput& output(uint32_t index) override;
+
+  bool Prepare(UpdateCallback update_callback) override;
+
+  void Prime() override;
+
+  void Update(Engine* engine) override;
+
+ private:
+  StageInput input_;
+  ActiveSinkPtr sink_;
+  ActiveSink::DemandCallback demand_function_;
+  Stage::UpdateCallback update_callback_;
+  Demand sink_demand_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SINK_STAGE_H_
diff --git a/services/media/framework/stages/active_source_stage.cc b/services/media/framework/stages/active_source_stage.cc
new file mode 100644
index 0000000..6dd2fc9
--- /dev/null
+++ b/services/media/framework/stages/active_source_stage.cc
@@ -0,0 +1,69 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/stages/active_source_stage.h"
+
+namespace mojo {
+namespace media {
+
+ActiveSourceStage::ActiveSourceStage(ActiveSourcePtr source) : source_(source) {
+  DCHECK(source_);
+
+  supply_function_ = [this](PacketPtr packet) {
+    bool packets_was_empty_ = packets_.empty();
+    packets_.push_back(std::move(packet));
+    if (packets_was_empty_ && update_callback_) {
+      update_callback_(this);
+    }
+  };
+
+  source_->SetSupplyCallback(supply_function_);
+}
+
+ActiveSourceStage::~ActiveSourceStage() {}
+
+uint32_t ActiveSourceStage::input_count() const {
+  return 0;
+};
+
+StageInput& ActiveSourceStage::input(uint32_t index) {
+  NOTREACHED();
+  static StageInput result;
+  return result;
+}
+
+uint32_t ActiveSourceStage::output_count() const {
+  return 1;
+}
+
+StageOutput& ActiveSourceStage::output(uint32_t index) {
+  DCHECK_EQ(index, 0u);
+  return output_;
+}
+
+bool ActiveSourceStage::Prepare(UpdateCallback update_callback) {
+  update_callback_ = update_callback;
+  Allocator* allocator = output_.Prepare(source_->can_accept_allocator());
+  if (allocator) {
+    DCHECK(source_->can_accept_allocator());
+    source_->set_allocator(allocator);
+  }
+  return true;
+}
+
+void ActiveSourceStage::Update(Engine* engine) {
+  DCHECK(engine);
+
+  Demand demand = output_.demand();
+
+  source_->SetDownstreamDemand(demand);
+
+  if (demand != Demand::kNegative && !packets_.empty()) {
+    output_.SupplyPacket(std::move(packets_.front()), engine);
+    packets_.pop_front();
+  }
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/active_source_stage.h b/services/media/framework/stages/active_source_stage.h
new file mode 100644
index 0000000..c89ef25
--- /dev/null
+++ b/services/media/framework/stages/active_source_stage.h
@@ -0,0 +1,47 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SOURCE_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SOURCE_STAGE_H_
+
+#include <deque>
+
+#include "services/media/framework/models/active_source.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+// A stage that hosts an ActiveSource.
+class ActiveSourceStage : public Stage {
+ public:
+  ActiveSourceStage(ActiveSourcePtr source);
+
+  ~ActiveSourceStage() override;
+
+  // Stage implementation.
+  uint32_t input_count() const override;
+
+  StageInput& input(uint32_t index) override;
+
+  uint32_t output_count() const override;
+
+  StageOutput& output(uint32_t index) override;
+
+  bool Prepare(UpdateCallback update_callback) override;
+
+  void Update(Engine* engine) override;
+
+ private:
+  StageOutput output_;
+  ActiveSourcePtr source_;
+  ActiveSource::SupplyCallback supply_function_;
+  Stage::UpdateCallback update_callback_;
+  std::deque<PacketPtr> packets_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_ACTIVE_SOURCE_STAGE_H_
diff --git a/services/media/framework/stages/distributor_stage.cc b/services/media/framework/stages/distributor_stage.cc
new file mode 100644
index 0000000..749de98
--- /dev/null
+++ b/services/media/framework/stages/distributor_stage.cc
@@ -0,0 +1,91 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/stages/distributor_stage.h"
+
+namespace mojo {
+namespace media {
+
+DistributorStage::DistributorStage(
+    MultiStreamPacketSourcePtr packet_source) :
+    packet_source_(packet_source),
+    ended_streams_(0) {
+  DCHECK(packet_source);
+  outputs_.resize(packet_source->stream_count());
+}
+
+DistributorStage::~DistributorStage() {}
+
+uint32_t DistributorStage::input_count() const {
+  return 0;
+};
+
+StageInput& DistributorStage::input(uint32_t index) {
+  NOTREACHED();
+  static StageInput result;
+  return result;
+}
+
+uint32_t DistributorStage::output_count() const {
+  return outputs_.size();
+}
+
+StageOutput& DistributorStage::output(uint32_t index) {
+  DCHECK(index < outputs_.size());
+  return outputs_[index];
+}
+
+bool DistributorStage::Prepare(UpdateCallback update_callback) {
+  for (StageOutput& output : outputs_) {
+    output.Prepare(false);
+  }
+  return false;
+}
+
+void DistributorStage::Update(Engine* engine) {
+  DCHECK(engine);
+
+  bool has_positive_demand = false;
+  for (StageOutput& output : outputs_) {
+    if (output.demand() == Demand::kPositive) {
+      has_positive_demand = true;
+      break;
+    }
+  }
+
+  while (true) {
+    if (cached_packet_ && has_positive_demand) {
+      DCHECK(cached_packet_output_index_ < outputs_.size());
+      StageOutput& output = outputs_[cached_packet_output_index_];
+
+      if (output.demand() != Demand::kNegative) {
+        // cached_packet_ is intended for an output which will accept packets.
+        output.SupplyPacket(std::move(cached_packet_), engine);
+      } else {
+      }
+    }
+
+    if (cached_packet_) {
+      // There's still a cached packet. We're done for now.
+      return;
+    }
+
+    if (ended_streams_ == outputs_.size()) {
+      // We've seen end-of-stream for all streams. All done.
+      return;
+    }
+
+    // Pull a packet from the source.
+    cached_packet_ = packet_source_->PullPacket(&cached_packet_output_index_);
+    DCHECK(cached_packet_);
+    DCHECK(cached_packet_output_index_ < outputs_.size());
+
+    if (cached_packet_->end_of_stream()) {
+      ended_streams_++;
+    }
+  }
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/distributor_stage.h b/services/media/framework/stages/distributor_stage.h
new file mode 100644
index 0000000..c608ffe
--- /dev/null
+++ b/services/media/framework/stages/distributor_stage.h
@@ -0,0 +1,47 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_DISTRIBUTOR_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_DISTRIBUTOR_STAGE_H_
+
+#include <vector>
+
+#include "services/media/framework/models/multistream_packet_source.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+// A stage that hosts a MultiStreamPacketSource.
+class DistributorStage : public Stage {
+ public:
+  DistributorStage(MultiStreamPacketSourcePtr packet_source);
+
+  ~DistributorStage() override;
+
+  // Stage implementation.
+  uint32_t input_count() const override;
+
+  StageInput& input(uint32_t index) override;
+
+  uint32_t output_count() const override;
+
+  StageOutput& output(uint32_t index) override;
+
+  bool Prepare(UpdateCallback update_callback) override;
+
+  void Update(Engine* engine) override;
+
+ private:
+  std::vector<StageOutput> outputs_;
+  MultiStreamPacketSourcePtr packet_source_;
+  PacketPtr cached_packet_;
+  uint32_t cached_packet_output_index_;
+  uint32_t ended_streams_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_DISTRIBUTOR_STAGE_H_
diff --git a/services/media/framework/stages/lpcm_stage_input.cc b/services/media/framework/stages/lpcm_stage_input.cc
new file mode 100644
index 0000000..b215334
--- /dev/null
+++ b/services/media/framework/stages/lpcm_stage_input.cc
@@ -0,0 +1,211 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/engine.h"
+#include "services/media/framework/stages/lpcm_stage_input.h"
+#include "services/media/framework/stages/lpcm_stage_output.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+LpcmStageInput::LpcmStageInput() :
+    demand_pending_(false),
+    buffer_(nullptr),
+    frame_count_(0),
+    mix_(false),
+    synchronous_(false),
+    end_of_stream_(false) {
+  packet_exhausted_function_ = [this]() {
+    DCHECK(packet_from_upstream());
+    packet_from_upstream().reset();
+  };
+}
+
+LpcmStageInput::~LpcmStageInput() {}
+
+void LpcmStageInput::set_stream_type(const LpcmStreamType& stream_type) {
+  lpcm_util_ = LpcmUtil::Create(stream_type);
+  lpcm_supply_.set_bytes_per_frame(stream_type.bytes_per_frame());
+  packet_frames_.set_bytes_per_frame(stream_type.bytes_per_frame());
+  demand_frames_.set_bytes_per_frame(stream_type.bytes_per_frame());
+}
+
+void LpcmStageInput::SuggestDemand(uint64_t frame_count, Engine* engine) {
+  DCHECK(engine);
+
+  if (demand_pending_ || lpcm_supply_.frame_count() != 0) {
+    // We've already demanded. Too late for this suggestion.
+    return;
+  }
+
+  if (frame_count == 0) {
+    // No demand is suggested.
+    SetDemand(Demand::kNegative, engine);
+    return;
+  }
+
+  if (!connected_to_lpcm()) {
+    // Upstream output isn't LPCM. Demand packets.
+    demand_pending_ = true;
+    SetDemand(Demand::kPositive, engine);
+    return;
+  }
+
+  SetLpcmDemand(
+      GetDemandBuffer(frame_count),
+      frame_count,
+      false,
+      false,
+      engine);
+}
+
+void LpcmStageInput::SetLpcmDemand(
+    void* buffer,
+    uint64_t frame_count,
+    bool mix,
+    bool synchronous,
+    Engine* engine) {
+  DCHECK(engine);
+  DCHECK(connected());
+
+  demand_pending_ = true;
+
+  buffer_ = buffer;
+  frame_count_ = frame_count;
+  mix_ = mix;
+  synchronous_ = synchronous;
+
+  if (synchronous) {
+    // TODO
+    //engine->AddToFallback([lpcm_mate](Engine* engine) {
+    //  lpcm_mate->fallback(engine);
+    //});
+  }
+
+  LpcmStageOutput* lpcm_mate = mate().get_lpcm();
+  if (lpcm_mate != nullptr) {
+    // The upstream output is LPCM.
+    DCHECK(!packet_from_upstream());
+    lpcm_mate->UpdateLpcmDemand(buffer, frame_count, mix, synchronous);
+    engine->PushToDemandBacklogUnsafe(upstream_stage());
+    return;
+  }
+
+  demand_frames_.Set(buffer, frame_count);
+
+  // The upstream output isn't LPCM. See if we can satisfy demand with the
+  // packet from upstream, if there is one.
+  if (CopyOrMixFrames()) {
+    // Frames supplied. Add this stage to the supply backlog.
+    engine->PushToSupplyBacklogUnsafe(mate().downstream_stage());
+  } else {
+    // Frames not supplied. Demand another packet.
+    SetDemand(Demand::kPositive, engine);
+  }
+}
+
+bool LpcmStageInput::SupplyPacketFromOutput(PacketPtr packet) {
+  StageInput::SupplyPacketFromOutput(std::move(packet));
+
+  demand_pending_ = false;
+
+  if (connected_to_lpcm()) {
+    // The upstream output is LPCM, so the packet should be a wrapper for the
+    // demand buffer (buffer_). In this case, we can release the packet,
+    // because the frames are already where we want them.
+    DCHECK(packet_from_upstream()->payload() == buffer_);
+    DCHECK(packet_from_upstream()->duration() <= frame_count_);
+    end_of_stream_ = packet_from_upstream()->end_of_stream();
+    lpcm_supply_.Set(
+        packet_from_upstream()->payload(),
+        packet_from_upstream()->duration());
+    packet_from_upstream().reset();
+    return true;
+  }
+
+  if (buffer_ == nullptr) {
+    // The upstream output isn't LPCM, and the stage hasn't supplied a buffer.
+    // We'll supply frames right out of the packet payload and reset the
+    // packet pointer when the frames are exhausted.
+    DCHECK(!mix_);
+    end_of_stream_ = packet_from_upstream()->end_of_stream();
+    lpcm_supply_.Set(
+        packet_from_upstream()->payload(),
+        packet_from_upstream()->duration(),
+        packet_exhausted_function_);
+    return true;
+  }
+
+  // The upstream output isn't LPCM, and the stage has supplied a buffer. That
+  // means we have to copy or mix from the packet to the supplied buffer. We
+  // initialize packet_frames_ for that purpose and call CopyOrMixFrames.
+  packet_frames_.Set(
+      packet_from_upstream()->payload(),
+      packet_from_upstream()->duration(),
+      packet_exhausted_function_);
+
+  return CopyOrMixFrames();
+}
+
+LpcmStageInput* LpcmStageInput::get_lpcm() {
+  return this;
+}
+
+bool LpcmStageInput::connected_to_lpcm() {
+  return mate().get_lpcm() != nullptr;
+}
+
+bool LpcmStageInput::CopyOrMixFrames() {
+  DCHECK(buffer_);
+  DCHECK(demand_frames_.buffer());
+
+  uint64_t frame_count = std::min(
+      packet_frames_.frame_count(),
+      demand_frames_.frame_count());
+
+  if (frame_count == 0) {
+    return false;
+  }
+
+  DCHECK(packet_from_upstream());
+
+  if (mix_) {
+    lpcm_util_->Mix(
+        packet_frames_.buffer(),
+        demand_frames_.buffer(),
+        frame_count);
+  } else {
+    lpcm_util_->Copy(
+        packet_frames_.buffer(),
+        demand_frames_.buffer(),
+        frame_count);
+  }
+
+  bool end_of_stream = packet_from_upstream()->end_of_stream();
+
+  packet_frames_.Advance(frame_count);
+  demand_frames_.Advance(frame_count);
+
+  if (demand_frames_.frame_count() == 0 ||
+      (packet_frames_.frame_count() == 0 && end_of_stream)) {
+    end_of_stream_ = end_of_stream;
+    lpcm_supply_.Set(buffer_, frame_count_ - demand_frames_.frame_count());
+    demand_frames_.Reset();
+    return true;
+  }
+
+  return false;
+}
+
+void* LpcmStageInput::GetDemandBuffer(uint64_t frame_count) {
+  if (demand_buffer_.size() < frame_count) {
+    demand_buffer_.clear();
+    demand_buffer_.resize(frame_count);
+  }
+  return &demand_buffer_[0];
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/lpcm_stage_input.h b/services/media/framework/stages/lpcm_stage_input.h
new file mode 100644
index 0000000..b5ce154
--- /dev/null
+++ b/services/media/framework/stages/lpcm_stage_input.h
@@ -0,0 +1,102 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_INPUT_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_INPUT_H_
+
+#include <vector>
+
+#include "services/media/framework/lpcm_util.h"
+#include "services/media/framework/models/demand.h"
+#include "services/media/framework/models/lpcm_frame_buffer.h"
+#include "services/media/framework/stages/stage_input.h"
+
+namespace mojo {
+namespace media {
+
+class LpcmStageOutput;
+
+// Represents a stage's connector to an adjacent upstream stage, with LPCM
+// optmizations.
+class LpcmStageInput : public StageInput {
+ public:
+  LpcmStageInput();
+
+  ~LpcmStageInput();
+
+  // Sets the stream type.
+  void set_stream_type(const LpcmStreamType& stream_type);
+
+  // Suggests possible demand frame count, useful when !connected_to_lpcm().
+  void SuggestDemand(uint64_t frame_count, Engine* engine);
+
+  // Updates the demand signalled to the connected upstream output.
+  void SetLpcmDemand(
+      void* buffer,
+      uint64_t frame_count,
+      bool mix,
+      bool synchronous,
+      Engine* engine);
+
+  // Indicates whether demand is pending.
+  bool demand_pending() const {
+    return demand_pending_;
+  }
+
+  // Returns supplied frames.
+  LpcmFrameBuffer& lpcm_supply() {
+    return lpcm_supply_;
+  }
+
+  // Indicates whether we've hit end-of-stream.
+  bool end_of_stream() const {
+    return end_of_stream_;
+  }
+
+  // Indicates the frame count originally demanded.
+  uint64_t demand_frame_count() const {
+    return frame_count_;
+  }
+
+  // StageInput overrides.
+  bool SupplyPacketFromOutput(PacketPtr packet) override;
+
+  LpcmStageInput* get_lpcm() override;
+
+ private:
+  static const uint64_t kDefaultFrameCount = 512;
+
+  bool connected_to_lpcm();
+
+  // Copies or mixes frames from packet_frames_ to demand_frames_. This is only
+  // used when !connected_to_lpcm() and the stage has supplied a buffer. The
+  // return value indicates whether the demand was met.
+  bool CopyOrMixFrames();
+
+  // Ensures demand_buffer_ can accommodate frame_count frames and returns it.
+  void* GetDemandBuffer(uint64_t frame_count);
+
+  bool demand_pending_;
+
+  void* buffer_;
+  uint64_t frame_count_;
+  bool mix_;
+  bool synchronous_;
+
+  LpcmFrameBuffer lpcm_supply_; // Frames supplied to this stage.
+  LpcmFrameBuffer packet_frames_; // Source for packet payload copy/mix.
+  LpcmFrameBuffer demand_frames_; // Destination for packet payload copy/mix.
+
+  bool end_of_stream_;
+
+  std::unique_ptr<LpcmUtil> lpcm_util_;
+  std::vector<uint8_t> demand_buffer_;
+
+  LpcmFrameBuffer::ExhaustedCallback packet_exhausted_function_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_INPUT_H_
diff --git a/services/media/framework/stages/lpcm_stage_output.cc b/services/media/framework/stages/lpcm_stage_output.cc
new file mode 100644
index 0000000..87cb033
--- /dev/null
+++ b/services/media/framework/stages/lpcm_stage_output.cc
@@ -0,0 +1,85 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/engine.h"
+#include "services/media/framework/stages/lpcm_stage_input.h"
+#include "services/media/framework/stages/lpcm_stage_output.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+LpcmStageOutput::LpcmStageOutput() :
+    buffer_(nullptr),
+    frame_count_(0),
+    mix_(false),
+    synchronous_(false),
+    next_presentation_time_(0) {}
+
+LpcmStageOutput::~LpcmStageOutput() {}
+
+void LpcmStageOutput::set_stream_type(const LpcmStreamType& stream_type) {
+  lpcm_demand_.set_bytes_per_frame(stream_type.bytes_per_frame());
+}
+
+void LpcmStageOutput::SupplyFrames(bool end_of_stream, Engine* engine) {
+  DCHECK(engine);
+  DCHECK(connected());
+  DCHECK(end_of_stream || buffer_);
+
+  uint64_t duration = frame_count_ - lpcm_demand_.frame_count();
+  if (allocator_ != nullptr) {
+    SupplyPacketInternal(
+        Packet::Create(
+            next_presentation_time_,
+            duration,
+            end_of_stream,
+            duration * lpcm_demand_.bytes_per_frame(),
+            duration == 0 ? nullptr : buffer_,
+            allocator_),
+        engine);
+  } else {
+    SupplyPacketInternal(
+        Packet::CreateNoAllocator(
+            next_presentation_time_,
+            duration,
+            end_of_stream,
+            duration * lpcm_demand_.bytes_per_frame(),
+            duration == 0 ? nullptr : buffer_),
+        engine);
+  }
+
+  next_presentation_time_ += duration;
+
+  buffer_ = nullptr;
+  frame_count_ = 0;
+  mix_ = false;
+  synchronous_ = false;
+  lpcm_demand_.Reset();
+}
+
+Allocator* LpcmStageOutput::Prepare(bool can_accept_allocator) {
+  // The stage isn't concerned with allocators, but we'll need one if our mate
+  // isn't LPCM.
+  Allocator* allocator = StageOutput::Prepare(true);
+  DCHECK(!connected_to_lpcm() || allocator == nullptr);
+  if (!connected_to_lpcm()) {
+    // Our mate isn't lpcm, so we'll need an allocator to create packet buffers.
+    // Use the provided one or the default.
+    allocator_ = allocator == nullptr ? Allocator::GetDefault() : allocator;
+  }
+
+  return nullptr;
+}
+
+LpcmStageOutput* LpcmStageOutput::get_lpcm() {
+  return this;
+}
+
+bool LpcmStageOutput::connected_to_lpcm() {
+  return mate().get_lpcm() != nullptr;
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/lpcm_stage_output.h b/services/media/framework/stages/lpcm_stage_output.h
new file mode 100644
index 0000000..a60868a
--- /dev/null
+++ b/services/media/framework/stages/lpcm_stage_output.h
@@ -0,0 +1,120 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_OUTPUT_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_OUTPUT_H_
+
+#include "services/media/framework/allocator.h"
+#include "services/media/framework/models/lpcm_frame_buffer.h"
+#include "services/media/framework/stages/stage_output.h"
+
+namespace mojo {
+namespace media {
+
+class LpcmStageInput;
+
+// Represents a stage's connector to an adjacent downstream stage.
+class LpcmStageOutput : public StageOutput {
+ public:
+  LpcmStageOutput();
+
+  ~LpcmStageOutput();
+
+  // Sets the stream type.
+  void set_stream_type(const LpcmStreamType& stream_type);
+
+  // Gets demand suggestion for the stage's input.
+  uint64_t demand_suggestion() const {
+    if (lpcm_demand_.frame_count() != 0) {
+      return lpcm_demand_.frame_count();
+    }
+
+    return demand() == Demand::kPositive ? kDefaultFrameCount : 0;
+  }
+
+  // Gets the presentation time of the next packet to be supplied. Used for
+  // adaptation to non-lpcm input.
+  int64_t next_presentation_time() const {
+    return next_presentation_time_;
+  }
+
+  // Sets the presentation time for the next packet to be supplied. Used for
+  // adaptation to non-lpcm input.
+  void set_next_presentation_time(int64_t next_presentation_time) {
+    next_presentation_time_ = next_presentation_time;
+  }
+
+  // Gets the demand signalled from downstream.
+  LpcmFrameBuffer& lpcm_demand(uint64_t suggested_frame_count = 0) {
+    if (lpcm_demand_.frame_count() == 0 &&
+        demand() == Demand::kPositive &&
+        !connected_to_lpcm()) {
+      DCHECK(buffer_ == nullptr);
+      if (suggested_frame_count == 0) {
+        suggested_frame_count = kDefaultFrameCount;
+      }
+      DCHECK(allocator_);
+      void* buffer = allocator_->AllocatePayloadBuffer(
+          suggested_frame_count * lpcm_demand_.bytes_per_frame());
+      if (buffer != nullptr) {
+        UpdateLpcmDemand(buffer, suggested_frame_count, false, false);
+      }
+    }
+    return lpcm_demand_;
+  }
+
+  // Indicates whether supplied frames should be mixed.
+  bool mix() const {
+    return mix_;
+  }
+
+  // Indicates whether supplied frames need to be delivered synchronously.
+  bool synchronous() const {
+    return synchronous_;
+  }
+
+  // Indicates that all demanded frames have been supplied or we've hit end of
+  // stream. Called only by Stage::Update implementations.
+  void SupplyFrames(bool end_of_stream, Engine* engine);
+
+  // Demands LPCM frames. Called only by LpcmStageInput instances and
+  // LpcmStageOutput::lpcm_demand.
+  void UpdateLpcmDemand(
+      void* buffer,
+      uint64_t frame_count,
+      bool mix,
+      bool synchronous) {
+    buffer_ = buffer;
+    frame_count_ = frame_count;
+    mix_ = mix;
+    synchronous_ = synchronous;
+
+    lpcm_demand_.Set(buffer, frame_count);
+  }
+
+  // StageOutput override.
+  Allocator* Prepare(bool can_accept_allocator) override;
+
+  LpcmStageOutput* get_lpcm() override;
+
+ private:
+  static const uint64_t kDefaultFrameCount = 512;
+
+  bool connected_to_lpcm();
+
+  void* buffer_;
+  uint64_t frame_count_;
+  bool mix_;
+  bool synchronous_;
+
+  LpcmFrameBuffer lpcm_demand_;
+
+  int64_t next_presentation_time_;
+  Allocator* allocator_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_STAGE_OUTPUT_H_
diff --git a/services/media/framework/stages/lpcm_transform_stage.cc b/services/media/framework/stages/lpcm_transform_stage.cc
new file mode 100644
index 0000000..3f42e01
--- /dev/null
+++ b/services/media/framework/stages/lpcm_transform_stage.cc
@@ -0,0 +1,68 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/stages/lpcm_transform_stage.h"
+
+namespace mojo {
+namespace media {
+
+LpcmTransformStage::LpcmTransformStage(LpcmTransformPtr transform) :
+    transform_(transform) {
+  DCHECK(transform_);
+  input_.set_stream_type(transform_->input_stream_type());
+  output_.set_stream_type(transform_->output_stream_type());
+}
+
+LpcmTransformStage::~LpcmTransformStage() {}
+
+uint32_t LpcmTransformStage::input_count() const {
+  return 1;
+};
+
+StageInput& LpcmTransformStage::input(uint32_t index) {
+  DCHECK_EQ(index, 0u);
+  return input_;
+}
+
+uint32_t LpcmTransformStage::output_count() const {
+  return 1;
+}
+
+StageOutput& LpcmTransformStage::output(uint32_t index) {
+  DCHECK_EQ(index, 0u);
+  return output_;
+}
+
+bool LpcmTransformStage::Prepare(UpdateCallback update_callback) {
+  output_.Prepare(false);
+  input_.Prepare(nullptr, false);
+  return false;
+}
+
+void LpcmTransformStage::Update(Engine* engine) {
+  DCHECK(engine);
+
+  LpcmFrameBuffer& supply = input_.lpcm_supply();
+
+  if (supply.frame_count() != 0 || input_.end_of_stream()) {
+    // TODO(dalesat): Assumes 1-1.
+    LpcmFrameBuffer& demand = output_.lpcm_demand(supply.frame_count());
+
+    if (demand.frame_count() != 0) {
+      if (supply.frame_count() != 0) {
+        transform_->TransformFrames(&supply, &demand, output_.mix());
+      }
+
+      if (demand.frame_count() == 0 || input_.end_of_stream()) {
+        output_.SupplyFrames(input_.end_of_stream(), engine);
+      }
+    }
+  }
+
+  // TODO(dalesat): Assumes 1-1.
+  input_.SuggestDemand(output_.demand_suggestion(), engine);
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/lpcm_transform_stage.h b/services/media/framework/stages/lpcm_transform_stage.h
new file mode 100644
index 0000000..2c4f613
--- /dev/null
+++ b/services/media/framework/stages/lpcm_transform_stage.h
@@ -0,0 +1,43 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_TRANSFORM_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_TRANSFORM_STAGE_H_
+
+#include "services/media/framework/models/lpcm_transform.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+// A stage that hosts an LpcmTransform.
+class LpcmTransformStage : public Stage {
+ public:
+  LpcmTransformStage(LpcmTransformPtr transform);
+
+  ~LpcmTransformStage() override;
+
+  // Stage implementation.
+  uint32_t input_count() const override;
+
+  StageInput& input(uint32_t index) override;
+
+  uint32_t output_count() const override;
+
+  StageOutput& output(uint32_t index) override;
+
+  bool Prepare(UpdateCallback update_callback) override;
+
+  void Update(Engine* engine) override;
+
+ private:
+  LpcmStageInput input_;
+  LpcmStageOutput output_;
+  LpcmTransformPtr transform_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_LPCM_TRANSFORM_STAGE_H_
diff --git a/services/media/framework/stages/packet_transform_stage.cc b/services/media/framework/stages/packet_transform_stage.cc
new file mode 100644
index 0000000..2564f12
--- /dev/null
+++ b/services/media/framework/stages/packet_transform_stage.cc
@@ -0,0 +1,73 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/stages/packet_transform_stage.h"
+
+namespace mojo {
+namespace media {
+
+PacketTransformStage::PacketTransformStage(
+    PacketTransformPtr transform) :
+    transform_(transform),
+    allocator_(nullptr),
+    input_packet_is_new_(true) {
+  DCHECK(transform_);
+}
+
+PacketTransformStage::~PacketTransformStage() {}
+
+uint32_t PacketTransformStage::input_count() const {
+  return 1;
+};
+
+StageInput& PacketTransformStage::input(uint32_t index) {
+  DCHECK_EQ(index, 0u);
+  return input_;
+}
+
+uint32_t PacketTransformStage::output_count() const {
+  return 1;
+}
+
+StageOutput& PacketTransformStage::output(uint32_t index) {
+  DCHECK_EQ(index, 0u);
+  return output_;
+}
+
+bool PacketTransformStage::Prepare(const UpdateCallback update_callback) {
+  allocator_ = output_.Prepare(true);
+  if (allocator_ == nullptr) {
+    allocator_ = Allocator::GetDefault();
+  }
+  input_.Prepare(nullptr, false);
+  return false;
+}
+
+void PacketTransformStage::Update(Engine* engine) {
+  DCHECK(engine);
+  DCHECK(allocator_);
+
+  if (input_.packet_from_upstream() && output_.demand() != Demand::kNegative) {
+    PacketPtr output_packet;
+    if (transform_->TransformPacket(
+        input_.packet_from_upstream(),
+        input_packet_is_new_,
+        allocator_,
+        &output_packet)) {
+      input_.packet_from_upstream().reset();
+      input_packet_is_new_ = true;
+    } else {
+      input_packet_is_new_ = false;
+    }
+
+    if (output_packet) {
+      output_.SupplyPacket(std::move(output_packet), engine);
+    }
+  }
+
+  input_.SetDemand(output_.demand(), engine);
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/packet_transform_stage.h b/services/media/framework/stages/packet_transform_stage.h
new file mode 100644
index 0000000..ab03e78
--- /dev/null
+++ b/services/media/framework/stages/packet_transform_stage.h
@@ -0,0 +1,45 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_PACKET_TRANSFORM_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_PACKET_TRANSFORM_STAGE_H_
+
+#include "services/media/framework/models/packet_transform.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+// A stage that hosts a PacketTransform.
+class PacketTransformStage : public Stage {
+ public:
+  PacketTransformStage(PacketTransformPtr transform);
+
+  ~PacketTransformStage() override;
+
+  // Stage implementation.
+  uint32_t input_count() const override;
+
+  StageInput& input(uint32_t index) override;
+
+  uint32_t output_count() const override;
+
+  StageOutput& output(uint32_t index) override;
+
+  bool Prepare(const UpdateCallback update_callback) override;
+
+  void Update(Engine* engine) override;
+
+ private:
+  StageInput input_;
+  StageOutput output_;
+  PacketTransformPtr transform_;
+  Allocator* allocator_;
+  bool input_packet_is_new_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_PACKET_TRANSFORM_STAGE_H_
diff --git a/services/media/framework/stages/stage.cc b/services/media/framework/stages/stage.cc
new file mode 100644
index 0000000..e46eead
--- /dev/null
+++ b/services/media/framework/stages/stage.cc
@@ -0,0 +1,25 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/logging.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+Stage::Stage() :
+    prepared_(false),
+    in_supply_backlog_(false),
+    in_demand_backlog_(false) {}
+
+Stage::~Stage() {}
+
+bool Stage::Prepare(UpdateCallback update_callback) {
+  return false;
+}
+
+void Stage::Prime() {}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/stage.h b/services/media/framework/stages/stage.h
new file mode 100644
index 0000000..20b21c4
--- /dev/null
+++ b/services/media/framework/stages/stage.h
@@ -0,0 +1,72 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_H_
+
+#include <vector>
+
+#include "services/media/framework/packet.h"
+#include "services/media/framework/stages/lpcm_stage_input.h"
+#include "services/media/framework/stages/lpcm_stage_output.h"
+#include "services/media/framework/stages/stage_input.h"
+#include "services/media/framework/stages/stage_output.h"
+
+namespace mojo {
+namespace media {
+
+class Engine;
+
+// Host for a source, sink or transform.
+class Stage {
+ public:
+  using UpdateCallback = std::function<void(Stage* stage)>;
+
+  Stage();
+
+  virtual ~Stage();
+
+  // Returns the number of input connections.
+  virtual uint32_t input_count() const = 0;
+
+  // Returns the indicated input connection.
+  virtual StageInput& input(uint32_t index) = 0;
+
+  // Returns the number of output connections.
+  virtual uint32_t output_count() const = 0;
+
+  // Returns the indicated output connection.
+  virtual StageOutput& output(uint32_t index) = 0;
+
+  // Prepares the stage for operation, providing a callback used to signal the
+  // need to update this stage. Returns true if the stage will call the
+  // callback, false if not. The default implementation of this method returns
+  // false.
+  // TODO(dalesat): Should this be const UpdateCallback&?
+  virtual bool Prepare(UpdateCallback update_callback);
+
+  // Initiates demand. Called on sink stages after the graph is prepared. The
+  // default implementation does nothing.
+  virtual void Prime();
+
+  // Performs processing.
+  virtual void Update(Engine* engine) = 0;
+
+  // Returns a bool indicating whether the stage is prepared.
+  bool prepared() {
+    return prepared_;
+  }
+
+ private:
+  bool prepared_;
+  bool in_supply_backlog_;
+  bool in_demand_backlog_;
+
+  friend class Engine;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_H_
diff --git a/services/media/framework/stages/stage_input.cc b/services/media/framework/stages/stage_input.cc
new file mode 100644
index 0000000..0f5312a
--- /dev/null
+++ b/services/media/framework/stages/stage_input.cc
@@ -0,0 +1,73 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/engine.h"
+#include "services/media/framework/stages/stage.h"
+#include "services/media/framework/stages/stage_input.h"
+
+namespace mojo {
+namespace media {
+
+StageInput::StageInput() :
+    upstream_stage_(nullptr),
+    output_index_(0),
+    allocator_(nullptr),
+    must_allocate_(false) {}
+
+StageInput::~StageInput() {}
+
+void StageInput::connect(Stage* upstream_stage, uint32_t output_index) {
+  DCHECK(upstream_stage);
+  DCHECK(output_index < upstream_stage->output_count());
+  DCHECK(upstream_stage_ == nullptr);
+  upstream_stage_ = upstream_stage;
+  output_index_ = output_index;
+}
+
+StageOutput& StageInput::mate() const {
+  DCHECK(upstream_stage_);
+  DCHECK(output_index_ < upstream_stage_->output_count());
+  return upstream_stage_->output(output_index_);
+}
+
+void StageInput::Prepare(Allocator* allocator, bool must_allocate) {
+  DCHECK(allocator != nullptr || must_allocate == false);
+  allocator_ = allocator;
+  must_allocate_ = must_allocate;
+}
+
+Allocator* StageInput::allocator() const {
+  DCHECK(connected());
+  DCHECK(mate().downstream_stage()->prepared());
+  return allocator_;
+}
+
+bool StageInput::must_allocate() const {
+  DCHECK(connected());
+  DCHECK(mate().downstream_stage()->prepared());
+  return must_allocate_;
+}
+
+void StageInput::SetDemand(Demand demand, Engine* engine) const {
+  DCHECK(engine);
+  DCHECK(connected());
+
+  if (mate().UpdateDemand(demand)) {
+    engine->PushToDemandBacklogUnsafe(upstream_stage());
+  }
+}
+
+bool StageInput::SupplyPacketFromOutput(PacketPtr packet) {
+  DCHECK(packet);
+  DCHECK(!packet_from_upstream_);
+  packet_from_upstream_ = std::move(packet);
+  return true;
+}
+
+LpcmStageInput* StageInput::get_lpcm() {
+  return nullptr;
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/stage_input.h b/services/media/framework/stages/stage_input.h
new file mode 100644
index 0000000..2fe61c6
--- /dev/null
+++ b/services/media/framework/stages/stage_input.h
@@ -0,0 +1,83 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_INPUT_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_INPUT_H_
+
+#include "services/media/framework/models/demand.h"
+#include "services/media/framework/packet.h"
+
+namespace mojo {
+namespace media {
+
+class Stage;
+class Engine;
+class StageOutput;
+class LpcmStageInput;
+
+// Represents a stage's connector to an adjacent upstream stage.
+class StageInput {
+ public:
+  StageInput();
+
+  ~StageInput();
+
+  // The stage to which this input is connected.
+  Stage* upstream_stage() const { return upstream_stage_; }
+
+  // The index of the output to which this input is connected.
+  uint32_t output_index() const { return output_index_; }
+
+  // Establishes a connection. Called only by the engine.
+  void connect(Stage* upstream_stage, uint32_t output_index);
+
+  // Breaks a connection. Called only by the engine.
+  void disconnect() {
+    upstream_stage_ = nullptr;
+    output_index_ = 0;
+  }
+
+  // Determines whether the input is connected to an output.
+  bool connected() const { return upstream_stage_ != nullptr; }
+
+  // The connected output.
+  StageOutput& mate() const;
+
+  // Prepare to move packets by providing an allocator for the upstream stage
+  // if we have one and indicating whether we require its use.
+  void Prepare(Allocator* allocator, bool must_allocate);
+
+  // Returns the allocator provided by this input.
+  Allocator* allocator() const;
+
+  // Determines whether the input requires the use of its allocator.
+  bool must_allocate() const;
+
+  // A packet supplied from upstream.
+  PacketPtr& packet_from_upstream() { return packet_from_upstream_; }
+
+  // Updates mate's demand. Called only by Stage::Update implementations.
+  void SetDemand(Demand demand, Engine* engine) const;
+
+  // Updates packet_from_upstream. Return value indicates whether the stage for
+  // this input should be added to the supply backlog. Called only by
+  // StageOutput instances.
+  virtual bool SupplyPacketFromOutput(PacketPtr packet);
+
+  // Returns the LPCM specialization if this instance is an LpcmStageInput,
+  // nullptr otherwise.
+  virtual LpcmStageInput* get_lpcm();
+
+ private:
+  Stage* upstream_stage_;
+  uint32_t output_index_;
+  Allocator* allocator_;
+  bool must_allocate_;
+  PacketPtr packet_from_upstream_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_INPUT_H_
diff --git a/services/media/framework/stages/stage_output.cc b/services/media/framework/stages/stage_output.cc
new file mode 100644
index 0000000..34d921f
--- /dev/null
+++ b/services/media/framework/stages/stage_output.cc
@@ -0,0 +1,122 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/engine.h"
+#include "services/media/framework/stages/stage.h"
+#include "services/media/framework/stages/stage_output.h"
+
+namespace mojo {
+namespace media {
+
+StageOutput::StageOutput() :
+    downstream_stage_(nullptr),
+    input_index_(0),
+    demand_(Demand::kNegative),
+    copy_allocator_(nullptr) {}
+
+StageOutput::~StageOutput() {}
+
+void StageOutput::connect(Stage* downstream_stage, uint32_t input_index) {
+  DCHECK(downstream_stage);
+  DCHECK(input_index < downstream_stage->input_count());
+  DCHECK(downstream_stage_ == nullptr);
+  downstream_stage_ = downstream_stage;
+  input_index_ = input_index;
+}
+
+StageInput& StageOutput::mate() const {
+  DCHECK(downstream_stage_);
+  DCHECK(input_index_ < downstream_stage_->input_count());
+  return downstream_stage_->input(input_index_);
+}
+
+Allocator* StageOutput::Prepare(bool can_accept_allocator) {
+  DCHECK(connected());
+  StageInput& mate = this->mate();
+  copy_allocator_ = nullptr;
+
+  if (can_accept_allocator) {
+    // We can use an allocator. Use our mate's allocator, if it has one.
+    return mate.allocator();
+  } else if (mate.must_allocate()) {
+    // We can't use an allocator, but our mate needs us to. We'll need to copy
+    // every packet.
+    copy_allocator_ = mate.allocator();
+    DCHECK(copy_allocator_);
+    return nullptr;
+  } else {
+    // We can't use an allocator, and our mate doesn't need us to.
+    return nullptr;
+  }
+}
+
+Demand StageOutput::demand() const {
+  DCHECK(connected());
+
+  // Return negative demand if mate() already has a packet.
+  // We check demand_ here to possibly avoid the second check.
+  if (demand_ == Demand::kNegative || mate().packet_from_upstream()) {
+    return Demand::kNegative;
+  }
+
+  return demand_;
+}
+
+void StageOutput::SupplyPacket(PacketPtr packet, Engine* engine) const {
+  DCHECK(packet);
+  DCHECK(engine);
+  DCHECK(connected());
+
+  if (copy_allocator_ != nullptr) {
+    // Need to copy the packet due to an allocation conflict.
+    uint64_t size = packet->size();
+    void *buffer;
+
+    if (size == 0) {
+      buffer = nullptr;
+    } else {
+      buffer = copy_allocator_->AllocatePayloadBuffer(size);
+      if (buffer == nullptr) {
+        // Starved for buffer space.
+        return;
+      }
+      memcpy(buffer, packet->payload(), size);
+    }
+
+    packet = Packet::Create(
+        packet->presentation_time(),
+        packet->duration(),
+        packet->end_of_stream(),
+        size,
+        buffer,
+        copy_allocator_);
+  }
+
+  SupplyPacketInternal(std::move(packet), engine);
+}
+
+bool StageOutput::UpdateDemand(Demand demand) {
+  if (demand_ == demand) {
+    return false;
+  }
+  demand_ = demand;
+  return true;
+}
+
+LpcmStageOutput* StageOutput::get_lpcm() {
+  return nullptr;
+}
+
+void StageOutput::SupplyPacketInternal(PacketPtr packet, Engine* engine)
+    const {
+  DCHECK(packet);
+  DCHECK(engine);
+  DCHECK(connected());
+  if (mate().SupplyPacketFromOutput(std::move(packet))) {
+    engine->PushToSupplyBacklogUnsafe(downstream_stage());
+  }
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/stage_output.h b/services/media/framework/stages/stage_output.h
new file mode 100644
index 0000000..8cf72bd
--- /dev/null
+++ b/services/media/framework/stages/stage_output.h
@@ -0,0 +1,81 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_OUTPUT_H_
+#define SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_OUTPUT_H_
+
+#include "services/media/framework/allocator.h"
+#include "services/media/framework/packet.h"
+
+namespace mojo {
+namespace media {
+
+class Stage;
+class Engine;
+class StageInput;
+class LpcmStageOutput;
+
+// Represents a stage's connector to an adjacent downstream stage.
+class StageOutput {
+ public:
+  StageOutput();
+
+  ~StageOutput();
+
+  // The stage to which this output is connected.
+  Stage* downstream_stage() const { return downstream_stage_; }
+
+  // The index of the input to which this output is connected.
+  uint32_t input_index() const { return input_index_; }
+
+  // Establishes a connection. Called only by the engine.
+  void connect(Stage* downstream_stage, uint32_t input_index);
+
+  // Breaks a connection. Called only by the engine.
+  void disconnect() {
+    downstream_stage_ = nullptr;
+    input_index_ = 0;
+  }
+
+  // Determines whether the output is connected to an input.
+  bool connected() const {
+    return downstream_stage_ != nullptr;
+  }
+
+  // The connected input.
+  StageInput& mate() const;
+
+  // Gets ready to move packets by negotiating the use of allocators. If the
+  // downstream input provides an allocator, and we can use it, this method
+  // returns the provided allocator. Otherwise, it returns nullptr.
+  virtual Allocator* Prepare(bool can_accept_allocator);
+
+  // Demand signalled from downstream, or kNegative if the downstream input
+  // is currently holding a packet.
+  Demand demand() const;
+
+  // Supplies a packet to mate. Called only by Stage::Update implementations.
+  void SupplyPacket(PacketPtr packet, Engine* engine) const;
+
+  // Updates packet demand. Called only by StageInput instances.
+  bool UpdateDemand(Demand demand);
+
+  // Returns the LPCM specialization if this instance is an LpcmStageOutput,
+  // nullptr otherwise.
+  virtual LpcmStageOutput* get_lpcm();
+
+ protected:
+  void SupplyPacketInternal(PacketPtr packet, Engine* engine) const;
+
+ private:
+  Stage* downstream_stage_;
+  uint32_t input_index_;
+  Demand demand_;
+  Allocator* copy_allocator_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_ENGINE_STAGE_OUTPUT_H_
diff --git a/services/media/framework/stream_type.cc b/services/media/framework/stream_type.cc
new file mode 100644
index 0000000..473d62b
--- /dev/null
+++ b/services/media/framework/stream_type.cc
@@ -0,0 +1,339 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/logging.h"
+#include "services/media/framework/stream_type.h"
+
+namespace mojo {
+namespace media {
+
+BytesPtr Bytes::Clone() const {
+  return BytesPtr(new Bytes(*this));
+}
+
+StreamType::StreamType(Scheme scheme) : scheme_(scheme) {}
+
+StreamType::~StreamType() {}
+
+const MultiplexedStreamType* StreamType::multiplexed() const {
+  NOTREACHED();
+  return nullptr;
+}
+
+const LpcmStreamType* StreamType::lpcm() const {
+  NOTREACHED();
+  return nullptr;
+}
+
+const CompressedAudioStreamType* StreamType::compressed_audio() const {
+  NOTREACHED();
+  return nullptr;
+}
+
+const VideoStreamType* StreamType::video() const {
+  NOTREACHED();
+  return nullptr;
+}
+
+StreamTypePtr StreamType::Clone() const {
+  return Create(scheme());
+}
+
+StreamTypesPtr StreamTypes::Clone() const {
+  StreamTypes* result = new StreamTypes(size());
+  for (const StreamTypePtr& stream_type : *this) {
+    result->push_back(stream_type.Clone());
+  }
+  return StreamTypesPtr(result);
+}
+
+StreamTypeSet::StreamTypeSet(StreamType::Scheme scheme) : scheme_(scheme) {}
+
+StreamTypeSet::~StreamTypeSet() {}
+
+StreamTypeSetsPtr StreamTypeSets::Clone() const {
+  StreamTypeSets* result = new StreamTypeSets(size());
+  for (const StreamTypeSetPtr& stream_type_set : *this) {
+    result->push_back(stream_type_set.Clone());
+  }
+  return StreamTypeSetsPtr(result);
+}
+
+const MultiplexedStreamTypeSet* StreamTypeSet::multiplexed() const {
+  NOTREACHED();
+  return nullptr;
+}
+
+const LpcmStreamTypeSet* StreamTypeSet::lpcm() const {
+  NOTREACHED();
+  return nullptr;
+}
+
+const CompressedAudioStreamTypeSet* StreamTypeSet::compressed_audio() const {
+  NOTREACHED();
+  return nullptr;
+}
+
+const VideoStreamTypeSet* StreamTypeSet::video() const {
+  NOTREACHED();
+  return nullptr;
+}
+
+StreamTypeSetPtr StreamTypeSet::Clone() const {
+  return Create(scheme());
+}
+
+MultiplexedStreamType::MultiplexedStreamType(
+    StreamTypePtr multiplex_type,
+    StreamTypesPtr substream_types) :
+    StreamType(Scheme::kMultiplexed),
+    multiplex_type_(std::move(multiplex_type)),
+    substream_types_(std::move(substream_types)) {}
+
+MultiplexedStreamType::~MultiplexedStreamType() {}
+
+const MultiplexedStreamType* MultiplexedStreamType::multiplexed() const {
+  return this;
+}
+
+StreamTypePtr MultiplexedStreamType::Clone() const {
+  return Create(multiplex_type().Clone(), substream_types().Clone());
+}
+
+MultiplexedStreamTypeSet::MultiplexedStreamTypeSet(
+    StreamTypeSetPtr multiplex_type_set,
+    StreamTypeSetsPtr substream_type_sets) :
+    StreamTypeSet(StreamType::Scheme::kMultiplexed),
+    multiplex_type_set_(std::move(multiplex_type_set)),
+    substream_type_sets_(std::move(substream_type_sets)) {}
+
+MultiplexedStreamTypeSet::~MultiplexedStreamTypeSet() {}
+
+const MultiplexedStreamTypeSet* MultiplexedStreamTypeSet::multiplexed() const {
+  return this;
+}
+
+StreamTypeSetPtr MultiplexedStreamTypeSet::Clone() const {
+  return Create(multiplex_type_set().Clone(), substream_type_sets().Clone());
+}
+
+LpcmStreamType::LpcmStreamType(
+    SampleFormat sample_format,
+    uint32_t channels,
+    uint32_t frames_per_second) :
+    LpcmStreamType(
+        Scheme::kLpcm,
+        sample_format,
+        channels,
+        frames_per_second) {}
+
+LpcmStreamType::LpcmStreamType(
+    Scheme scheme,
+    SampleFormat sample_format,
+    uint32_t channels,
+    uint32_t frames_per_second) :
+    StreamType(scheme),
+    sample_format_(sample_format),
+    channels_(channels),
+    frames_per_second_(frames_per_second),
+    sample_size_(SampleSizeFromFormat(sample_format)) {}
+
+LpcmStreamType::~LpcmStreamType() {}
+
+const LpcmStreamType* LpcmStreamType::lpcm() const {
+  return this;
+}
+
+// static
+uint32_t LpcmStreamType::SampleSizeFromFormat(
+    SampleFormat sample_format) {
+  switch(sample_format) {
+    case SampleFormat::kUnsigned8:
+      return sizeof(uint8_t);
+    case SampleFormat::kSigned16:
+      return sizeof(int16_t);
+    case SampleFormat::kSigned24In32:
+      return sizeof(int32_t);
+    case SampleFormat::kFloat:
+      return sizeof(float);
+    case SampleFormat::kUnknown:
+    case SampleFormat::kAny:
+      return 0;
+  }
+}
+
+StreamTypePtr LpcmStreamType::Clone() const {
+  return Create(sample_format(), channels(), frames_per_second());
+}
+
+LpcmStreamTypeSet::LpcmStreamTypeSet(
+    LpcmStreamType::SampleFormat sample_format,
+    Range<uint32_t> channels,
+    Range<uint32_t> frames_per_second) :
+    LpcmStreamTypeSet(
+        StreamType::Scheme::kLpcm,
+        sample_format,
+        channels,
+        frames_per_second) {}
+
+LpcmStreamTypeSet::LpcmStreamTypeSet(
+    StreamType::Scheme scheme,
+    LpcmStreamType::SampleFormat sample_format,
+    Range<uint32_t> channels,
+    Range<uint32_t> frames_per_second) :
+    StreamTypeSet(scheme),
+    sample_format_(sample_format),
+    channels_(channels),
+    frames_per_second_(frames_per_second) {}
+
+LpcmStreamTypeSet::~LpcmStreamTypeSet() {}
+
+const LpcmStreamTypeSet* LpcmStreamTypeSet::lpcm() const {
+  return this;
+}
+
+bool LpcmStreamTypeSet::contains(const LpcmStreamType& type) const {
+  return
+      (sample_format() == type.sample_format() ||
+          sample_format() == LpcmStreamType::SampleFormat::kAny) &&
+      channels().contains(type.frames_per_second()) &&
+      frames_per_second().contains(type.frames_per_second());
+}
+
+StreamTypeSetPtr LpcmStreamTypeSet::Clone() const {
+  return Create(sample_format(), channels(), frames_per_second());
+}
+
+CompressedAudioStreamType::CompressedAudioStreamType(
+  AudioEncoding encoding,
+  SampleFormat sample_format,
+  uint32_t channels,
+  uint32_t frames_per_second,
+  BytesPtr encoding_details) :
+  LpcmStreamType(
+      Scheme::kCompressedAudio,
+      sample_format,
+      channels,
+      frames_per_second),
+  encoding_(encoding),
+  encoding_details_(std::move(encoding_details)) {}
+
+CompressedAudioStreamType::~CompressedAudioStreamType() {}
+
+const CompressedAudioStreamType* CompressedAudioStreamType::compressed_audio()
+    const {
+  return this;
+}
+
+StreamTypePtr CompressedAudioStreamType::Clone() const {
+  return Create(
+      encoding(),
+      sample_format(),
+      channels(),
+      frames_per_second(),
+      encoding_details().Clone());
+}
+
+CompressedAudioStreamTypeSet::CompressedAudioStreamTypeSet(
+  CompressedAudioStreamType::AudioEncoding encoding,
+  CompressedAudioStreamType::SampleFormat sample_format,
+  Range<uint32_t> channels,
+  Range<uint32_t> frames_per_second) :
+  LpcmStreamTypeSet(
+      StreamType::Scheme::kCompressedAudio,
+      sample_format,
+      channels,
+      frames_per_second),
+  encoding_(encoding) {}
+
+CompressedAudioStreamTypeSet::~CompressedAudioStreamTypeSet() {}
+
+const CompressedAudioStreamTypeSet*
+    CompressedAudioStreamTypeSet::compressed_audio() const {
+  return this;
+}
+
+bool CompressedAudioStreamTypeSet::contains(
+    const CompressedAudioStreamType& type) const {
+  return
+      (encoding() == type.encoding() ||
+          encoding() == CompressedAudioStreamType::AudioEncoding::kAny) &&
+      (sample_format() == type.sample_format() ||
+          sample_format() == LpcmStreamType::SampleFormat::kAny) &&
+      channels().contains(type.frames_per_second()) &&
+      frames_per_second().contains(type.frames_per_second());
+}
+
+StreamTypeSetPtr CompressedAudioStreamTypeSet::Clone() const {
+  return Create(
+      encoding(),
+      sample_format(),
+      channels(),
+      frames_per_second());
+}
+
+VideoStreamType::VideoStreamType(
+  VideoEncoding encoding,
+  VideoProfile profile,
+  PixelFormat pixel_format,
+  ColorSpace color_space,
+  uint32_t width,
+  uint32_t height,
+  uint32_t coded_width,
+  uint32_t coded_height,
+  BytesPtr encoding_details) :
+  StreamType(StreamType::Scheme::kVideo),
+  encoding_(encoding),
+  profile_(profile),
+  pixel_format_(pixel_format),
+  color_space_(color_space),
+  width_(width),
+  height_(height),
+  coded_width_(coded_width),
+  coded_height_(coded_height),
+  encoding_details_(std::move(encoding_details)) {}
+
+VideoStreamType::~VideoStreamType() {}
+
+const VideoStreamType* VideoStreamType::video() const {
+  return this;
+}
+
+StreamTypePtr VideoStreamType::Clone() const {
+  return Create(
+      encoding(),
+      profile(),
+      pixel_format(),
+      color_space(),
+      width(),
+      height(),
+      coded_width(),
+      coded_height(),
+      encoding_details().Clone());
+}
+
+VideoStreamTypeSet::VideoStreamTypeSet(
+  VideoStreamType::VideoEncoding encoding,
+  Range<uint32_t> width,
+  Range<uint32_t> height) :
+  StreamTypeSet(StreamType::Scheme::kVideo),
+  encoding_(encoding),
+  width_(width),
+  height_(height) {}
+
+VideoStreamTypeSet::~VideoStreamTypeSet() {}
+
+const VideoStreamTypeSet* VideoStreamTypeSet::video() const {
+  return this;
+}
+
+StreamTypeSetPtr VideoStreamTypeSet::Clone() const {
+  return Create(
+      encoding(),
+      width(),
+      height());
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework/stream_type.h b/services/media/framework/stream_type.h
new file mode 100644
index 0000000..84d4456
--- /dev/null
+++ b/services/media/framework/stream_type.h
@@ -0,0 +1,637 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_STREAM_TYPE_H_
+#define SERVICES_MEDIA_FRAMEWORK_STREAM_TYPE_H_
+
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "base/logging.h"
+#include "services/media/framework/ptr.h"
+
+namespace mojo {
+namespace media {
+
+class StreamType;
+class StreamTypes;
+class MultiplexedStreamType;
+class LpcmStreamType;
+class CompressedAudioStreamType;
+class VideoStreamType;
+
+typedef UniquePtr<StreamType> StreamTypePtr;
+typedef UniquePtr<StreamTypes> StreamTypesPtr;
+
+// TODO(dalesat): Get rid of this class.
+class StreamTypes : public std::vector<StreamTypePtr> {
+ public:
+  static StreamTypesPtr Create(size_t size) {
+    return StreamTypesPtr(new StreamTypes(size));
+  }
+
+  explicit StreamTypes(size_t size) : std::vector<StreamTypePtr>(size) {}
+
+  StreamTypesPtr Clone() const;
+};
+
+class Bytes;
+typedef UniquePtr<Bytes> BytesPtr;
+
+// TODO(dalesat): Get rid of this class.
+class Bytes : public std::vector<uint8_t> {
+ public:
+  static BytesPtr Create(size_t size) {
+    return BytesPtr(new Bytes(size));
+  }
+
+  static BytesPtr Create(uint8_t* data, size_t size) {
+    BytesPtr result = Create(size);
+    if (size != 0) {
+      DCHECK(result->data());
+      DCHECK(data);
+      std::memcpy(result->data(), data, size);
+    }
+    return result;
+  }
+
+  explicit Bytes(size_t size) : std::vector<uint8_t>(size) {}
+
+  explicit Bytes(const Bytes& copy_from) : std::vector<uint8_t>(copy_from) {}
+
+  BytesPtr Clone() const;
+};
+
+// Describes the type of a stream.
+class StreamType {
+ public:
+  enum class Scheme {
+    kUnknown,
+    kNone,
+    kAnyElementary,
+    kAnyAudio,
+    kAnyVideo,
+    kAnySubpicture,
+    kAnyText,
+    kAnyMultiplexed,
+    kAny,
+
+    kMultiplexed,
+    kLpcm,
+    kCompressedAudio,
+    kVideo
+  };
+
+  static StreamTypePtr Create(Scheme scheme) {
+    return StreamTypePtr(new StreamType(scheme));
+  }
+
+  explicit StreamType(Scheme scheme);
+
+  virtual ~StreamType();
+
+  Scheme scheme() const {
+    return scheme_;
+  }
+
+  virtual const MultiplexedStreamType* multiplexed() const;
+  virtual const LpcmStreamType* lpcm() const;
+  virtual const CompressedAudioStreamType* compressed_audio() const;
+  virtual const VideoStreamType* video() const;
+
+  virtual StreamTypePtr Clone() const;
+
+ private:
+  Scheme scheme_;
+};
+
+template<typename T>
+struct Range {
+  Range(T min_param, T max_param) : min(min_param), max(max_param) {
+    DCHECK(min_param <= max_param);
+  }
+
+  T min;
+  T max;
+
+  constexpr bool contains(const T& t) const {
+    return t >= min && t <= max;
+  }
+};
+
+class StreamTypeSet;
+class StreamTypeSets;
+class MultiplexedStreamTypeSet;
+class LpcmStreamTypeSet;
+class CompressedAudioStreamTypeSet;
+class VideoStreamTypeSet;
+
+typedef UniquePtr<StreamTypeSet> StreamTypeSetPtr;
+typedef UniquePtr<StreamTypeSets> StreamTypeSetsPtr;
+
+// TODO(dalesat): Get rid of this class.
+class StreamTypeSets : public std::vector<StreamTypeSetPtr> {
+ public:
+  static StreamTypeSetsPtr Create(size_t size) {
+    return StreamTypeSetsPtr(new StreamTypeSets(size));
+  }
+
+  StreamTypeSets(size_t size) : std::vector<StreamTypeSetPtr>(size) {}
+
+  StreamTypeSetsPtr Clone() const;
+};
+
+// Describes a set of possible stream types.
+class StreamTypeSet {
+ public:
+  static StreamTypeSetPtr Create(StreamType::Scheme scheme) {
+    return StreamTypeSetPtr(new StreamTypeSet(scheme));
+  }
+
+  StreamTypeSet(StreamType::Scheme scheme);
+
+  virtual ~StreamTypeSet();
+
+  StreamType::Scheme scheme() const {
+    return scheme_;
+  }
+
+  virtual const MultiplexedStreamTypeSet* multiplexed() const;
+  virtual const LpcmStreamTypeSet* lpcm() const;
+  virtual const CompressedAudioStreamTypeSet* compressed_audio() const;
+  virtual const VideoStreamTypeSet* video() const;
+
+  virtual StreamTypeSetPtr Clone() const;
+
+ private:
+  StreamType::Scheme scheme_;
+};
+
+// Describes the type of a multiplexed stream.
+class MultiplexedStreamType : public StreamType {
+ public:
+  static StreamTypePtr Create(
+      StreamTypePtr multiplex_type,
+      StreamTypesPtr substream_types) {
+    return StreamTypePtr(
+        new MultiplexedStreamType(
+            std::move(multiplex_type),
+            std::move(substream_types)));
+  }
+
+  MultiplexedStreamType(
+      StreamTypePtr multiplex_type,
+      StreamTypesPtr substream_types);
+
+  ~MultiplexedStreamType() override;
+
+  const MultiplexedStreamType* multiplexed() const override;
+
+  const StreamTypePtr& multiplex_type() const {
+    return multiplex_type_;
+  }
+
+  const StreamTypesPtr& substream_types() const {
+    return substream_types_;
+  }
+
+  StreamTypePtr Clone() const override;
+
+private:
+  StreamTypePtr multiplex_type_;
+  StreamTypesPtr substream_types_;
+};
+
+// Describes the type of a multiplexed stream.
+class MultiplexedStreamTypeSet : public StreamTypeSet {
+public:
+  static StreamTypeSetPtr Create(
+      StreamTypeSetPtr multiplex_type_set,
+      StreamTypeSetsPtr substream_type_sets) {
+    return StreamTypeSetPtr(
+        new MultiplexedStreamTypeSet(
+            std::move(multiplex_type_set),
+            std::move(substream_type_sets)));
+  }
+
+  MultiplexedStreamTypeSet(
+      StreamTypeSetPtr multiplex_type_set,
+      StreamTypeSetsPtr substream_type_sets);
+
+  ~MultiplexedStreamTypeSet() override;
+
+  const MultiplexedStreamTypeSet* multiplexed() const override;
+
+  const StreamTypeSetPtr& multiplex_type_set() const {
+    return multiplex_type_set_;
+  }
+
+  const StreamTypeSetsPtr& substream_type_sets() const {
+    return substream_type_sets_;
+  }
+
+  StreamTypeSetPtr Clone() const override;
+
+private:
+  StreamTypeSetPtr multiplex_type_set_;
+  StreamTypeSetsPtr substream_type_sets_;
+};
+
+// Describes the type of an LPCM stream.
+class LpcmStreamType : public StreamType {
+ public:
+  enum class SampleFormat {
+    kUnknown,
+    kAny,
+    kUnsigned8,
+    kSigned16,
+    kSigned24In32,
+    kFloat
+  };
+
+  static StreamTypePtr Create(
+      SampleFormat sample_format,
+      uint32_t channels,
+      uint32_t frames_per_second) {
+    return StreamTypePtr(new LpcmStreamType(
+        sample_format,
+        channels,
+        frames_per_second));
+  }
+
+  LpcmStreamType(
+      SampleFormat sample_format,
+      uint32_t channels,
+      uint32_t frames_per_second);
+
+  ~LpcmStreamType() override;
+
+  const LpcmStreamType* lpcm() const override;
+
+  SampleFormat sample_format() const {
+    return sample_format_;
+  }
+
+  uint32_t channels() const {
+    return channels_;
+  }
+
+  uint32_t frames_per_second() const {
+    return frames_per_second_;
+  }
+
+  uint32_t sample_size() const {
+    return sample_size_;
+  }
+
+  uint32_t bytes_per_frame() const {
+    return sample_size_ * channels_;
+  }
+
+  uint64_t min_buffer_size(uint64_t frame_count) const {
+    return frame_count * sample_size_ * channels_;
+  }
+
+  static uint32_t SampleSizeFromFormat(SampleFormat sample_format);
+
+  StreamTypePtr Clone() const override;
+
+ protected:
+  LpcmStreamType(
+      Scheme scheme,
+      SampleFormat sample_format,
+      uint32_t channels,
+      uint32_t frames_per_second);
+
+ private:
+  SampleFormat sample_format_;
+  uint32_t channels_;
+  uint32_t frames_per_second_;
+  uint32_t sample_size_;
+};
+
+// Describes a set of LPCM stream types.
+class LpcmStreamTypeSet : public StreamTypeSet {
+ public:
+  static StreamTypeSetPtr Create(
+      LpcmStreamType::SampleFormat sample_format,
+      Range<uint32_t> channels,
+      Range<uint32_t> frames_per_second) {
+    return StreamTypeSetPtr(new LpcmStreamTypeSet(
+        sample_format,
+        channels,
+        frames_per_second));
+  }
+
+  LpcmStreamTypeSet(
+      LpcmStreamType::SampleFormat sample_format,
+      Range<uint32_t> channels,
+      Range<uint32_t> frames_per_second);
+
+  ~LpcmStreamTypeSet() override;
+
+  const LpcmStreamTypeSet* lpcm() const override;
+
+  LpcmStreamType::SampleFormat sample_format() const {
+    return sample_format_;
+  }
+
+  Range<uint32_t> channels() const {
+    return channels_;
+  }
+
+  Range<uint32_t> frames_per_second() const {
+    return frames_per_second_;
+  }
+
+  bool contains(const LpcmStreamType& type) const;
+
+  StreamTypeSetPtr Clone() const override;
+
+ protected:
+  LpcmStreamTypeSet(
+      StreamType::Scheme scheme,
+      LpcmStreamType::SampleFormat sample_format,
+      Range<uint32_t> channels,
+      Range<uint32_t> frames_per_second);
+
+ private:
+  LpcmStreamType::SampleFormat sample_format_;
+  Range<uint32_t> channels_;
+  Range<uint32_t> frames_per_second_;
+};
+
+// Describes the type of a compressed audio stream.
+class CompressedAudioStreamType : public LpcmStreamType {
+ public:
+  enum class AudioEncoding {
+    kUnknown,
+    kAny,
+    kVorbis
+  };
+
+  static StreamTypePtr Create(
+      AudioEncoding encoding,
+      SampleFormat sample_format,
+      uint32_t channels,
+      uint32_t frames_per_second,
+      BytesPtr encoding_details) {
+    return StreamTypePtr(new CompressedAudioStreamType(
+        encoding,
+        sample_format,
+        channels,
+        frames_per_second,
+        std::move(encoding_details)));
+  }
+
+  CompressedAudioStreamType(
+    AudioEncoding encoding,
+    SampleFormat sample_format,
+    uint32_t channels,
+    uint32_t frames_per_second,
+    BytesPtr encoding_details);
+
+  ~CompressedAudioStreamType() override;
+
+  const CompressedAudioStreamType* compressed_audio() const override;
+
+  AudioEncoding encoding() const {
+    return encoding_;
+  }
+
+  const BytesPtr& encoding_details() const {
+    return encoding_details_;
+  }
+
+  StreamTypePtr Clone() const override;
+
+ private:
+  AudioEncoding encoding_;
+  BytesPtr encoding_details_;
+};
+
+// Describes a set of compressed audio stream types.
+class CompressedAudioStreamTypeSet : public LpcmStreamTypeSet {
+ public:
+  static StreamTypeSetPtr Create(
+      CompressedAudioStreamType::AudioEncoding encoding,
+      CompressedAudioStreamType::SampleFormat sample_format,
+      Range<uint32_t> channels,
+      Range<uint32_t> frames_per_second) {
+    return StreamTypeSetPtr(new CompressedAudioStreamTypeSet(
+        encoding,
+        sample_format,
+        channels,
+        frames_per_second));
+  }
+
+  CompressedAudioStreamTypeSet(
+    CompressedAudioStreamType::AudioEncoding encoding,
+    CompressedAudioStreamType::SampleFormat sample_format,
+    Range<uint32_t> channels,
+    Range<uint32_t> frames_per_second);
+
+  ~CompressedAudioStreamTypeSet() override;
+
+  const CompressedAudioStreamTypeSet* compressed_audio() const override;
+
+  CompressedAudioStreamType::AudioEncoding encoding() const {
+    return encoding_;
+  }
+
+  bool contains(const CompressedAudioStreamType& type) const;
+
+  StreamTypeSetPtr Clone() const override;
+
+ private:
+  CompressedAudioStreamType::AudioEncoding encoding_;
+};
+
+// Describes the type of a video stream.
+class VideoStreamType : public StreamType {
+ public:
+  enum class VideoEncoding {
+    kUnknown,
+    kAny,
+    kTheora,
+    kVp8,
+  };
+
+  enum class VideoProfile {
+    kUnknown,
+    kNotApplicable,
+    kH264Baseline,
+    kH264Main,
+    kH264Extended,
+    kH264High,
+    kH264High10,
+    kH264High422,
+    kH264High444Predictive,
+    kH264ScalableBaseline,
+    kH264ScalableHigh,
+    kH264StereoHigh,
+    kH264MultiviewHigh
+  };
+
+  enum class PixelFormat {
+    kUnknown,
+    kI420,
+    kYv12,
+    kYv16,
+    kYv12A,
+    kYv24,
+    kNv12,
+    kNv21,
+    kUyvy,
+    kYuy2,
+    kArgb,
+    kXrgb,
+    kRgb24,
+    kRgb32,
+    kMjpeg,
+    kMt21
+  };
+
+  enum class ColorSpace {
+    kUnknown,
+    kNotApplicable,
+    kJpeg,
+    kHdRec709,
+    kSdRec601
+  };
+
+  static StreamTypePtr Create(
+      VideoEncoding encoding,
+      VideoProfile profile,
+      PixelFormat pixel_format,
+      ColorSpace color_space,
+      uint32_t width,
+      uint32_t height,
+      uint32_t coded_width,
+      uint32_t coded_height,
+      BytesPtr encoding_details) {
+    return StreamTypePtr(new VideoStreamType(
+        encoding,
+        profile,
+        pixel_format,
+        color_space,
+        width,
+        height,
+        coded_width,
+        coded_height,
+        std::move(encoding_details)));
+  }
+
+  VideoStreamType(
+    VideoEncoding encoding,
+    VideoProfile profile,
+    PixelFormat pixel_format,
+    ColorSpace color_space,
+    uint32_t width,
+    uint32_t height,
+    uint32_t coded_width,
+    uint32_t coded_height,
+    BytesPtr encoding_details);
+
+  ~VideoStreamType() override;
+
+  const VideoStreamType* video() const override;
+
+  VideoEncoding encoding() const {
+    return encoding_;
+  }
+
+  VideoProfile profile() const {
+    return profile_;
+  }
+
+  PixelFormat pixel_format() const {
+    return pixel_format_;
+  }
+
+  ColorSpace color_space() const {
+    return color_space_;
+  }
+
+  uint32_t width() const {
+    return width_;
+  }
+
+  uint32_t height() const {
+    return height_;
+  }
+
+  uint32_t coded_width() const {
+    return coded_width_;
+  }
+
+  uint32_t coded_height() const {
+    return coded_height_;
+  }
+
+  const BytesPtr& encoding_details() const {
+    return encoding_details_;
+  }
+
+  StreamTypePtr Clone() const override;
+
+ private:
+  VideoEncoding encoding_;
+  VideoProfile profile_;
+  PixelFormat pixel_format_;
+  ColorSpace color_space_;
+  uint32_t width_;
+  uint32_t height_;
+  uint32_t coded_width_;
+  uint32_t coded_height_;
+  BytesPtr encoding_details_;
+};
+
+// Describes a set of video stream types.
+class VideoStreamTypeSet : public StreamTypeSet {
+ public:
+  static StreamTypeSetPtr Create(
+      VideoStreamType::VideoEncoding encoding,
+      Range<uint32_t> width,
+      Range<uint32_t> height) {
+    return StreamTypeSetPtr(new VideoStreamTypeSet(
+        encoding,
+        width,
+        height));
+  }
+
+  VideoStreamTypeSet(
+    VideoStreamType::VideoEncoding encoding,
+    Range<uint32_t> width,
+    Range<uint32_t> height);
+
+  ~VideoStreamTypeSet() override;
+
+  const VideoStreamTypeSet* video() const override;
+
+  VideoStreamType::VideoEncoding encoding() const {
+    return encoding_;
+  }
+
+  Range<uint32_t> width() const {
+    return width_;
+  }
+
+  Range<uint32_t> height() const {
+    return height_;
+  }
+
+  StreamTypeSetPtr Clone() const override;
+
+ private:
+  VideoStreamType::VideoEncoding encoding_;
+  Range<uint32_t> width_;
+  Range<uint32_t> height_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_STREAM_TYPE_H_
diff --git a/services/media/framework_create/BUILD.gn b/services/media/framework_create/BUILD.gn
new file mode 100644
index 0000000..b909fdb
--- /dev/null
+++ b/services/media/framework_create/BUILD.gn
@@ -0,0 +1,17 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import("//build/module_args/mojo.gni")
+import("$mojo_sdk_root/mojo/public/mojo_sdk.gni")
+
+source_set("framework_create") {
+  sources = [
+    "decoder.cc",
+    "demux.cc",
+  ]
+
+  deps = [
+    "//services/media/framework",
+  ]
+}
diff --git a/services/media/framework_create/decoder.cc b/services/media/framework_create/decoder.cc
new file mode 100644
index 0000000..ec37d59
--- /dev/null
+++ b/services/media/framework_create/decoder.cc
@@ -0,0 +1,17 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/parts/decoder.h"
+
+namespace mojo {
+namespace media {
+
+Result Decoder::Create(
+    const StreamTypePtr& stream_type,
+    DecoderPtr* decoder_out) {
+  return Result::kUnsupportedOperation;
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework_create/demux.cc b/services/media/framework_create/demux.cc
new file mode 100644
index 0000000..fb48b69
--- /dev/null
+++ b/services/media/framework_create/demux.cc
@@ -0,0 +1,15 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/parts/demux.h"
+
+namespace mojo {
+namespace media {
+
+Result Demux::Create(ReaderPtr reader, DemuxPtr* demux_out) {
+  return Result::kUnsupportedOperation;
+}
+
+} // namespace media
+} // namespace mojo