Motown: Framework parts for mojo transport (producer/consumer/mediapipe) and control (audiotrack).

R=johngro@google.com

Review URL: https://codereview.chromium.org/1692443002 .
diff --git a/services/media/framework_mojo/BUILD.gn b/services/media/framework_mojo/BUILD.gn
new file mode 100644
index 0000000..cef34dc
--- /dev/null
+++ b/services/media/framework_mojo/BUILD.gn
@@ -0,0 +1,38 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+source_set("framework_mojo") {
+  sources = [
+    "audio_track_controller.cc",
+    "audio_track_controller.h",
+    "audio_track_producer.cc",
+    "audio_track_producer.h",
+    "mojo_allocator.cc",
+    "mojo_allocator.h",
+    "mojo_consumer.cc",
+    "mojo_consumer.h",
+    "mojo_formatting.cc",
+    "mojo_formatting.h",
+    "mojo_producer.cc",
+    "mojo_producer.h",
+    "mojo_pull_mode_producer.cc",
+    "mojo_pull_mode_producer.h",
+    "mojo_type_conversions.cc",
+    "mojo_type_conversions.h",
+    "push_producer_base.cc",
+    "push_producer_base.h",
+  ]
+
+  deps = [
+    "//base",
+    "//mojo/common",
+    "//mojo/public/cpp/application",
+    "//mojo/services/media/audio/interfaces",
+    "//mojo/services/media/common/cpp",
+    "//mojo/services/media/common/interfaces",
+    "//mojo/services/media/control/interfaces",
+    "//services/media/framework",
+    "//third_party/modp_b64",
+  ]
+}
diff --git a/services/media/framework_mojo/audio_track_controller.cc b/services/media/framework_mojo/audio_track_controller.cc
new file mode 100644
index 0000000..c93a013
--- /dev/null
+++ b/services/media/framework_mojo/audio_track_controller.cc
@@ -0,0 +1,162 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <list>
+
+#include "base/bind_helpers.h"
+#include "base/logging.h"
+#include "mojo/services/media/audio/interfaces/audio_server.mojom.h"
+#include "mojo/services/media/audio/interfaces/audio_track.mojom.h"
+#include "mojo/services/media/common/cpp/linear_transform.h"
+#include "mojo/services/media/common/cpp/local_time.h"
+#include "services/media/framework/conversion_pipeline_builder.h"
+#include "services/media/framework_mojo/audio_track_controller.h"
+#include "services/media/framework_mojo/mojo_type_conversions.h"
+
+namespace mojo {
+namespace media {
+
+AudioTrackController::AudioTrackController(
+    const String& url,
+    std::unique_ptr<StreamType> stream_type,
+    Graph* graph,
+    OutputRef output,
+    ApplicationImpl* app,
+    const ConstructorCallback& callback) {
+  DCHECK(stream_type);
+  DCHECK(graph);
+  DCHECK(output);
+  DCHECK(app);
+
+  AudioServerPtr audio_server;
+  app->ConnectToService(url, &audio_server);
+  audio_server->CreateTrack(GetProxy(&audio_track_));
+
+  // TODO(dalesat): Remove capture hack once c++14 is happening.
+  std::shared_ptr<StreamType> captured_stream_type(stream_type.release());
+
+  // Query the track's format capabilities.
+  audio_track_->Describe(
+    [this, callback, captured_stream_type, graph, output]
+    (AudioTrackDescriptorPtr descriptor) {
+      std::unique_ptr<StreamType> producer_stream_type;
+
+      // Add transforms to the pipeline to convert from stream_type to a type
+      // supported by the track.
+      OutputRef out = output;
+      bool result = BuildConversionPipeline(
+          *captured_stream_type,
+          *Convert(descriptor->supported_media_types),
+          graph,
+          &out,
+          &producer_stream_type);
+      if (!result) {
+        // Failed to build conversion pipeline.
+        callback.Run(nullptr);
+        return;
+      }
+
+      switch (producer_stream_type->scheme()) {
+        case StreamType::Scheme::kLpcm:
+          frames_per_second_ =
+              producer_stream_type->lpcm()->frames_per_second();
+          break;
+        case StreamType::Scheme::kCompressedAudio:
+          frames_per_second_ =
+              producer_stream_type->compressed_audio()->frames_per_second();
+          break;
+        default:
+          // Unsupported producer stream type.
+          callback.Run(nullptr);
+          return;
+      }
+
+      AudioTrackConfigurationPtr config = AudioTrackConfiguration::New();
+      config->media_type = Convert(std::move(producer_stream_type));
+
+      audio_track_->Configure(config.Pass(), GetProxy(&pipe_));
+
+      std::shared_ptr<AudioTrackProducer> sink =
+          AudioTrackProducer::Create(pipe_.Pass());
+      graph->ConnectOutputToPart(out, graph->Add(sink));
+      callback.Run(sink);
+    });
+}
+
+AudioTrackController::~AudioTrackController() {}
+
+void AudioTrackController::SetRate(
+    float rate_factor,
+    const SetRateCallback& callback) {
+  // TODO(dalesat): Set the rate at a particular local time to coordinate rate
+  // changes for multiple outputs.
+  // TODO(dalesat): Need to specify the starting media time for seek and for
+  // sources that don't start at zero.
+  if (!rate_control_.is_bound()) {
+    audio_track_->GetRateControl(GetProxy(&rate_control_));
+  }
+
+  LinearTransform::Ratio audio_rate(
+      static_cast<uint32_t>(frames_per_second_ * rate_factor), 1);
+  LinearTransform::Ratio local_time_rate(
+      LocalDuration::period::num,
+      LocalDuration::period::den);
+
+  LinearTransform::Ratio rate;
+  bool success =
+      LinearTransform::Ratio::Compose(local_time_rate, audio_rate, &rate);
+  DCHECK(success)
+      << "LinearTransform::Ratio::Compose reports loss of precision";
+
+  rate_control_->SetRate(rate.numerator, rate.denominator);
+
+  // TODO(dalesat): Replace this with a clock.
+  // The code below produces a transform that translates local time into media
+  // time in nanosecond units. That transform is delivered to the application,
+  // which uses it to implement a progress bar. This is OK for demo purposes,
+  // but we really need a clock rather than this static transform.
+  rate_control_->GetCurrentTransform(
+    [this, callback](TimelineTransformPtr transform) {
+      // Get the frame rate in local duration units.
+      LinearTransform::Ratio audio_rate(frames_per_second_, 1);
+      LinearTransform::Ratio local_time_rate(
+          LocalDuration::period::num,
+          LocalDuration::period::den);
+      LinearTransform::Ratio presentation_rate;
+      bool success = LinearTransform::Ratio::Compose(
+          local_time_rate,
+          audio_rate,
+          &presentation_rate);
+      DCHECK(success)
+          << "LinearTransform::Ratio::Compose reports loss of precision";
+
+      // Create a LinearTransform to translate from presentation units to
+      // local duration units.
+      LinearTransform local_to_presentation(0, presentation_rate, 0);
+
+      // Translate the current transform quad so the presentation time units
+      // are the same as the local time units.
+      success = local_to_presentation.DoReverseTransform(
+          transform->quad->reference_offset,
+          &transform->quad->reference_offset);
+      DCHECK(success)
+          << "LinearTransform::DoReverseTransform reports loss of precision";
+      int64_t presentation_delta;
+      success = local_to_presentation.DoReverseTransform(
+          static_cast<int64_t>(transform->quad->reference_delta),
+          &presentation_delta);
+      DCHECK(success)
+          << "LinearTransform::DoReverseTransform reports loss of precision";
+      transform->quad->reference_delta =
+          static_cast<int32_t>(presentation_delta);
+      LinearTransform::Ratio::Reduce(
+          &transform->quad->reference_delta,
+          &transform->quad->target_delta);
+
+      callback(transform.Clone());
+    });
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework_mojo/audio_track_controller.h b/services/media/framework_mojo/audio_track_controller.h
new file mode 100644
index 0000000..7f9d350
--- /dev/null
+++ b/services/media/framework_mojo/audio_track_controller.h
@@ -0,0 +1,48 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_MOJO_AUDIO_TRACK_CONTROLLER_H_
+#define SERVICES_MEDIA_FRAMEWORK_MOJO_AUDIO_TRACK_CONTROLLER_H_
+
+#include "mojo/public/cpp/application/application_impl.h"
+#include "mojo/services/media/audio/interfaces/audio_track.mojom.h"
+#include "mojo/services/media/common/interfaces/media_pipe.mojom.h"
+#include "services/media/framework/graph.h"
+#include "services/media/framework/stream_type.h"
+#include "services/media/framework_mojo/audio_track_producer.h"
+
+namespace mojo {
+namespace media {
+
+// Controls an audio track obtained from the audio server.
+class AudioTrackController {
+ public:
+  using ConstructorCallback =
+      Callback<void(std::shared_ptr<AudioTrackProducer>)>;
+  using SetRateCallback = std::function<void(TimelineTransformPtr)>;
+  // |callback| receives the producer feeding the track, or nullptr on failure.
+  AudioTrackController(
+      const String& url,
+      std::unique_ptr<StreamType> stream_type,
+      Graph* graph,
+      OutputRef output,
+      ApplicationImpl* app,
+      const ConstructorCallback& callback);
+
+  ~AudioTrackController();
+
+  // Sets the rate; |callback| receives the resulting timeline transform.
+  void SetRate(float rate_factor, const SetRateCallback& callback);
+
+ private:
+  AudioTrackPtr audio_track_;
+  RateControlPtr rate_control_;
+  MediaPipePtr pipe_;
+  uint32_t frames_per_second_ = 0;  // 0 until Describe() completes.
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_MOJO_AUDIO_TRACK_CONTROLLER_H_
diff --git a/services/media/framework_mojo/audio_track_producer.cc b/services/media/framework_mojo/audio_track_producer.cc
new file mode 100644
index 0000000..275848e
--- /dev/null
+++ b/services/media/framework_mojo/audio_track_producer.cc
@@ -0,0 +1,47 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/logging.h"
+#include "services/media/framework_mojo/audio_track_producer.h"
+
+namespace mojo {
+namespace media {
+
+AudioTrackProducer::AudioTrackProducer(MediaPipePtr pipe) :
+    pipe_(pipe.Pass()) {
+  DCHECK(pipe_.is_bound());
+
+  DCHECK(!mojo_allocator_.initialized());
+  mojo_allocator_.InitNew(256 * 1024); // TODO(dalesat): Made up!
+
+  pipe_->SetBuffer(
+      mojo_allocator_.GetDuplicateHandle(),
+      mojo_allocator_.size());
+}
+
+AudioTrackProducer::~AudioTrackProducer() {}
+
+bool AudioTrackProducer::IsConnected() {
+  return pipe_.is_bound();
+}
+
+void AudioTrackProducer::PushPacketInternal(
+    Packet* packet_raw_ptr,
+    MediaPacketPtr media_packet) {
+  DCHECK(pipe_.is_bound());
+  pipe_->SendPacket(
+      media_packet.Pass(),
+      [this, packet_raw_ptr](MediaPipe::SendResult result) {
+        PacketPtr packet = PacketPtr(packet_raw_ptr);
+        PushCompleted(packet);
+      });
+}
+
+void AudioTrackProducer::FlushPipe(const FlushPipeCallback& callback) {
+  DCHECK(pipe_.is_bound());
+  pipe_->Flush(callback);
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework_mojo/audio_track_producer.h b/services/media/framework_mojo/audio_track_producer.h
new file mode 100644
index 0000000..4795822
--- /dev/null
+++ b/services/media/framework_mojo/audio_track_producer.h
@@ -0,0 +1,45 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_MOJO_AUDIO_TRACK_PRODUCER_H_
+#define SERVICES_MEDIA_FRAMEWORK_MOJO_AUDIO_TRACK_PRODUCER_H_
+
+#include "services/media/framework_mojo/push_producer_base.h"
+
+namespace mojo {
+namespace media {
+
+// Delivers a stream to an audio track's MediaPipe.
+class AudioTrackProducer : public PushProducerBase {
+ public:
+  using FlushPipeCallback = mojo::Callback<void()>;
+
+  static std::shared_ptr<AudioTrackProducer> Create(
+      MediaPipePtr pipe) {
+    return std::shared_ptr<AudioTrackProducer>(
+        new AudioTrackProducer(pipe.Pass()));
+  }
+
+  ~AudioTrackProducer() override;
+
+  // Tells the connected pipe to flush.
+  void FlushPipe(const FlushPipeCallback& callback);
+
+ protected:
+  // PushProducerBase overrides.
+  bool IsConnected() override;
+
+  void PushPacketInternal(Packet* packet_raw_ptr, MediaPacketPtr media_packet)
+      override;
+
+ private:
+  explicit AudioTrackProducer(MediaPipePtr pipe);
+
+  MediaPipePtr pipe_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_MOJO_AUDIO_TRACK_PRODUCER_H_
diff --git a/services/media/framework_mojo/mojo_allocator.cc b/services/media/framework_mojo/mojo_allocator.cc
new file mode 100644
index 0000000..6622357
--- /dev/null
+++ b/services/media/framework_mojo/mojo_allocator.cc
@@ -0,0 +1,21 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework_mojo/mojo_allocator.h"
+
+namespace mojo {
+namespace media {
+
+MojoAllocator::~MojoAllocator() {}
+
+void* MojoAllocator::AllocatePayloadBuffer(size_t size) {
+  return AllocateRegion(size);
+}
+
+void MojoAllocator::ReleasePayloadBuffer(size_t size, void* buffer) {
+  ReleaseRegion(size, buffer);
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework_mojo/mojo_allocator.h b/services/media/framework_mojo/mojo_allocator.h
new file mode 100644
index 0000000..6a0fda4
--- /dev/null
+++ b/services/media/framework_mojo/mojo_allocator.h
@@ -0,0 +1,30 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_ALLOCATOR_H_
+#define SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_ALLOCATOR_H_
+
+#include "mojo/services/media/common/cpp/shared_media_buffer_allocator.h"
+#include "services/media/framework/payload_allocator.h"
+
+namespace mojo {
+namespace media {
+
+// Just like SharedMediaBufferAllocator but implements PayloadAllocator.
+class MojoAllocator :
+    public SharedMediaBufferAllocator,
+    public PayloadAllocator {
+ public:
+  ~MojoAllocator() override;
+
+  // PayloadAllocator implementation.
+  void* AllocatePayloadBuffer(size_t size) override;
+
+  void ReleasePayloadBuffer(size_t size, void* buffer) override;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_ALLOCATOR_H_
diff --git a/services/media/framework_mojo/mojo_consumer.cc b/services/media/framework_mojo/mojo_consumer.cc
new file mode 100644
index 0000000..4bd75cf
--- /dev/null
+++ b/services/media/framework_mojo/mojo_consumer.cc
@@ -0,0 +1,99 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "services/media/framework_mojo/mojo_consumer.h"
+
+namespace mojo {
+namespace media {
+
+void MojoConsumerMediaConsumer::Flush(const FlushCallback& callback) {
+  MediaConsumerFlush(callback);
+}
+
+MojoConsumer::MojoConsumer() {}
+
+MojoConsumer::~MojoConsumer() {}
+
+void MojoConsumer::AddBinding(InterfaceRequest<MediaConsumer> consumer) {
+  bindings_.AddBinding(this, consumer.Pass());
+  DCHECK(base::MessageLoop::current());
+  task_runner_ = base::MessageLoop::current()->task_runner();
+  DCHECK(task_runner_);
+}
+
+void MojoConsumer::SetFlushRequestedCallback(
+    const FlushRequestedCallback& callback) {
+  flush_requested_callback_ = callback;
+}
+
+void MojoConsumer::SetBuffer(
+    ScopedSharedBufferHandle buffer,
+    uint64_t size,
+    const SetBufferCallback& callback) {
+  buffer_.InitFromHandle(buffer.Pass(), size);
+  callback.Run();
+}
+
+void MojoConsumer::PushPacket(
+    MediaPacketPtr media_packet,
+    const PushPacketCallback& callback) {
+  DCHECK(supply_callback_);
+  supply_callback_(PacketImpl::Create(
+      media_packet.Pass(),
+      callback,
+      task_runner_,
+      buffer_));
+}
+
+void MojoConsumer::MediaConsumerFlush(const FlushCallback& callback) {
+  if (!flush_requested_callback_) {
+    LOG(WARNING) << "flush requested but no callback registered";
+    callback.Run();
+    return;
+  }
+  flush_requested_callback_(callback);  // Delegate to the registered handler.
+}
+
+bool MojoConsumer::can_accept_allocator() const {
+  return false;
+}
+
+void MojoConsumer::set_allocator(PayloadAllocator* allocator) {
+  NOTREACHED();
+}
+
+void MojoConsumer::SetSupplyCallback(const SupplyCallback& supply_callback) {
+  supply_callback_ = supply_callback;
+}
+
+void MojoConsumer::SetDownstreamDemand(Demand demand) {}
+
+MojoConsumer::PacketImpl::PacketImpl(
+    MediaPacketPtr media_packet,
+    const PushPacketCallback& callback,
+    scoped_refptr<base::SingleThreadTaskRunner> task_runner,
+    const MappedSharedBuffer& buffer) :
+    media_packet_(media_packet.Pass()),
+    callback_(callback),
+    task_runner_(task_runner),
+    payload_(buffer.PtrFromOffset(media_packet_->payload->offset)) {}
+
+MojoConsumer::PacketImpl::~PacketImpl() {}
+
+// static
+void MojoConsumer::PacketImpl::RunCallback(const PushPacketCallback& callback) {
+  callback.Run();
+}
+
+void MojoConsumer::PacketImpl::Release() {
+  // TODO(dalesat): Is there a cleaner way to do this?
+  task_runner_->PostTask(FROM_HERE, base::Bind(&RunCallback, callback_));
+  delete this;
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework_mojo/mojo_consumer.h b/services/media/framework_mojo/mojo_consumer.h
new file mode 100644
index 0000000..9dcca11
--- /dev/null
+++ b/services/media/framework_mojo/mojo_consumer.h
@@ -0,0 +1,134 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_CONSUMER_H_
+#define SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_CONSUMER_H_
+
+
+#include "base/single_thread_task_runner.h"
+#include "base/task_runner.h"
+#include "mojo/common/binding_set.h"
+#include "mojo/services/media/common/cpp/mapped_shared_buffer.h"
+#include "mojo/services/media/common/interfaces/media_transport.mojom.h"
+#include "services/media/framework/models/active_source.h"
+
+namespace mojo {
+namespace media {
+
+// Implements MediaConsumer::Flush on behalf of MediaConsumer to avoid name
+// conflict with Part::Flush.
+class MojoConsumerMediaConsumer : public MediaConsumer {
+  // MediaConsumer implementation.
+  void Flush(const FlushCallback& callback) override;
+
+  // Implements MediaConsumer::Flush.
+  virtual void MediaConsumerFlush(const FlushCallback& callback) = 0;
+};
+
+// Implements MediaConsumer to receive a stream from across mojo.
+class MojoConsumer : public MojoConsumerMediaConsumer, public ActiveSource {
+ public:
+  using FlushRequestedCallback = std::function<void(const FlushCallback&)>;
+
+  static std::shared_ptr<MojoConsumer> Create() {
+    return std::shared_ptr<MojoConsumer>(new MojoConsumer());
+  }
+
+  ~MojoConsumer() override;
+
+  // Adds a binding.
+  void AddBinding(InterfaceRequest<MediaConsumer> consumer);
+
+  // Sets a callback signalling that a flush has been requested from the
+  // MediaConsumer client.
+  void SetFlushRequestedCallback(const FlushRequestedCallback& callback);
+
+  // MediaConsumer implementation.
+  void SetBuffer(
+      ScopedSharedBufferHandle buffer,
+      uint64_t size,
+      const SetBufferCallback& callback) override;
+
+  void PushPacket(MediaPacketPtr packet, const PushPacketCallback& callback)
+      override;
+
+  void MediaConsumerFlush(const FlushCallback& callback) override;
+
+  // ActiveSource implementation.
+  bool can_accept_allocator() const override;
+
+  void set_allocator(PayloadAllocator* allocator) override;
+
+  void SetSupplyCallback(const SupplyCallback& supply_callback) override;
+
+  void SetDownstreamDemand(Demand demand) override;
+
+ private:
+  MojoConsumer();
+
+  // Specialized packet implementation.
+  class PacketImpl : public Packet {
+   public:
+    static PacketPtr Create(
+        MediaPacketPtr media_packet,
+        const PushPacketCallback& callback,
+        scoped_refptr<base::SingleThreadTaskRunner> task_runner,
+        const MappedSharedBuffer& buffer) {
+      return PacketPtr(new PacketImpl(
+          media_packet.Pass(),
+          callback,
+          task_runner,
+          buffer));
+    }
+
+    // Packet implementation.
+    int64_t presentation_time() const override {
+      return media_packet_->pts;
+    }
+
+    uint64_t duration() const override {
+      return media_packet_->duration;
+    }
+
+    bool end_of_stream() const override {
+      return media_packet_->end_of_stream;
+    }
+
+    size_t size() const override {
+      return media_packet_->payload->length;
+    }
+
+    void* payload() const override { return payload_; }
+
+   protected:
+    void Release() override;
+
+   private:
+    PacketImpl(
+        MediaPacketPtr media_packet,
+        const PushPacketCallback& callback,
+        scoped_refptr<base::SingleThreadTaskRunner> task_runner,
+        const MappedSharedBuffer& buffer);
+
+    ~PacketImpl() override;
+
+    static void RunCallback(const PushPacketCallback& callback);
+
+    MediaPacketPtr media_packet_;
+    const PushPacketCallback callback_;
+    scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+    void* payload_;
+  };
+
+  BindingSet<MediaConsumer> bindings_;
+  FlushRequestedCallback flush_requested_callback_;
+  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+  MappedSharedBuffer buffer_;
+  SupplyCallback supply_callback_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_CONSUMER_H_
diff --git a/services/media/framework_mojo/mojo_formatting.cc b/services/media/framework_mojo/mojo_formatting.cc
new file mode 100644
index 0000000..c7f313e
--- /dev/null
+++ b/services/media/framework_mojo/mojo_formatting.cc
@@ -0,0 +1,380 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework_mojo/mojo_formatting.h"
+
+namespace mojo {
+namespace media {
+
+template<typename T>
+std::ostream& operator<<(std::ostream& os, const InterfacePtr<T>& value) {
+  if (!value.is_bound()) {
+    return os << "<not bound>" << std::endl;
+  } else {
+    return os << "<bound>" << std::endl;
+  }
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    const MediaSourceStreamDescriptorPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "uint32_t index: " << int(value->index) << std::endl;
+  os << begl << "MediaTypePtr media_type: " << value->media_type;
+  os << begl << "MediaTypePtr original_media_type: " <<
+      value->original_media_type;
+  return os << outdent;
+}
+
+std::ostream& operator<<(std::ostream& os, const MediaTypePtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "MediaTypeScheme scheme: " <<
+      StringFromMediaTypeScheme(value->scheme) << std::endl;
+  os << begl << "MediaTypeDetailsPtr details: " << value->details;
+  return os << outdent;
+}
+
+std::ostream& operator<<(std::ostream& os, const MediaTypeSetPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "MediaTypeScheme scheme: " <<
+      StringFromMediaTypeScheme(value->scheme) << std::endl;
+  os << begl << "MediaTypeSetDetailsPtr details: " << value->details;
+  return os << outdent;
+}
+
+std::ostream& operator<<(std::ostream& os, const MediaTypeDetailsPtr& value) {
+  // Null and unknown-tag unions each get a one-line placeholder.
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  }
+  if (value->has_unknown_tag()) {
+    return os << "<empty>" << std::endl;
+  }
+  os << std::endl;
+  os << indent;
+  if (value->is_multiplexed()) {
+    return os << begl << "MultiplexedMediaTypeDetailsPtr* multiplexed: " <<
+        value->get_multiplexed() << outdent;
+  }
+  if (value->is_lpcm()) {
+    return os << begl << "LpcmMediaTypeDetailsPtr* lpcm: " <<
+        value->get_lpcm() << outdent;
+  }
+  if (value->is_compressed_audio()) {
+    return os << begl <<
+        "CompressedAudioMediaTypeDetailsPtr* compressed_audio: " <<
+        value->get_compressed_audio() << outdent;
+  }
+  if (value->is_video()) {
+    return os << begl << "VideoMediaTypeDetailsPtr* video: " <<
+        value->get_video() << outdent;
+  }
+  return os << begl << "UNKNOWN TAG" << std::endl << outdent;
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    const MediaTypeSetDetailsPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else   if (value->has_unknown_tag()) {
+    return os << "<empty>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  if (value->is_multiplexed()) {
+    return os << begl << "MultiplexedMediaTypeSetDetailsPtr* multiplexed: " <<
+        value->get_multiplexed() << outdent;
+  }
+  if (value->is_lpcm()) {
+    return os << begl << "LpcmMediaTypeSetDetailsPtr* lpcm: " <<
+        value->get_lpcm() << outdent;
+  }
+  if (value->is_compressed_audio()) {
+    return os << begl <<
+        "CompressedAudioMediaTypeSetDetailsPtr* compressed_audio: " <<
+        value->get_compressed_audio() << outdent;
+  }
+  if (value->is_video()) {
+    return os << begl << "VideoMediaTypeSetDetailsPtr* video: " <<
+        value->get_video() << outdent;
+  }
+  return os << begl << "UNKNOWN TAG" << std::endl << outdent;
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    const MultiplexedMediaTypeDetailsPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "MediaTypePtr multiplex_type: " << value->multiplex_type;
+  os << begl << "Array<MediaTypePtr> substream_types: " <<
+      value->substream_types;
+  return os << outdent;
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    const MultiplexedMediaTypeSetDetailsPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "MediaTypeSetPtr multiplex_type_set: " <<
+      value->multiplex_type_set;
+  os << begl << "Array<MediaTypeSetPtr> substream_type_sets: " <<
+      value->substream_type_sets;
+  return os << outdent;
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    const LpcmMediaTypeDetailsPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "LpcmSampleFormat sample_format: " <<
+      StringFromLpcmSampleFormat(value->sample_format) << std::endl;
+  os << begl << "uint32_t channels: " << int(value->channels) << std::endl;
+  os << begl << "uint32_t frames_per_second: " << value->frames_per_second <<
+      std::endl;
+  return os << outdent;
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    const LpcmMediaTypeSetDetailsPtr& value) {
+  // A null pointer prints as "<nullptr>"; otherwise one field per line.
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  }
+  os << std::endl;
+
+  os << indent;
+  os << begl << "LpcmSampleFormat sample_format: " <<
+      StringFromLpcmSampleFormat(value->sample_format) << std::endl;
+  os << begl << "uint32_t min_channels: " << int(value->min_channels) <<
+      std::endl;
+  os << begl << "uint32_t max_channels: " << int(value->max_channels) <<
+      std::endl;
+  os << begl << "uint32_t min_frames_per_second: " <<
+      value->min_frames_per_second << std::endl;
+  os << begl << "uint32_t max_frames_per_second: " <<
+      value->max_frames_per_second << std::endl;
+  return os << outdent;
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    const CompressedAudioMediaTypeDetailsPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "AudioEncoding encoding: " <<
+      StringFromAudioEncoding(value->encoding) << std::endl;
+  os << begl << "LpcmSampleFormat sample_format: " <<
+      StringFromLpcmSampleFormat(value->sample_format) << std::endl;
+  os << begl << "uint32_t channels: " << int(value->channels) << std::endl;
+  os << begl << "uint32_t frames_per_second: " << value->frames_per_second <<
+      std::endl;
+  return os << outdent;
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    const CompressedAudioMediaTypeSetDetailsPtr& value) {
+  // A null pointer prints as "<nullptr>"; otherwise one field per line.
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  }
+  os << std::endl;
+
+  os << indent;
+  os << begl << "AudioEncoding encoding: " <<
+      StringFromAudioEncoding(value->encoding) << std::endl;
+  os << begl << "LpcmSampleFormat sample_format: " <<
+      StringFromLpcmSampleFormat(value->sample_format) << std::endl;
+  os << begl << "uint32_t min_channels: " << int(value->min_channels) <<
+      std::endl;
+  os << begl << "uint32_t max_channels: " << int(value->max_channels) <<
+      std::endl;
+  os << begl << "uint32_t min_frames_per_second: " <<
+      value->min_frames_per_second << std::endl;
+  os << begl << "uint32_t max_frames_per_second: " <<
+      value->max_frames_per_second << std::endl;
+  return os << outdent;
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    const VideoMediaTypeDetailsPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "VideoEncoding encoding: " <<
+      StringFromVideoEncoding(value->encoding) << std::endl;
+  os << begl << "VideoProfile profile: " << value->profile << std::endl;
+  os << begl << "PixelFormat pixel_format: " <<
+      value->pixel_format << std::endl;
+  os << begl << "ColorSpace color_space: " << value->color_space << std::endl;
+  os << begl << "uint32_t width: " << value->width << std::endl;
+  os << begl << "uint32_t height: " << value->height << std::endl;
+  os << begl << "uint32_t coded_width: " << value->coded_width << std::endl;
+  os << begl << "uint32_t coded_height: " << value->coded_height << std::endl;
+  return os << outdent;
+}
+
+std::ostream& operator<<(
+    std::ostream& os,
+    const VideoMediaTypeSetDetailsPtr& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  } else {
+    os << std::endl;
+  }
+
+  os << indent;
+  os << begl << "VideoEncoding encoding: " <<
+      StringFromVideoEncoding(value->encoding) << std::endl;
+  os << begl << "uint32_t min_width: " << value->min_width << std::endl;
+  os << begl << "uint32_t max_width: " << value->max_width << std::endl;
+  os << begl << "uint32_t min_height: " << value->min_height << std::endl;
+  os << begl << "uint32_t max_height: " << value->max_height << std::endl;
+  return os << outdent;
+}
+
+const char* StringFromMediaTypeScheme(MediaTypeScheme value) {
+  switch (value) {
+    case MediaTypeScheme::UNKNOWN:
+      return "UNKNOWN";
+    case MediaTypeScheme::NONE:
+      return "NONE";
+    case MediaTypeScheme::ANY_ELEMENTARY:
+      return "ANY_ELEMENTARY";
+    case MediaTypeScheme::ANY_AUDIO:
+      return "ANY_AUDIO";
+    case MediaTypeScheme::ANY_VIDEO:
+      return "ANY_VIDEO";
+    case MediaTypeScheme::ANY_TEXT:
+      return "ANY_TEXT";
+    case MediaTypeScheme::ANY_SUBPICTURE:
+      return "ANY_SUBPICTURE";
+    case MediaTypeScheme::ANY_MULTIPLEXED:
+      return "ANY_MULTIPLEXED";
+    case MediaTypeScheme::MULTIPLEXED:
+      return "MULTIPLEXED";
+    case MediaTypeScheme::ANY:
+      return "ANY";
+    case MediaTypeScheme::LPCM:
+      return "LPCM";
+    case MediaTypeScheme::COMPRESSED_AUDIO:
+      return "COMPRESSED_AUDIO";
+    case MediaTypeScheme::VIDEO:
+      return "VIDEO";
+  }
+  return "UNKNOWN SCHEME";
+}
+
+const char* StringFromLpcmSampleFormat(LpcmSampleFormat value) {
+  switch (value) {
+    case LpcmSampleFormat::UNKNOWN:
+      return "UNKNOWN";
+    case LpcmSampleFormat::ANY:
+      return "ANY";
+    case LpcmSampleFormat::UNSIGNED_8:
+      return "UNSIGNED_8";
+    case LpcmSampleFormat::SIGNED_16:
+      return "SIGNED_16";
+    case LpcmSampleFormat::SIGNED_24_IN_32:
+      return "SIGNED_24_IN_32";
+    case LpcmSampleFormat::FLOAT:
+      return "FLOAT";
+  }
+  return "UNKNOWN FORMAT";
+}
+
+const char* StringFromAudioEncoding(AudioEncoding value) {
+  switch (value) {
+    case AudioEncoding::UNKNOWN:
+      return "UNKNOWN";
+    case AudioEncoding::ANY:
+      return "ANY";
+    case AudioEncoding::VORBIS:
+      return "VORBIS";
+  }
+  return "UNKNOWN AUDIO ENCODING";
+}
+
+const char* StringFromVideoEncoding(VideoEncoding value) {
+  switch (value) {
+    case VideoEncoding::UNKNOWN:
+      return "UNKNOWN";
+    case VideoEncoding::ANY:
+      return "ANY";
+    case VideoEncoding::THEORA:
+      return "THEORA";
+    case VideoEncoding::VP8:
+      return "VP8";
+  }
+  return "UNKNOWN VIDEO ENCODING";
+}
+
+const char* StringFromMediaState(MediaState value) {
+  switch (value) {
+    case MediaState::FAULT:
+      return "FAULT";
+    case MediaState::UNPREPARED:
+      return "UNPREPARED";
+    case MediaState::PAUSED:
+      return "PAUSED";
+    case MediaState::PLAYING:
+      return "PLAYING";
+    case MediaState::ENDED:
+      return "ENDED";
+  }
+  return "UNKNOWN MEDIA STATE";
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework_mojo/mojo_formatting.h b/services/media/framework_mojo/mojo_formatting.h
new file mode 100644
index 0000000..a54bd5b
--- /dev/null
+++ b/services/media/framework_mojo/mojo_formatting.h
@@ -0,0 +1,84 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_FORMATTING_H_
+#define SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_FORMATTING_H_
+
+#include "mojo/services/media/common/interfaces/media_common.mojom.h"
+#include "mojo/services/media/common/interfaces/media_pipe.mojom.h"
+#include "mojo/services/media/common/interfaces/media_types.mojom.h"
+#include "mojo/services/media/control/interfaces/media_source.mojom.h"
+#include "services/media/framework/formatting.h"
+
+namespace mojo {
+namespace media {
+
+// See services/media/framework/formatting.h for details.
+
+// Mojo defines operator<< for these enums, but it prints only their numbers.
+const char* StringFromMediaTypeScheme(MediaTypeScheme value);
+const char* StringFromLpcmSampleFormat(LpcmSampleFormat value);
+const char* StringFromAudioEncoding(AudioEncoding value);
+const char* StringFromVideoEncoding(VideoEncoding value);
+const char* StringFromMediaState(MediaState value);
+
+// The following overloads add newlines.
+
+template<typename T>
+std::ostream& operator<<(std::ostream& os, const InterfacePtr<T>& value);
+
+// Writes "<nullptr>" or "<empty>" for a null or empty array; otherwise starts
+// a new line and writes "[index]: element" for each element in order.
+template<typename T>
+std::ostream& operator<<(std::ostream& os, const Array<T>& value) {
+  if (!value) {
+    return os << "<nullptr>" << std::endl;
+  }
+  if (value.size() == 0) {
+    return os << "<empty>" << std::endl;
+  }
+  os << std::endl;
+  int position = 0;
+  for (T& item : const_cast<Array<T>&>(value)) {
+    os << "[" << position++ << "]: " << item;
+  }
+  return os;
+}
+
+std::ostream& operator<<(std::ostream& os, const MediaTypePtr& value);
+std::ostream& operator<<(std::ostream& os, const MediaTypeSetPtr& value);
+std::ostream& operator<<(std::ostream& os, const MediaTypeDetailsPtr& value);
+std::ostream& operator<<(std::ostream& os, const MediaTypeSetDetailsPtr& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const MultiplexedMediaTypeDetailsPtr& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const MultiplexedMediaTypeSetDetailsPtr& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const LpcmMediaTypeDetailsPtr& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const LpcmMediaTypeSetDetailsPtr& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const CompressedAudioMediaTypeDetailsPtr& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const CompressedAudioMediaTypeSetDetailsPtr& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const VideoMediaTypeDetailsPtr& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const VideoMediaTypeSetDetailsPtr& value);
+std::ostream& operator<<(
+    std::ostream& os,
+    const MediaSourceStreamDescriptorPtr& value);
+
+} // namespace media
+} // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_FORMATTING_H_
diff --git a/services/media/framework_mojo/mojo_producer.cc b/services/media/framework_mojo/mojo_producer.cc
new file mode 100644
index 0000000..b772e92
--- /dev/null
+++ b/services/media/framework_mojo/mojo_producer.cc
@@ -0,0 +1,64 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/logging.h"
+#include "services/media/framework_mojo/mojo_producer.h"
+
+namespace mojo {
+namespace media {
+
+MojoProducer::MojoProducer() = default;  // Private; built via Create().
+
+MojoProducer::~MojoProducer() = default;
+
+void MojoProducer::AddBinding(InterfaceRequest<MediaProducer> producer) {
+  bindings_.AddBinding(this, producer.Pass());  // |this| serves the request.
+}
+
+void MojoProducer::FlushConsumer(const FlushConsumerCallback& callback) {
+  DCHECK(consumer_.is_bound());  // consumer_ is bound by Connect().
+  consumer_->Flush(callback);  // |callback| is handed to the consumer's Flush.
+}
+
+// Binds |consumer|, creates the shared buffer if needed, sends the consumer a
+// duplicate buffer handle and runs |callback| from the SetBuffer response.
+void MojoProducer::Connect(
+    InterfaceHandle<MediaConsumer> consumer,
+    const ConnectCallback& callback) {
+  DCHECK(consumer);
+  consumer_ = MediaConsumerPtr::Create(std::move(consumer));
+
+  // An already-initialized allocator is reused as is.
+  const bool needs_buffer = !mojo_allocator_.initialized();
+  if (needs_buffer) {
+    mojo_allocator_.InitNew(256 * 1024); // TODO(dalesat): Made up!
+  }
+
+  auto on_buffer_set = [callback]() { callback.Run(); };
+  consumer_->SetBuffer(mojo_allocator_.GetDuplicateHandle(),
+                       mojo_allocator_.size(), on_buffer_set);
+}
+
+void MojoProducer::Disconnect() {
+  OnConnectionLost();  // Runs while consumer_ is still bound.
+  consumer_.reset();  // Drops the consumer set up by Connect().
+}
+
+bool MojoProducer::IsConnected() {
+  return consumer_.is_bound();  // Bound by Connect(), reset by Disconnect().
+}
+
+void MojoProducer::PushPacketInternal(
+    Packet* packet_raw_ptr,
+    MediaPacketPtr media_packet) {
+  // The raw packet is wrapped in a PacketPtr again when the consumer responds.
+  auto on_pushed = [this, packet_raw_ptr]() {
+    PacketPtr reclaimed = PacketPtr(packet_raw_ptr);
+    PushCompleted(reclaimed);
+  };
+  consumer_->PushPacket(media_packet.Pass(), on_pushed);
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework_mojo/mojo_producer.h b/services/media/framework_mojo/mojo_producer.h
new file mode 100644
index 0000000..78023b3
--- /dev/null
+++ b/services/media/framework_mojo/mojo_producer.h
@@ -0,0 +1,56 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_PRODUCER_H_
+#define SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_PRODUCER_H_
+
+#include "mojo/common/binding_set.h"
+#include "mojo/services/media/common/interfaces/media_transport.mojom.h"
+#include "services/media/framework_mojo/push_producer_base.h"
+
+namespace mojo {
+namespace media {
+
+// Implements MediaProducer to forward a stream across mojo.
+class MojoProducer : public MediaProducer, public PushProducerBase {
+ public:
+  using FlushConsumerCallback = mojo::Callback<void()>;
+
+  static std::shared_ptr<MojoProducer> Create() {
+    return std::shared_ptr<MojoProducer>(new MojoProducer());
+  }
+
+  ~MojoProducer() override;
+
+  // Binds |producer| to this object; several bindings may be active at once.
+  void AddBinding(InterfaceRequest<MediaProducer> producer);
+
+  // Tells the connected consumer to flush; requires a prior Connect().
+  void FlushConsumer(const FlushConsumerCallback& callback);
+
+  // MediaProducer implementation.
+  void Connect(
+      InterfaceHandle<MediaConsumer> consumer,
+      const ConnectCallback& callback) override;
+
+  void Disconnect() override;
+
+ protected:
+  // PushProducerBase overrides.
+  bool IsConnected() override;
+
+  void PushPacketInternal(Packet* packet_raw_ptr, MediaPacketPtr media_packet)
+      override;
+
+ private:
+  MojoProducer();  // Instances are created through Create().
+
+  BindingSet<MediaProducer> bindings_;
+  MediaConsumerPtr consumer_;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_PRODUCER_H_
diff --git a/services/media/framework_mojo/mojo_pull_mode_producer.cc b/services/media/framework_mojo/mojo_pull_mode_producer.cc
new file mode 100644
index 0000000..596d618
--- /dev/null
+++ b/services/media/framework_mojo/mojo_pull_mode_producer.cc
@@ -0,0 +1,208 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/logging.h"
+#include "services/media/framework_mojo/mojo_pull_mode_producer.h"
+
+namespace mojo {
+namespace media {
+
+MojoPullModeProducer::MojoPullModeProducer()
+    : state_(MediaState::UNPREPARED),  // Becomes PAUSED once GetBuffer runs.
+      demand_(Demand::kNegative),
+      presentation_time_(0),
+      cached_packet_(nullptr) {}
+
+MojoPullModeProducer::~MojoPullModeProducer() {
+  base::AutoLock lock(lock_);  // Blocks until no other thread holds lock_.
+}
+
+void MojoPullModeProducer::AddBinding(
+    InterfaceRequest<MediaPullModeProducer> producer) {
+  bindings_.AddBinding(this, producer.Pass());  // |this| serves the request.
+}
+
+// Readies the buffer, returns a duplicate handle, signals positive demand.
+void MojoPullModeProducer::GetBuffer(const GetBufferCallback& callback) {
+  if (!mojo_allocator_.initialized()) {
+    mojo_allocator_.InitNew(256 * 1024); // TODO(dalesat): Made up!
+  }
+  {
+    base::AutoLock lock(lock_);
+    if (state_ == MediaState::UNPREPARED) {
+      state_ = MediaState::PAUSED;
+    }
+    // cached_packet_ is guarded by lock_, so it is checked before unlocking.
+    DCHECK(!cached_packet_);
+  }
+
+  callback.Run(mojo_allocator_.GetDuplicateHandle());
+  DCHECK(demand_callback_);
+  demand_callback_(Demand::kPositive);
+}
+
+// Releases |to_release| if given, then answers or queues |callback|.
+void MojoPullModeProducer::PullPacket(
+    MediaPacketPtr to_release,
+    const PullPacketCallback& callback) {
+  if (to_release) {
+    // The client has piggy-backed a release on this pull request.
+    ReleasePacket(to_release.Pass());
+  }
+
+  {
+    base::AutoLock lock(lock_);
+    if (state_ == MediaState::UNPREPARED) {
+      // The consumer has yet to call GetBuffer. This request will have to wait.
+      pending_pulls_.push_back(callback);
+      return;
+    }
+
+    DCHECK(mojo_allocator_.initialized());
+    // Pulls are served in order: only try to answer right away when nothing
+    // is queued ahead, and queue the callback if it cannot be answered yet.
+    const bool answered =
+        pending_pulls_.empty() && MaybeHandlePullUnsafe(callback);
+    if (!answered) {
+      pending_pulls_.push_back(callback);
+    }
+    DCHECK(!cached_packet_);
+  }
+
+  DCHECK(demand_callback_);
+  demand_callback_(Demand::kPositive);
+}
+
+// Forgets the delivered packet matching |to_release| and re-signals demand.
+void MojoPullModeProducer::ReleasePacket(MediaPacketPtr to_release) {
+  bool packet_is_cached = false;
+  {
+    base::AutoLock lock(lock_);
+    uint64_t size = to_release->payload ? to_release->payload->length : 0;
+    void* payload = size == 0 ? nullptr :
+        mojo_allocator_.PtrFromOffset(to_release->payload->offset);
+    auto match = unreleased_packets_.begin();
+    while (match != unreleased_packets_.end() &&
+        ((*match)->payload() != payload || (*match)->size() != size)) {
+      ++match;
+    }
+    DCHECK(match != unreleased_packets_.end())
+        << "released packet has bad offset and/or size";
+    if (match != unreleased_packets_.end()) {
+      unreleased_packets_.erase(match);
+    }
+    // TODO(dalesat): What if the allocator has starved?
+    // cached_packet_ is guarded by lock_, so sample it before unlocking.
+    packet_is_cached = static_cast<bool>(cached_packet_);
+  }
+  DCHECK(demand_callback_);
+  demand_callback_(packet_is_cached ? Demand::kNegative : Demand::kPositive);
+}
+
+PayloadAllocator* MojoPullModeProducer::allocator() {  // Null if uninitialized.
+  return mojo_allocator_.initialized() ? &mojo_allocator_ : nullptr;
+}
+
+void MojoPullModeProducer::SetDemandCallback(
+    const DemandCallback& demand_callback) {
+  demand_callback_ = demand_callback;  // Replaces any earlier callback.
+}
+
+void MojoPullModeProducer::Prime() {
+  DCHECK(demand_callback_);  // SetDemandCallback() must have been called.
+  demand_callback_(Demand::kNeutral);  // Priming only signals neutral demand.
+}
+
+// Caches |packet|, lets pending pulls consume it, and returns the resulting
+// demand, which is negative while a packet remains cached.
+Demand MojoPullModeProducer::SupplyPacket(PacketPtr packet) {
+  base::AutoLock lock(lock_);
+  // NOTE(review): demand_ starts as kNegative and is only assigned in this
+  // function, so this check looks like it trips on the first call - confirm.
+  DCHECK(demand_ != Demand::kNegative) << "packet pushed with negative demand";
+  DCHECK(state_ != MediaState::ENDED) << "packet pushed after end-of-stream";
+  DCHECK(!cached_packet_);
+
+  if (bindings_.size() == 0) {
+    // Nobody is bound (e.g. a pull client disconnected unexpectedly), so the
+    // packet is thrown away. TODO(dalesat): More shutdown?
+    demand_ = Demand::kNegative;
+    state_ = MediaState::UNPREPARED;
+    return demand_;
+  }
+
+  // Keep the packet and let queued pulls consume it if any are waiting.
+  cached_packet_ = std::move(packet);
+  HandlePendingPullsUnsafe();
+  demand_ = cached_packet_ ? Demand::kNegative : Demand::kPositive;
+  return demand_;
+}
+
+// Answers queued pulls front to back until one cannot be handled yet.
+void MojoPullModeProducer::HandlePendingPullsUnsafe() {
+  lock_.AssertAcquired();
+  while (!pending_pulls_.empty()) {
+    DCHECK(mojo_allocator_.initialized());
+    const bool handled = MaybeHandlePullUnsafe(pending_pulls_.front());
+    if (!handled) {
+      // The oldest pull is still waiting, so the ones behind it wait too.
+      return;
+    }
+    pending_pulls_.pop_front();
+  }
+}
+
+// Answers |callback| with an end-of-stream packet or the cached packet.
+// Returns false, leaving |callback| unanswered, if neither is available.
+bool MojoPullModeProducer::MaybeHandlePullUnsafe(
+    const PullPacketCallback& callback) {
+  DCHECK(!callback.is_null());
+  lock_.AssertAcquired();
+
+  const bool ended = state_ == MediaState::ENDED;
+  if (!ended && !cached_packet_) {
+    // Waiting for packet or end-of-stream indication.
+    return false;
+  }
+  if (ended) {
+    // At end-of-stream. Respond with empty end-of-stream packet.
+    HandlePullWithPacketUnsafe(
+        callback, Packet::CreateEndOfStream(presentation_time_));
+  } else {
+    HandlePullWithPacketUnsafe(callback, std::move(cached_packet_));
+  }
+  return true;
+}
+
+// Delivers |packet| through |callback| and then tracks it as unreleased.
+void MojoPullModeProducer::HandlePullWithPacketUnsafe(
+    const PullPacketCallback& callback, PacketPtr packet) {
+  DCHECK(packet);
+  lock_.AssertAcquired();
+  // TODO(dalesat): Use TaskRunner for this callback.
+  MediaPacketPtr wire_packet = CreateMediaPacket(packet);
+  callback.Run(wire_packet.Pass());
+  unreleased_packets_.push_back(std::move(packet));
+}
+
+// Describes |packet| as a MediaPacket whose payload region is an offset and
+// length from mojo_allocator_. Also moves presentation_time_ to its end.
+MediaPacketPtr MojoPullModeProducer::CreateMediaPacket(
+    const PacketPtr& packet) {
+  DCHECK(packet);
+
+  MediaPacketPtr result = MediaPacket::New();
+  result->pts = packet->presentation_time();
+  result->duration = packet->duration();
+  result->end_of_stream = packet->end_of_stream();
+  result->payload = MediaPacketRegion::New();
+  result->payload->offset = mojo_allocator_.OffsetFromPtr(packet->payload());
+  result->payload->length = packet->size();
+
+  presentation_time_ = packet->presentation_time() + packet->duration();
+  return result.Pass();
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework_mojo/mojo_pull_mode_producer.h b/services/media/framework_mojo/mojo_pull_mode_producer.h
new file mode 100644
index 0000000..3ff360f
--- /dev/null
+++ b/services/media/framework_mojo/mojo_pull_mode_producer.h
@@ -0,0 +1,101 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_PULL_MODE_PRODUCER_H_
+#define SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_PULL_MODE_PRODUCER_H_
+
+#include <deque>
+
+#include "base/synchronization/lock.h"
+#include "mojo/common/binding_set.h"
+#include "mojo/services/media/common/interfaces/media_state.mojom.h"
+#include "mojo/services/media/common/interfaces/media_transport.mojom.h"
+#include "services/media/framework/models/active_sink.h"
+#include "services/media/framework_mojo/mojo_allocator.h"
+
+namespace mojo {
+namespace media {
+
+// Implements MediaPullModeProducer to forward a stream across mojo.
+class MojoPullModeProducer :
+    public MediaPullModeProducer,
+    public ActiveSink {
+ public:
+  static std::shared_ptr<MojoPullModeProducer> Create() {
+    return std::shared_ptr<MojoPullModeProducer>(new MojoPullModeProducer());
+  }
+
+  ~MojoPullModeProducer() override;
+
+  // Binds |producer| to this object; several bindings may be active at once.
+  void AddBinding(InterfaceRequest<MediaPullModeProducer> producer);
+
+  // MediaPullModeProducer implementation.
+  void GetBuffer(const GetBufferCallback& callback) override;
+
+  void PullPacket(
+      MediaPacketPtr to_release,
+      const PullPacketCallback& callback) override;
+
+  void ReleasePacket(MediaPacketPtr to_release) override;
+
+  // ActiveSink implementation.
+  PayloadAllocator* allocator() override;
+
+  void SetDemandCallback(const DemandCallback& demand_callback) override;
+
+  void Prime() override;
+
+  Demand SupplyPacket(PacketPtr packet) override;
+
+ private:
+  MojoPullModeProducer();  // Instances are created through Create().
+
+  // Handles as many pending pulls as possible.
+  // MUST BE CALLED WITH lock_ TAKEN.
+  void HandlePendingPullsUnsafe();
+
+  // Attempts to handle a pull and indicates whether it was handled.
+  // MUST BE CALLED WITH lock_ TAKEN.
+  bool MaybeHandlePullUnsafe(const PullPacketCallback& callback);
+
+  // Runs the callback with a new MediaPacket created from the given Packet.
+  // MUST BE CALLED WITH lock_ TAKEN.
+  void HandlePullWithPacketUnsafe(
+      const PullPacketCallback& callback,
+      PacketPtr packet);
+
+  // Creates a MediaPacket from a Packet and advances presentation_time_.
+  MediaPacketPtr CreateMediaPacket(const PacketPtr& packet);
+
+  BindingSet<MediaPullModeProducer> bindings_;
+
+  DemandCallback demand_callback_;
+
+  // Allocates from the shared buffer.
+  MojoAllocator mojo_allocator_;
+
+  mutable base::Lock lock_;
+  // THE FIELDS BELOW SHOULD ONLY BE ACCESSED WITH lock_ TAKEN.
+  MediaState state_;
+  Demand demand_;
+  int64_t presentation_time_;
+
+  // pending_pulls_ contains the callbacks for the pull requests that have yet
+  // to be satisfied. unreleased_packets_ contains the packets that have been
+  // delivered via pull but have not yet been released. cached_packet_ is a
+  // packet waiting for a pull (we keep one ready so we can be preparing a
+  // packet between pulls). If cached_packet_ isn't nullptr, pending_pulls_
+  // should be empty. We signal positive demand when cached_packet_ is nullptr
+  // and negative demand when it isn't.
+  std::deque<PullPacketCallback> pending_pulls_;
+  std::deque<PacketPtr> unreleased_packets_;
+  PacketPtr cached_packet_;
+  // THE FIELDS ABOVE SHOULD ONLY BE ACCESSED WITH lock_ TAKEN.
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_PULL_MODE_PRODUCER_H_
diff --git a/services/media/framework_mojo/mojo_type_conversions.cc b/services/media/framework_mojo/mojo_type_conversions.cc
new file mode 100644
index 0000000..8e38142
--- /dev/null
+++ b/services/media/framework_mojo/mojo_type_conversions.cc
@@ -0,0 +1,749 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework_mojo/mojo_type_conversions.h"
+#include "third_party/modp_b64/modp_b64.h"
+
+namespace mojo {
+namespace media {
+
+StreamType::Scheme Convert(
+    MediaTypeScheme media_type_scheme) {
+  switch (media_type_scheme) {
+    case MediaTypeScheme::UNKNOWN:
+      return StreamType::Scheme::kUnknown;
+    case MediaTypeScheme::NONE:
+      return StreamType::Scheme::kNone;
+    case MediaTypeScheme::ANY_ELEMENTARY:
+      return StreamType::Scheme::kAnyElementary;
+    case MediaTypeScheme::ANY_AUDIO:
+      return StreamType::Scheme::kAnyAudio;
+    case MediaTypeScheme::ANY_VIDEO:
+      return StreamType::Scheme::kAnyVideo;
+    case MediaTypeScheme::ANY_SUBPICTURE:
+      return StreamType::Scheme::kAnySubpicture;
+    case MediaTypeScheme::ANY_TEXT:
+      return StreamType::Scheme::kAnyText;
+    case MediaTypeScheme::ANY_MULTIPLEXED:
+      return StreamType::Scheme::kAnyMultiplexed;
+    case MediaTypeScheme::ANY:
+      return StreamType::Scheme::kAny;
+    case MediaTypeScheme::MULTIPLEXED:
+      return StreamType::Scheme::kMultiplexed;
+    case MediaTypeScheme::LPCM:
+      return StreamType::Scheme::kLpcm;
+    case MediaTypeScheme::COMPRESSED_AUDIO:
+      return StreamType::Scheme::kCompressedAudio;
+    case MediaTypeScheme::VIDEO:
+      return StreamType::Scheme::kVideo;
+  }  // Unmatched values fall through to the kUnknown return below.
+  return StreamType::Scheme::kUnknown;
+}
+
+LpcmStreamType::SampleFormat Convert(LpcmSampleFormat lpcm_sample_format) {
+  switch (lpcm_sample_format) {
+    case LpcmSampleFormat::UNKNOWN:
+      return LpcmStreamType::SampleFormat::kUnknown;
+    case LpcmSampleFormat::ANY:
+      return LpcmStreamType::SampleFormat::kAny;
+    case LpcmSampleFormat::UNSIGNED_8:
+      return LpcmStreamType::SampleFormat::kUnsigned8;
+    case LpcmSampleFormat::SIGNED_16:
+      return LpcmStreamType::SampleFormat::kSigned16;
+    case LpcmSampleFormat::SIGNED_24_IN_32:
+      return LpcmStreamType::SampleFormat::kSigned24In32;
+    case LpcmSampleFormat::FLOAT:
+      return LpcmStreamType::SampleFormat::kFloat;
+  }  // Unmatched values fall through to the kUnknown return below.
+  return LpcmStreamType::SampleFormat::kUnknown;
+}
+
+CompressedAudioStreamType::AudioEncoding Convert(
+    AudioEncoding audio_encoding) {
+  switch (audio_encoding) {
+    case AudioEncoding::UNKNOWN:
+      return CompressedAudioStreamType::AudioEncoding::kUnknown;
+    case AudioEncoding::ANY:
+      return CompressedAudioStreamType::AudioEncoding::kAny;
+    case AudioEncoding::VORBIS:
+      return CompressedAudioStreamType::AudioEncoding::kVorbis;
+  }  // Unmatched values fall through to the kUnknown return below.
+  return CompressedAudioStreamType::AudioEncoding::kUnknown;
+}
+
+VideoStreamType::VideoEncoding Convert(VideoEncoding video_encoding) {
+  switch (video_encoding) {
+    case VideoEncoding::UNKNOWN:
+      return VideoStreamType::VideoEncoding::kUnknown;
+    case VideoEncoding::ANY:
+      return VideoStreamType::VideoEncoding::kAny;
+    case VideoEncoding::THEORA:
+      return VideoStreamType::VideoEncoding::kTheora;
+    case VideoEncoding::VP8:
+      return VideoStreamType::VideoEncoding::kVp8;
+  }  // Unmatched values fall through to the kUnknown return below.
+  return VideoStreamType::VideoEncoding::kUnknown;
+}
+
+VideoStreamType::VideoProfile Convert(VideoProfile video_profile) {
+  switch (video_profile) {
+    case VideoProfile::UNKNOWN:
+      return VideoStreamType::VideoProfile::kUnknown;
+    case VideoProfile::NOT_APPLICABLE:
+      return VideoStreamType::VideoProfile::kNotApplicable;
+    case VideoProfile::H264_BASELINE:
+      return VideoStreamType::VideoProfile::kH264Baseline;
+    case VideoProfile::H264_MAIN:
+      return VideoStreamType::VideoProfile::kH264Main;
+    case VideoProfile::H264_EXTENDED:
+      return VideoStreamType::VideoProfile::kH264Extended;
+    case VideoProfile::H264_HIGH:
+      return VideoStreamType::VideoProfile::kH264High;
+    case VideoProfile::H264_HIGH10:
+      return VideoStreamType::VideoProfile::kH264High10;
+    case VideoProfile::H264_HIGH422:
+      return VideoStreamType::VideoProfile::kH264High422;
+    case VideoProfile::H264_HIGH444_PREDICTIVE:
+      return VideoStreamType::VideoProfile::kH264High444Predictive;
+    case VideoProfile::H264_SCALABLE_BASELINE:
+      return VideoStreamType::VideoProfile::kH264ScalableBaseline;
+    case VideoProfile::H264_SCALABLE_HIGH:
+      return VideoStreamType::VideoProfile::kH264ScalableHigh;
+    case VideoProfile::H264_STEREO_HIGH:
+      return VideoStreamType::VideoProfile::kH264StereoHigh;
+    case VideoProfile::H264_MULTIVIEW_HIGH:
+      return VideoStreamType::VideoProfile::kH264MultiviewHigh;
+  }  // Unmatched values fall through to the kUnknown return below.
+  return VideoStreamType::VideoProfile::kUnknown;
+}
+
+VideoStreamType::PixelFormat Convert(PixelFormat pixel_format) {
+  switch (pixel_format) {
+    case PixelFormat::UNKNOWN:
+      return VideoStreamType::PixelFormat::kUnknown;
+    case PixelFormat::I420:
+      return VideoStreamType::PixelFormat::kI420;
+    case PixelFormat::YV12:
+      return VideoStreamType::PixelFormat::kYv12;
+    case PixelFormat::YV16:
+      return VideoStreamType::PixelFormat::kYv16;
+    case PixelFormat::YV12A:
+      return VideoStreamType::PixelFormat::kYv12A;
+    case PixelFormat::YV24:
+      return VideoStreamType::PixelFormat::kYv24;
+    case PixelFormat::NV12:
+      return VideoStreamType::PixelFormat::kNv12;
+    case PixelFormat::NV21:
+      return VideoStreamType::PixelFormat::kNv21;
+    case PixelFormat::UYVY:
+      return VideoStreamType::PixelFormat::kUyvy;
+    case PixelFormat::YUY2:
+      return VideoStreamType::PixelFormat::kYuy2;
+    case PixelFormat::ARGB:
+      return VideoStreamType::PixelFormat::kArgb;
+    case PixelFormat::XRGB:
+      return VideoStreamType::PixelFormat::kXrgb;
+    case PixelFormat::RGB24:
+      return VideoStreamType::PixelFormat::kRgb24;
+    case PixelFormat::RGB32:
+      return VideoStreamType::PixelFormat::kRgb32;
+    case PixelFormat::MJPEG:
+      return VideoStreamType::PixelFormat::kMjpeg;
+    case PixelFormat::MT21:
+      return VideoStreamType::PixelFormat::kMt21;
+  }  // Unmatched values fall through to the kUnknown return below.
+  return VideoStreamType::PixelFormat::kUnknown;
+}
+
+VideoStreamType::ColorSpace Convert(ColorSpace color_space) {
+  switch (color_space) {
+    case ColorSpace::UNKNOWN:
+      return VideoStreamType::ColorSpace::kUnknown;
+    case ColorSpace::NOT_APPLICABLE:
+      return VideoStreamType::ColorSpace::kNotApplicable;
+    case ColorSpace::JPEG:
+      return VideoStreamType::ColorSpace::kJpeg;
+    case ColorSpace::HD_REC709:
+      return VideoStreamType::ColorSpace::kHdRec709;
+    case ColorSpace::SD_REC601:
+      return VideoStreamType::ColorSpace::kSdRec601;
+  }  // Unmatched values fall through to the kUnknown return below.
+  return VideoStreamType::ColorSpace::kUnknown;
+}
+
+// Builds the StreamType described by |media_type|; null maps to null. Schemes
+// without a dedicated case become a plain StreamType carrying the scheme.
+std::unique_ptr<StreamType> Convert(const MediaTypePtr& media_type) {
+  if (!media_type) {
+    return nullptr;
+  }
+
+  switch (media_type->scheme) {
+    case MediaTypeScheme::MULTIPLEXED: {
+      const auto& mux = media_type->details->get_multiplexed();
+      return MultiplexedStreamType::Create(
+          Convert(mux->multiplex_type), Convert(mux->substream_types));
+    }
+    case MediaTypeScheme::LPCM: {
+      const auto& lpcm = media_type->details->get_lpcm();
+      return LpcmStreamType::Create(
+          Convert(lpcm->sample_format), lpcm->channels,
+          lpcm->frames_per_second);
+    }
+    case MediaTypeScheme::COMPRESSED_AUDIO: {
+      const auto& audio = media_type->details->get_compressed_audio();
+      return CompressedAudioStreamType::Create(
+          Convert(audio->encoding), Convert(audio->sample_format),
+          audio->channels, audio->frames_per_second,
+          Convert(audio->extra_data_base64));
+    }
+    case MediaTypeScheme::VIDEO: {
+      const auto& video = media_type->details->get_video();
+      return VideoStreamType::Create(
+          Convert(video->encoding), Convert(video->profile),
+          Convert(video->pixel_format), Convert(video->color_space),
+          video->width, video->height, video->coded_width,
+          video->coded_height, Convert(video->extra_data_base64));
+    }
+    default:
+      return StreamType::Create(Convert(media_type->scheme));
+  }
+  return nullptr;
+}
+
+// Converts each element of |media_types|; a null array yields a null result.
+std::unique_ptr<std::vector<std::unique_ptr<StreamType>>> Convert(
+    const Array<MediaTypePtr>& media_types) {
+  if (!media_types) {
+    return nullptr;
+  }
+
+  using StreamTypes = std::vector<std::unique_ptr<StreamType>>;
+  std::unique_ptr<StreamTypes> converted(new StreamTypes(media_types.size()));
+  for (size_t index = 0; index < media_types.size(); ++index) {
+    (*converted)[index] = Convert(media_types[index]);
+  }
+  return converted;
+}
+
+// Builds the StreamTypeSet described by |media_type_set|; null maps to null.
+// Schemes without a dedicated case become a plain StreamTypeSet.
+std::unique_ptr<StreamTypeSet> Convert(const MediaTypeSetPtr& media_type_set) {
+  if (!media_type_set) {
+    return nullptr;
+  }
+
+  switch (media_type_set->scheme) {
+    case MediaTypeScheme::MULTIPLEXED:
+      return MultiplexedStreamTypeSet::Create(
+          Convert(media_type_set->details->get_multiplexed()->
+              multiplex_type_set),
+          Convert(media_type_set->details->get_multiplexed()->
+              substream_type_sets));
+    case MediaTypeScheme::LPCM:
+      return LpcmStreamTypeSet::Create(
+          Convert(media_type_set->details->get_lpcm()->sample_format),
+          Range<uint32_t>(
+              media_type_set->details->get_lpcm()->min_channels,
+              media_type_set->details->get_lpcm()->max_channels),
+          Range<uint32_t>(
+              media_type_set->details->get_lpcm()->min_frames_per_second,
+              media_type_set->details->get_lpcm()->max_frames_per_second));
+    case MediaTypeScheme::COMPRESSED_AUDIO: {
+      // All fields come from the compressed-audio member, not the LPCM one.
+      const auto& audio = media_type_set->details->get_compressed_audio();
+      return CompressedAudioStreamTypeSet::Create(
+          Convert(audio->encoding), Convert(audio->sample_format),
+          Range<uint32_t>(audio->min_channels, audio->max_channels),
+          Range<uint32_t>(
+              audio->min_frames_per_second, audio->max_frames_per_second));
+    }
+    case MediaTypeScheme::VIDEO:
+      return VideoStreamTypeSet::Create(
+          Convert(media_type_set->details->get_video()->encoding),
+          Range<uint32_t>(
+              media_type_set->details->get_video()->min_width,
+              media_type_set->details->get_video()->max_width),
+          Range<uint32_t>(
+              media_type_set->details->get_video()->min_height,
+              media_type_set->details->get_video()->max_height));
+    default:
+      return StreamTypeSet::Create(Convert(media_type_set->scheme));
+  }
+
+  return nullptr;
+}
+
+// Converts each element of |media_type_sets|; null yields a null result.
+std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>> Convert(
+    const Array<MediaTypeSetPtr>& media_type_sets) {
+  if (!media_type_sets) {
+    return nullptr;
+  }
+  std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>> result(
+      new std::vector<std::unique_ptr<StreamTypeSet>>(media_type_sets.size()));
+  for (size_t i = 0; i < media_type_sets.size(); i++) {
+    (*result)[i] = Convert(media_type_sets[i]);
+  }
+  return result;
+}
+
+// Decodes |base64| into a newly created Bytes. Returns nullptr when |base64|
+// is null or modp_b64_decode() reports an error.
+std::unique_ptr<Bytes> Convert(const String& base64) {
+  if (base64.is_null()) {
+    return nullptr;
+  }
+
+  // NOTE(review): the buffer keeps the modp_b64_decode_len() size; the decoded
+  // length returned below is only checked for errors - confirm that is wanted.
+  const size_t encoded_size = base64.size();
+  std::unique_ptr<Bytes> decoded =
+      Bytes::Create(modp_b64_decode_len(encoded_size));
+  char* destination = reinterpret_cast<char*>(decoded->data());
+  const size_t decode_result =
+      modp_b64_decode(destination, base64.data(), encoded_size);
+  if (decode_result == MODP_B64_ERROR) {
+    return nullptr;
+  }
+  return decoded;
+}
+
+std::unique_ptr<Metadata> Convert(const MediaMetadataPtr& media_metadata) {
+  if (!media_metadata) {
+    return nullptr;
+  }
+  // Pass the seven fields below on to Metadata::Create.
+  return Metadata::Create(
+      media_metadata->duration,
+      media_metadata->title,
+      media_metadata->artist,
+      media_metadata->album,
+      media_metadata->publisher,
+      media_metadata->genre,
+      media_metadata->composer);
+}
+
+// Creates a MediaTypeScheme from a StreamType::Scheme (UNKNOWN if unmatched).
+MediaTypeScheme Convert(StreamType::Scheme scheme) {
+  switch (scheme) {
+    case StreamType::Scheme::kUnknown:
+      return MediaTypeScheme::UNKNOWN;
+    case StreamType::Scheme::kNone:
+      return MediaTypeScheme::NONE;
+    case StreamType::Scheme::kAnyElementary:
+      return MediaTypeScheme::ANY_ELEMENTARY;
+    case StreamType::Scheme::kAnyAudio:
+      return MediaTypeScheme::ANY_AUDIO;
+    case StreamType::Scheme::kAnyVideo:
+      return MediaTypeScheme::ANY_VIDEO;
+    case StreamType::Scheme::kAnySubpicture:
+      return MediaTypeScheme::ANY_SUBPICTURE;
+    case StreamType::Scheme::kAnyText:
+      return MediaTypeScheme::ANY_TEXT;
+    case StreamType::Scheme::kAnyMultiplexed:
+      return MediaTypeScheme::ANY_MULTIPLEXED;
+    case StreamType::Scheme::kAny:
+      return MediaTypeScheme::ANY;
+    case StreamType::Scheme::kMultiplexed:
+      return MediaTypeScheme::MULTIPLEXED;
+    case StreamType::Scheme::kLpcm:
+      return MediaTypeScheme::LPCM;
+    case StreamType::Scheme::kCompressedAudio:
+      return MediaTypeScheme::COMPRESSED_AUDIO;
+    case StreamType::Scheme::kVideo:
+      return MediaTypeScheme::VIDEO;
+  }  // Unmatched values fall through to the UNKNOWN return below.
+
+  return MediaTypeScheme::UNKNOWN;
+}
+
+// Creates an LpcmSampleFormat from an LpcmStreamType::SampleFormat.
+LpcmSampleFormat Convert(LpcmStreamType::SampleFormat sample_format) {
+  switch (sample_format) {
+    case LpcmStreamType::SampleFormat::kUnknown:
+      return LpcmSampleFormat::UNKNOWN;
+    case LpcmStreamType::SampleFormat::kAny:
+      return LpcmSampleFormat::ANY;
+    case LpcmStreamType::SampleFormat::kUnsigned8:
+      return LpcmSampleFormat::UNSIGNED_8;
+    case LpcmStreamType::SampleFormat::kSigned16:
+      return LpcmSampleFormat::SIGNED_16;
+    case LpcmStreamType::SampleFormat::kSigned24In32:
+      return LpcmSampleFormat::SIGNED_24_IN_32;
+    case LpcmStreamType::SampleFormat::kFloat:
+      return LpcmSampleFormat::FLOAT;
+  }  // Unmatched values fall through to the UNKNOWN return below.
+
+  return LpcmSampleFormat::UNKNOWN;
+}
+
+// Creates an AudioEncoding from a CompressedAudioStreamType::AudioEncoding.
+AudioEncoding Convert(CompressedAudioStreamType::AudioEncoding audio_encoding) {
+  switch (audio_encoding) {
+    case CompressedAudioStreamType::AudioEncoding::kUnknown:
+      return AudioEncoding::UNKNOWN;
+    case CompressedAudioStreamType::AudioEncoding::kAny:
+      return AudioEncoding::ANY;
+    case CompressedAudioStreamType::AudioEncoding::kVorbis:
+      return AudioEncoding::VORBIS;
+  }  // Unmatched values fall through to the UNKNOWN return below.
+
+  return AudioEncoding::UNKNOWN;
+}
+
+// Maps a framework VideoStreamType::VideoEncoding to the mojo VideoEncoding.
+VideoEncoding Convert(VideoStreamType::VideoEncoding video_encoding) {
+  using Source = VideoStreamType::VideoEncoding;
+  switch (video_encoding) {
+    case Source::kUnknown:
+      return VideoEncoding::UNKNOWN;
+    case Source::kAny:
+      return VideoEncoding::ANY;
+    case Source::kTheora:
+      return VideoEncoding::THEORA;
+    case Source::kVp8:
+      return VideoEncoding::VP8;
+  }
+  return VideoEncoding::UNKNOWN;  // Value not listed above.
+}
+
+// Maps a framework VideoStreamType::VideoProfile to the mojo VideoProfile.
+VideoProfile Convert(VideoStreamType::VideoProfile video_profile) {
+  using Source = VideoStreamType::VideoProfile;
+  switch (video_profile) {
+    case Source::kUnknown:
+      return VideoProfile::UNKNOWN;
+    case Source::kNotApplicable:
+      return VideoProfile::NOT_APPLICABLE;
+    case Source::kH264Baseline:
+      return VideoProfile::H264_BASELINE;
+    case Source::kH264Main:
+      return VideoProfile::H264_MAIN;
+    case Source::kH264Extended:
+      return VideoProfile::H264_EXTENDED;
+    case Source::kH264High:
+      return VideoProfile::H264_HIGH;
+    case Source::kH264High10:
+      return VideoProfile::H264_HIGH10;
+    case Source::kH264High422:
+      return VideoProfile::H264_HIGH422;
+    case Source::kH264High444Predictive:
+      return VideoProfile::H264_HIGH444_PREDICTIVE;
+    case Source::kH264ScalableBaseline:
+      return VideoProfile::H264_SCALABLE_BASELINE;
+    case Source::kH264ScalableHigh:
+      return VideoProfile::H264_SCALABLE_HIGH;
+    case Source::kH264StereoHigh:
+      return VideoProfile::H264_STEREO_HIGH;
+    case Source::kH264MultiviewHigh:
+      return VideoProfile::H264_MULTIVIEW_HIGH;
+  }
+  return VideoProfile::UNKNOWN;  // Value not listed above.
+}
+
+// Maps a framework VideoStreamType::PixelFormat to the mojo PixelFormat.
+PixelFormat Convert(VideoStreamType::PixelFormat pixel_format) {
+  using Source = VideoStreamType::PixelFormat;
+  switch (pixel_format) {
+    case Source::kUnknown:
+      return PixelFormat::UNKNOWN;
+    case Source::kI420:
+      return PixelFormat::I420;
+    case Source::kYv12:
+      return PixelFormat::YV12;
+    case Source::kYv16:
+      return PixelFormat::YV16;
+    case Source::kYv12A:
+      return PixelFormat::YV12A;
+    case Source::kYv24:
+      return PixelFormat::YV24;
+    case Source::kNv12:
+      return PixelFormat::NV12;
+    case Source::kNv21:
+      return PixelFormat::NV21;
+    case Source::kUyvy:
+      return PixelFormat::UYVY;
+    case Source::kYuy2:
+      return PixelFormat::YUY2;
+    case Source::kArgb:
+      return PixelFormat::ARGB;
+    case Source::kXrgb:
+      return PixelFormat::XRGB;
+    case Source::kRgb24:
+      return PixelFormat::RGB24;
+    case Source::kRgb32:
+      return PixelFormat::RGB32;
+    case Source::kMjpeg:
+      return PixelFormat::MJPEG;
+    case Source::kMt21:
+      return PixelFormat::MT21;
+  }
+  return PixelFormat::UNKNOWN;  // Value not listed above.
+}
+
+// Maps a framework VideoStreamType::ColorSpace to the mojo ColorSpace.
+ColorSpace Convert(VideoStreamType::ColorSpace color_space) {
+  using Source = VideoStreamType::ColorSpace;
+  switch (color_space) {
+    case Source::kUnknown:
+      return ColorSpace::UNKNOWN;
+    case Source::kNotApplicable:
+      return ColorSpace::NOT_APPLICABLE;
+    case Source::kJpeg:
+      return ColorSpace::JPEG;
+    case Source::kHdRec709:
+      return ColorSpace::HD_REC709;
+    case Source::kSdRec601:
+      return ColorSpace::SD_REC601;
+  }
+  return ColorSpace::UNKNOWN;  // Value not listed above.
+}
+
+// Builds the mojo MediaType that describes |stream_type|. A null
+// |stream_type| converts to a null MediaTypePtr.
+MediaTypePtr Convert(const std::unique_ptr<StreamType>& stream_type) {
+  if (!stream_type) {
+    return nullptr;
+  }
+
+  // Every scheme produces a MediaType; only the schemes handled explicitly
+  // below attach a details union.
+  MediaTypePtr media_type = MediaType::New();
+  switch (stream_type->scheme()) {
+    case StreamType::Scheme::kMultiplexed: {
+      // Container type plus the converted list of substream types.
+      auto multiplexed = MultiplexedMediaTypeDetails::New();
+      multiplexed->multiplex_type =
+          Convert(stream_type->multiplexed()->multiplex_type());
+      multiplexed->substream_types =
+          Convert(stream_type->multiplexed()->substream_types());
+
+      media_type->scheme = MediaTypeScheme::MULTIPLEXED;
+      media_type->details = MediaTypeDetails::New();
+      media_type->details->set_multiplexed(multiplexed.Pass());
+      break;
+    }
+    case StreamType::Scheme::kLpcm: {
+      // Sample format, channel count and frame rate.
+      auto lpcm = LpcmMediaTypeDetails::New();
+      lpcm->sample_format = Convert(stream_type->lpcm()->sample_format());
+      lpcm->channels = stream_type->lpcm()->channels();
+      lpcm->frames_per_second = stream_type->lpcm()->frames_per_second();
+
+      media_type->scheme = MediaTypeScheme::LPCM;
+      media_type->details = MediaTypeDetails::New();
+      media_type->details->set_lpcm(lpcm.Pass());
+      break;
+    }
+    case StreamType::Scheme::kCompressedAudio: {
+      // The LPCM-style fields plus the encoding and its converted details.
+      auto compressed_audio = CompressedAudioMediaTypeDetails::New();
+      compressed_audio->encoding =
+          Convert(stream_type->compressed_audio()->encoding());
+      compressed_audio->sample_format =
+          Convert(stream_type->compressed_audio()->sample_format());
+      compressed_audio->channels =
+          stream_type->compressed_audio()->channels();
+      compressed_audio->frames_per_second =
+          stream_type->compressed_audio()->frames_per_second();
+      compressed_audio->extra_data_base64 =
+          Convert(stream_type->compressed_audio()->encoding_details());
+
+      media_type->scheme = MediaTypeScheme::COMPRESSED_AUDIO;
+      media_type->details = MediaTypeDetails::New();
+      media_type->details->set_compressed_audio(compressed_audio.Pass());
+      break;
+    }
+    case StreamType::Scheme::kVideo: {
+      // Encoding, profile, pixel layout, geometry and encoding details.
+      auto video = VideoMediaTypeDetails::New();
+      video->encoding = Convert(stream_type->video()->encoding());
+      video->profile = Convert(stream_type->video()->profile());
+      video->pixel_format =
+          Convert(stream_type->video()->pixel_format());
+      video->color_space = Convert(stream_type->video()->color_space());
+      video->width = stream_type->video()->width();
+      video->height = stream_type->video()->height();
+      video->coded_width = stream_type->video()->coded_width();
+      video->coded_height = stream_type->video()->coded_height();
+      video->extra_data_base64 =
+          Convert(stream_type->video()->encoding_details());
+
+      media_type->scheme = MediaTypeScheme::VIDEO;
+      media_type->details = MediaTypeDetails::New();
+      media_type->details->set_video(video.Pass());
+      break;
+    }
+    default: {
+      // All remaining schemes are described by the scheme value alone.
+      media_type->scheme = Convert(stream_type->scheme());
+      break;
+    }
+  }
+
+  return media_type;
+}
+
+// Creates an array of MediaTypes from std::vector<std::unique_ptr<StreamType>>.
+Array<MediaTypePtr> Convert(
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamType>>>&
+        stream_types) {
+  if (stream_types == nullptr) {
+    return nullptr;
+  }
+  // Array::New(n) is pre-sized: assign slots; push_back() would append n more.
+  Array<MediaTypePtr> result = Array<MediaTypePtr>::New(stream_types->size());
+  for (size_t i = 0; i < stream_types->size(); ++i) {
+    result[i] = Convert((*stream_types)[i]);
+  }
+  return result;
+}
+
+// Builds the mojo MediaTypeSet for |stream_type_set|; null converts to null.
+MediaTypeSetPtr Convert(const std::unique_ptr<StreamTypeSet>& stream_type_set) {
+  if (!stream_type_set) {
+    return nullptr;
+  }
+
+  // Every scheme produces a MediaTypeSet; only the schemes handled below
+  // attach a details union.
+  MediaTypeSetPtr media_type_set = MediaTypeSet::New();
+  switch (stream_type_set->scheme()) {
+    case StreamType::Scheme::kMultiplexed: {
+      // Container type set plus the converted substream type sets.
+      auto multiplexed = MultiplexedMediaTypeSetDetails::New();
+      multiplexed->multiplex_type_set =
+          Convert(stream_type_set->multiplexed()->multiplex_type_set());
+      multiplexed->substream_type_sets =
+          Convert(stream_type_set->multiplexed()->substream_type_sets());
+
+      media_type_set->scheme = MediaTypeScheme::MULTIPLEXED;
+      media_type_set->details = MediaTypeSetDetails::New();
+      media_type_set->details->set_multiplexed(multiplexed.Pass());
+      break;
+    }
+    case StreamType::Scheme::kLpcm: {
+      // Sample format plus min/max ranges for channels and frame rate.
+      auto lpcm = LpcmMediaTypeSetDetails::New();
+      lpcm->sample_format = Convert(stream_type_set->lpcm()->sample_format());
+      lpcm->min_channels = stream_type_set->lpcm()->channels().min;
+      lpcm->max_channels = stream_type_set->lpcm()->channels().max;
+      lpcm->min_frames_per_second =
+          stream_type_set->lpcm()->frames_per_second().min;
+      lpcm->max_frames_per_second =
+          stream_type_set->lpcm()->frames_per_second().max;
+
+      media_type_set->scheme = MediaTypeScheme::LPCM;
+      media_type_set->details = MediaTypeSetDetails::New();
+      media_type_set->details->set_lpcm(lpcm.Pass());
+      break;
+    }
+    case StreamType::Scheme::kCompressedAudio: {
+      // Encoding and sample format plus channel and frame-rate ranges.
+      auto compressed_audio = CompressedAudioMediaTypeSetDetails::New();
+      compressed_audio->encoding =
+          Convert(stream_type_set->compressed_audio()->encoding());
+      compressed_audio->sample_format =
+          Convert(stream_type_set->compressed_audio()->sample_format());
+      compressed_audio->min_channels =
+          stream_type_set->compressed_audio()->channels().min;
+      compressed_audio->max_channels =
+          stream_type_set->compressed_audio()->channels().max;
+      compressed_audio->min_frames_per_second =
+          stream_type_set->compressed_audio()->frames_per_second().min;
+      compressed_audio->max_frames_per_second =
+          stream_type_set->compressed_audio()->frames_per_second().max;
+
+      media_type_set->scheme = MediaTypeScheme::COMPRESSED_AUDIO;
+      media_type_set->details = MediaTypeSetDetails::New();
+      media_type_set->details->set_compressed_audio(compressed_audio.Pass());
+      break;
+    }
+    case StreamType::Scheme::kVideo: {
+      // Encoding plus min/max ranges for width and height.
+      auto video = VideoMediaTypeSetDetails::New();
+      video->encoding = Convert(stream_type_set->video()->encoding());
+      video->min_width = stream_type_set->video()->width().min;
+      video->max_width = stream_type_set->video()->width().max;
+      video->min_height = stream_type_set->video()->height().min;
+      video->max_height = stream_type_set->video()->height().max;
+
+      media_type_set->scheme = MediaTypeScheme::VIDEO;
+      media_type_set->details = MediaTypeSetDetails::New();
+      media_type_set->details->set_video(video.Pass());
+      break;
+    }
+    default: {
+      // All remaining schemes are described by the scheme value alone.
+      media_type_set->scheme = Convert(stream_type_set->scheme());
+      break;
+    }
+  }
+
+  return media_type_set;
+}
+
+// Creates an array of MediaTypeSets from
+// std::vector<std::unique_ptr<StreamTypeSet>>.
+Array<MediaTypeSetPtr> Convert(
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>&
+        stream_type_sets) {
+  if (stream_type_sets == nullptr) {
+    return nullptr;
+  }
+  // Array::New(n) already holds n null elements, so fill those slots in place;
+  // push_back() would append after them and double the array's length.
+  Array<MediaTypeSetPtr> result =
+      Array<MediaTypeSetPtr>::New(stream_type_sets->size());
+  for (size_t i = 0; i < stream_type_sets->size(); ++i) {
+    result[i] = Convert((*stream_type_sets)[i]);
+  }
+  return result;
+}
+
+// Encodes |bytes| as base64 text; a null |bytes| converts to a null String.
+String Convert(const std::unique_ptr<Bytes>& bytes) {
+  if (!bytes) {
+    return nullptr;
+  }
+
+  const size_t input_size = bytes->size();
+
+  // The length reported by modp_b64_encode_len() includes room for the null
+  // byte, so the buffer is never empty and encoded[0] is safe to address.
+  std::string encoded(modp_b64_encode_len(input_size), '\0');
+  const size_t encoded_size = modp_b64_encode(
+      &encoded[0], reinterpret_cast<char*>(bytes->data()), input_size);
+
+  // Strip the null byte so only the encoded characters remain.
+  encoded.resize(encoded_size);
+
+  return String(encoded);
+}
+
+// Creates MediaMetadata from Metadata; null |metadata| converts to null.
+MediaMetadataPtr Convert(const std::unique_ptr<Metadata>& metadata) {
+  if (!metadata) {
+    return nullptr;
+  }
+
+  // Empty text fields are published as default-constructed Strings.
+  auto to_mojo_string = [](const std::string& text) {
+    return text.empty() ? String() : String(text);
+  };
+
+  MediaMetadataPtr media_metadata = MediaMetadata::New();
+  media_metadata->duration = metadata->duration_ns();
+  media_metadata->title = to_mojo_string(metadata->title());
+  media_metadata->artist = to_mojo_string(metadata->artist());
+  media_metadata->album = to_mojo_string(metadata->album());
+  media_metadata->publisher = to_mojo_string(metadata->publisher());
+  media_metadata->genre = to_mojo_string(metadata->genre());
+  media_metadata->composer = to_mojo_string(metadata->composer());
+  return media_metadata;
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework_mojo/mojo_type_conversions.h b/services/media/framework_mojo/mojo_type_conversions.h
new file mode 100644
index 0000000..bb9e18e
--- /dev/null
+++ b/services/media/framework_mojo/mojo_type_conversions.h
@@ -0,0 +1,105 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_TYPE_CONVERSIONS_H_
+#define SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_TYPE_CONVERSIONS_H_
+
+#include "mojo/services/media/common/interfaces/media_metadata.mojom.h"
+#include "mojo/services/media/common/interfaces/media_types.mojom.h"
+#include "services/media/framework/metadata.h"
+#include "services/media/framework/stream_type.h"
+
+namespace mojo {
+namespace media {
+
+// Creates a StreamType::Scheme from a MediaTypeScheme.
+StreamType::Scheme Convert(MediaTypeScheme media_type_scheme);
+
+// Creates an LpcmStreamType::SampleFormat from an LpcmSampleFormat.
+LpcmStreamType::SampleFormat Convert(LpcmSampleFormat lpcm_sample_format);
+
+// Creates a CompressedAudioStreamType::AudioEncoding from an AudioEncoding.
+CompressedAudioStreamType::AudioEncoding Convert(AudioEncoding audio_encoding);
+
+// Creates a VideoStreamType::VideoEncoding from a VideoEncoding.
+VideoStreamType::VideoEncoding Convert(VideoEncoding video_encoding);
+
+// Creates a VideoStreamType::VideoProfile from a VideoProfile.
+VideoStreamType::VideoProfile Convert(VideoProfile video_profile);
+
+// Creates a VideoStreamType::PixelFormat from a PixelFormat.
+VideoStreamType::PixelFormat Convert(PixelFormat pixel_format);
+
+// Creates a VideoStreamType::ColorSpace from a ColorSpace.
+VideoStreamType::ColorSpace Convert(ColorSpace color_space);
+
+// Creates a StreamType from a MediaType.
+std::unique_ptr<StreamType> Convert(const MediaTypePtr& media_type);
+
+// Creates std::vector<std::unique_ptr<StreamType>> from an array of MediaTypes.
+std::unique_ptr<std::vector<std::unique_ptr<StreamType>>>
+    Convert(const Array<MediaTypePtr>& media_types);
+
+// Creates a StreamTypeSet from a MediaTypeSet.
+std::unique_ptr<StreamTypeSet> Convert(const MediaTypeSetPtr& media_type_set);
+
+// Creates std::vector<std::unique_ptr<StreamTypeSet>> from an array of
+// MediaTypeSets.
+std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>> Convert(
+    const Array<MediaTypeSetPtr>& media_type_sets);
+
+// Creates Bytes from a base64-encoded string.
+std::unique_ptr<Bytes> Convert(const String& base64);
+
+// Creates Metadata from MediaMetadata.
+std::unique_ptr<Metadata> Convert(const MediaMetadataPtr& media_metadata);
+
+// Creates a MediaTypeScheme from a StreamType::Scheme.
+MediaTypeScheme Convert(StreamType::Scheme scheme);
+
+// Creates an LpcmSampleFormat from an LpcmStreamType::SampleFormat.
+LpcmSampleFormat Convert(LpcmStreamType::SampleFormat sample_format);
+
+// Creates an AudioEncoding from a CompressedAudioStreamType::AudioEncoding.
+AudioEncoding Convert(CompressedAudioStreamType::AudioEncoding audio_encoding);
+
+// Creates a VideoEncoding from a VideoStreamType::VideoEncoding.
+VideoEncoding Convert(VideoStreamType::VideoEncoding video_encoding);
+
+// Creates a VideoProfile from a VideoStreamType::VideoProfile.
+VideoProfile Convert(VideoStreamType::VideoProfile video_profile);
+
+// Creates a PixelFormat from a VideoStreamType::PixelFormat.
+PixelFormat Convert(VideoStreamType::PixelFormat pixel_format);
+
+// Creates a ColorSpace from a VideoStreamType::ColorSpace.
+ColorSpace Convert(VideoStreamType::ColorSpace color_space);
+
+// Creates a MediaType from a StreamType.
+MediaTypePtr Convert(const std::unique_ptr<StreamType>& stream_type);
+
+// Creates an array of MediaTypes from std::vector<std::unique_ptr<StreamType>>.
+Array<MediaTypePtr> Convert(
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamType>>>&
+      stream_types);
+
+// Creates a MediaTypeSet from a StreamTypeSet.
+MediaTypeSetPtr Convert(const std::unique_ptr<StreamTypeSet>& stream_type_set);
+
+// Creates an array of MediaTypeSets from
+// std::vector<std::unique_ptr<StreamTypeSet>>.
+Array<MediaTypeSetPtr> Convert(
+    const std::unique_ptr<std::vector<std::unique_ptr<StreamTypeSet>>>&
+        stream_type_sets);
+
+// Creates a base64-encoded string from Bytes.
+String Convert(const std::unique_ptr<Bytes>& bytes);
+
+// Creates MediaMetadata from Metadata.
+MediaMetadataPtr Convert(const std::unique_ptr<Metadata>& metadata);
+
+}  // namespace media
+}  // namespace mojo
+
+#endif // SERVICES_MEDIA_FRAMEWORK_MOJO_MOJO_TYPE_CONVERSIONS_H_
diff --git a/services/media/framework_mojo/push_producer_base.cc b/services/media/framework_mojo/push_producer_base.cc
new file mode 100644
index 0000000..3d729da
--- /dev/null
+++ b/services/media/framework_mojo/push_producer_base.cc
@@ -0,0 +1,152 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "services/media/framework_mojo/push_producer_base.h"
+
+namespace mojo {
+namespace media {
+
+PushProducerBase::PushProducerBase()
+    : state_(MediaState::UNPREPARED),
+      end_of_stream_(false),
+      task_runner_(base::MessageLoop::current()->task_runner()),
+      max_pushes_outstanding_(0),
+      current_pushes_outstanding_(0) {
+  DCHECK(task_runner_);  // SupplyPacket() posts pushes to this runner.
+}
+
+PushProducerBase::~PushProducerBase() {
+  base::AutoLock lock(lock_);  // NOTE(review): presumably drains lock holders.
+}
+
+// Registers |callback|, which SetState() invokes on every state change.
+void PushProducerBase::SetStatusCallback(const StatusCallback& callback) {
+  status_callback_ = callback;
+}
+
+PayloadAllocator* PushProducerBase::allocator() {
+  return &mojo_allocator_;  // Non-owning; the allocator is a member of |this|.
+}
+
+void PushProducerBase::SetDemandCallback(
+    const DemandCallback& demand_callback) {
+  demand_callback_ = demand_callback;  // Called later to report demand.
+}
+
+void PushProducerBase::Prime() {
+  const uint32_t kPrimedMaxPushes = 10;  // TODO(dalesat): Made up!
+  {
+    base::AutoLock lock(lock_);
+    max_pushes_outstanding_ = kPrimedMaxPushes;
+  }
+  DCHECK(demand_callback_);
+  demand_callback_(Demand::kPositive);
+  SetState(MediaState::PAUSED);  // Reported after positive demand is signalled.
+}
+
+// Accepts |packet|, posts its push to the task runner, and returns the
+// demand that results from the updated outstanding-push count.
+Demand PushProducerBase::SupplyPacket(PacketPtr packet) {
+  DCHECK(packet);
+
+  if (!IsConnected()) {
+    // No longer connected: |packet| is discarded rather than pushed.
+    SetState(MediaState::UNPREPARED);
+    // TODO(dalesat): More shutdown?
+    return Demand::kNegative;
+  }
+
+  Demand demand = Demand::kNegative;
+  {
+    base::AutoLock lock(lock_);
+    DCHECK(current_pushes_outstanding_ < max_pushes_outstanding_);
+    DCHECK(!end_of_stream_) << "packet pushed after end-of-stream";
+
+    ++current_pushes_outstanding_;
+
+    if (packet->end_of_stream()) {
+      // Nothing may follow the end-of-stream packet, so close the window.
+      end_of_stream_ = true;
+      max_pushes_outstanding_ = 0;
+    } else if (current_pushes_outstanding_ < max_pushes_outstanding_) {
+      demand = Demand::kPositive;
+    }
+  }
+
+  // Build the MediaPacket before |packet| gives up ownership below.
+  MediaPacketPtr media_packet = CreateMediaPacket(packet);
+  Packet* packet_raw_ptr = packet.release();
+  task_runner_->PostTask(
+      FROM_HERE,
+      base::Bind(&PushProducerBase::PushPacket, base::Unretained(this),
+                 packet_raw_ptr, base::Passed(media_packet.Pass())));
+
+  return demand;
+}
+
+// Task posted by SupplyPacket(); hands both packet forms to the subclass.
+void PushProducerBase::PushPacket(Packet* packet_raw_ptr,
+                                  MediaPacketPtr media_packet) {
+  PushPacketInternal(packet_raw_ptr, media_packet.Pass());
+}
+
+// Frees a push slot, reports the new demand, and ends after the last packet.
+void PushProducerBase::PushCompleted(const PacketPtr& packet) {
+  DCHECK(packet);
+  Demand demand;
+  bool ended;
+  {
+    base::AutoLock lock(lock_);
+    DCHECK(current_pushes_outstanding_);
+    demand = --current_pushes_outstanding_ < max_pushes_outstanding_ ?
+        Demand::kPositive : Demand::kNegative;
+    // end_of_stream_ is guarded by lock_, so sample it while the lock is held.
+    ended = end_of_stream_ && packet->end_of_stream();
+  }
+
+  DCHECK(demand_callback_);
+  demand_callback_(demand);
+  if (ended) {
+    SetState(MediaState::ENDED);
+  }
+}
+
+void PushProducerBase::OnConnectionLost() {
+  DCHECK(demand_callback_);
+  demand_callback_(Demand::kNegative);  // Withdraw demand first...
+  SetState(MediaState::UNPREPARED);  // ...then report the state change.
+}
+
+// Records |state| and notifies the status callback, but only on a change.
+void PushProducerBase::SetState(MediaState state) {
+  if (state_ == state) {
+    return;
+  }
+  state_ = state;
+  if (status_callback_) status_callback_(state_);
+}
+
+// Describes |packet| as a MediaPacket whose payload region is given as an
+// offset obtained from mojo_allocator_ plus a length.
+MediaPacketPtr PushProducerBase::CreateMediaPacket(const PacketPtr& packet) {
+  DCHECK(packet);
+
+  MediaPacketPtr result = MediaPacket::New();
+  result->pts = packet->presentation_time();
+  result->duration = packet->duration();
+  result->end_of_stream = packet->end_of_stream();
+
+  result->payload = MediaPacketRegion::New();
+  result->payload->offset = mojo_allocator_.OffsetFromPtr(packet->payload());
+  result->payload->length = packet->size();
+
+  return result.Pass();
+}
+
+} // namespace media
+} // namespace mojo
diff --git a/services/media/framework_mojo/push_producer_base.h b/services/media/framework_mojo/push_producer_base.h
new file mode 100644
index 0000000..f4b6235
--- /dev/null
+++ b/services/media/framework_mojo/push_producer_base.h
@@ -0,0 +1,91 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_MOJO_PUSH_PRODUCER_BASE_H_
+#define SERVICES_MEDIA_FRAMEWORK_MOJO_PUSH_PRODUCER_BASE_H_
+
+#include "base/single_thread_task_runner.h"
+#include "base/synchronization/lock.h"
+#include "base/task_runner.h"
+#include "mojo/services/media/common/interfaces/media_pipe.mojom.h"
+#include "mojo/services/media/common/interfaces/media_state.mojom.h"
+#include "services/media/framework/models/active_sink.h"
+#include "services/media/framework_mojo/mojo_allocator.h"
+
+namespace mojo {
+namespace media {
+
+// Abstract base class for push-mode producers (an ActiveSink).
+// TODO(dalesat): Merge producers once transport definitions are merged.
+class PushProducerBase : public ActiveSink {
+ public:
+  using StatusCallback = std::function<void(MediaState)>;  // Gets new state.
+
+  PushProducerBase();
+
+  ~PushProducerBase() override;
+
+  // Sets the callback that SetState() invokes whenever the state changes.
+  void SetStatusCallback(const StatusCallback& callback);
+
+  // ActiveSink implementation.
+  PayloadAllocator* allocator() override;
+
+  void SetDemandCallback(const DemandCallback& demand_callback) override;
+
+  void Prime() override;  // Signals positive demand and enters PAUSED.
+
+  Demand SupplyPacket(PacketPtr packet) override;  // Posts the push.
+
+ protected:
+  // Overridden in subclasses to verify that the producer is still connected.
+  virtual bool IsConnected() = 0;
+
+  // Overridden in subclasses to push a packet to the consumer/pipe.
+  // packet_raw_ptr should be wrapped in a PacketPtr for proper lifetime
+  // management.
+  // TODO(dalesat): Don't use a raw pointer, if possible.
+  virtual void PushPacketInternal(
+      Packet* packet_raw_ptr,
+      MediaPacketPtr media_packet) = 0;
+
+  // Called by subclasses when a push completes.
+  void PushCompleted(const PacketPtr& packet);
+
+  // Called by subclasses when the connection to the consumer is lost.
+  void OnConnectionLost();
+
+  // Allocates from the shared buffer.
+  MojoAllocator mojo_allocator_;
+
+ private:
+  // Task posted by SupplyPacket(); forwards to PushPacketInternal().
+  void PushPacket(
+      Packet* packet_raw_ptr,
+      MediaPacketPtr media_packet);
+
+  // Sets the current state and calls the registered callback, if there is one.
+  void SetState(MediaState state);
+
+  // Creates a MediaPacket from a Packet.
+  MediaPacketPtr CreateMediaPacket(const PacketPtr& packet);
+
+  StatusCallback status_callback_;
+
+  mutable base::Lock lock_;
+  // THE FIELDS BELOW SHOULD ONLY BE ACCESSED WITH lock_ TAKEN.
+  MediaState state_;  // NOTE(review): SetState() uses this without lock_.
+  bool end_of_stream_;
+  DemandCallback demand_callback_;  // NOTE(review): also used without lock_.
+  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+  // TODO(dalesat): Base this logic on presentation time or duration.
+  uint32_t max_pushes_outstanding_;  // Set to 10 by Prime(), 0 at end-of-stream.
+  uint32_t current_pushes_outstanding_;  // Pushes posted but not yet completed.
+  // THE FIELDS ABOVE SHOULD ONLY BE ACCESSED WITH lock_ TAKEN.
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_MOJO_PUSH_PRODUCER_BASE_H_