Motown: Add ActiveMultistreamSource model in preparation for the ffmpeg demux with async I/O
The current I/O model used for readers is synchronous, because ffmpeg's I/O model is synchronous.
This is a problem for mojo reader implementations. The solution is to have the ffmpeg demux
running on its own thread, which means that it will produce packets asynchronously. The current
framework model used for the demux is MultistreamSource, which can't handle asynchronous packet
production. ActiveMultistreamSource is an aysnc version of MultistreamSource.

There are a few minor fixes in this CL as well.

R=kulakowski@chromium.org

Review URL: https://codereview.chromium.org/1833323002 .
diff --git a/examples/media_test/media_test_app.cc b/examples/media_test/media_test_app.cc
index 5db2771..c3d91ac 100644
--- a/examples/media_test/media_test_app.cc
+++ b/examples/media_test/media_test_app.cc
@@ -62,8 +62,6 @@
       std::cout << "    genre      <none>" << std::endl;
       std::cout << "    composer   <none>" << std::endl << std::endl;
       std::cout << std::endl << std::endl << kUp << std::flush;
-    } else {
-      std::cout << std::endl;
     }
 
     CreateNewMediaTest();
@@ -136,7 +134,7 @@
       // Do nothing.
     } else if (metadata) {
       metadata_shown_ = true;
-      std::cout << "    duration   " << std::setprecision(1)
+      std::cout << "    duration   " << std::fixed << std::setprecision(1)
                 << double(metadata->duration) / ns_per_second << " seconds"
                 << clear_line() << std::endl;
       std::cout << "    title      "
@@ -158,7 +156,7 @@
                 << (metadata->composer ? metadata->composer : "<none>")
                 << clear_line() << std::endl
                 << std::endl;
-    } else {
+    } else if (paint_) {
       std::cout << "    duration   <none>" << kClearLine << std::endl;
       std::cout << "    title      <none>" << kClearLine << std::endl;
       std::cout << "    artist     <none>" << kClearLine << std::endl;
diff --git a/services/media/framework/BUILD.gn b/services/media/framework/BUILD.gn
index 7747bc0..81c4882 100644
--- a/services/media/framework/BUILD.gn
+++ b/services/media/framework/BUILD.gn
@@ -20,6 +20,7 @@
     "metadata.cc",
     "metadata.h",
     "models/active_multistream_sink.h",
+    "models/active_multistream_source.h",
     "models/active_sink.h",
     "models/active_source.h",
     "models/demand.h",
@@ -46,6 +47,8 @@
     "safe_clone.h",
     "stages/active_multistream_sink_stage.cc",
     "stages/active_multistream_sink_stage.h",
+    "stages/active_multistream_source_stage.cc",
+    "stages/active_multistream_source_stage.h",
     "stages/active_sink_stage.cc",
     "stages/active_sink_stage.h",
     "stages/active_source_stage.cc",
@@ -60,6 +63,8 @@
     "stages/stage.h",
     "stages/transform_stage.cc",
     "stages/transform_stage.h",
+    "stages/util.cc",
+    "stages/util.h",
     "stream_type.cc",
     "stream_type.h",
   ]
diff --git a/services/media/framework/graph.h b/services/media/framework/graph.h
index f0b6482..0d89ebc 100644
--- a/services/media/framework/graph.h
+++ b/services/media/framework/graph.h
@@ -9,6 +9,7 @@
 
 #include "services/media/framework/engine.h"
 #include "services/media/framework/refs.h"
+#include "services/media/framework/stages/active_multistream_source_stage.h"
 #include "services/media/framework/stages/active_sink_stage.h"
 #include "services/media/framework/stages/active_source_stage.h"
 #include "services/media/framework/stages/multistream_source_stage.h"
@@ -40,6 +41,7 @@
 DEFINE_STAGE_CREATOR(Transform, TransformStage);
 DEFINE_STAGE_CREATOR(ActiveSource, ActiveSourceStage);
 DEFINE_STAGE_CREATOR(ActiveSink, ActiveSinkStage);
+DEFINE_STAGE_CREATOR(ActiveMultistreamSource, ActiveMultistreamSourceStage);
 
 #undef DEFINE_STAGE_CREATOR
 
diff --git a/services/media/framework/models/active_multistream_source.h b/services/media/framework/models/active_multistream_source.h
new file mode 100644
index 0000000..22b1b11
--- /dev/null
+++ b/services/media/framework/models/active_multistream_source.h
@@ -0,0 +1,38 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_MEDIA_MODELS_ACTIVE_MULTISTREAM_SOURCE_H_
+#define MOJO_MEDIA_MODELS_ACTIVE_MULTISTREAM_SOURCE_H_
+
+#include "services/media/framework/models/part.h"
+#include "services/media/framework/packet.h"
+
+namespace mojo {
+namespace media {
+
+// Asynchronous source of packets for multiple streams.
+class ActiveMultistreamSource : public Part {
+ public:
+  using SupplyCallback =
+      std::function<void(size_t output_index, PacketPtr packet)>;
+
+  ~ActiveMultistreamSource() override {}
+
+  // TODO(dalesat): Support dynamic output creation.
+
+  // Returns the number of streams the source produces.
+  virtual size_t stream_count() const = 0;
+
+  // Sets the callback that supplies a packet asynchronously.
+  virtual void SetSupplyCallback(const SupplyCallback& supply_callback) = 0;
+
+  // Requests a packet from the source to be supplied asynchronously via
+  // the supply callback.
+  virtual void RequestPacket() = 0;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // MOJO_MEDIA_MODELS_ACTIVE_MULTISTREAM_SOURCE_H_
diff --git a/services/media/framework/models/multistream_source.h b/services/media/framework/models/multistream_source.h
index 0782353..9c70808 100644
--- a/services/media/framework/models/multistream_source.h
+++ b/services/media/framework/models/multistream_source.h
@@ -11,8 +11,7 @@
 namespace mojo {
 namespace media {
 
-// Synchronous source of packets for multiple streams. This is currently used
-// by Demux, though it would be better if Demux were asynchronous.
+// Synchronous source of packets for multiple streams.
 class MultistreamSource : public Part {
  public:
   ~MultistreamSource() override {}
diff --git a/services/media/framework/stages/active_multistream_source_stage.cc b/services/media/framework/stages/active_multistream_source_stage.cc
new file mode 100644
index 0000000..a6859bb
--- /dev/null
+++ b/services/media/framework/stages/active_multistream_source_stage.cc
@@ -0,0 +1,117 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/stages/active_multistream_source_stage.h"
+#include "services/media/framework/stages/util.h"
+
+namespace mojo {
+namespace media {
+
+ActiveMultistreamSourceStage::ActiveMultistreamSourceStage(
+    std::shared_ptr<ActiveMultistreamSource> source)
+    : source_(source) {
+  DCHECK(source);
+  outputs_.resize(source->stream_count());
+
+  supply_function_ = [this](size_t output_index, PacketPtr packet) {
+    DCHECK(!cached_packet_) << "source supplied unrequested packet";
+    DCHECK(output_index < outputs_.size());
+    DCHECK(packet);
+
+    cached_packet_output_index_ = output_index;
+    cached_packet_ = std::move(packet);
+
+    if (cached_packet_->end_of_stream()) {
+      ended_streams_++;
+    }
+
+    Output& output = outputs_[cached_packet_output_index_];
+    if (output.demand() != Demand::kNegative) {
+      RequestUpdate();
+    }
+  };
+
+  source_->SetSupplyCallback(supply_function_);
+}
+
+ActiveMultistreamSourceStage::~ActiveMultistreamSourceStage() {}
+
+size_t ActiveMultistreamSourceStage::input_count() const {
+  return 0;
+};
+
+Input& ActiveMultistreamSourceStage::input(size_t index) {
+  CHECK(false) << "input requested from source";
+  abort();
+}
+
+size_t ActiveMultistreamSourceStage::output_count() const {
+  return outputs_.size();
+}
+
+Output& ActiveMultistreamSourceStage::output(size_t index) {
+  DCHECK(index < outputs_.size());
+  return outputs_[index];
+}
+
+PayloadAllocator* ActiveMultistreamSourceStage::PrepareInput(size_t index) {
+  CHECK(false) << "PrepareInput called on source";
+  return nullptr;
+}
+
+void ActiveMultistreamSourceStage::PrepareOutput(
+    size_t index,
+    PayloadAllocator* allocator,
+    const UpstreamCallback& callback) {
+  DCHECK(index < outputs_.size());
+
+  if (allocator != nullptr) {
+    // Currently, we don't support a source that uses provided allocators. If
+    // we're provided an allocator, the output must have it so supplied packets
+    // can be copied.
+    outputs_[index].SetCopyAllocator(allocator);
+  }
+}
+
+void ActiveMultistreamSourceStage::UnprepareOutput(
+    size_t index,
+    const UpstreamCallback& callback) {
+  DCHECK(index < outputs_.size());
+  outputs_[index].SetCopyAllocator(nullptr);
+}
+
+void ActiveMultistreamSourceStage::Update(Engine* engine) {
+  DCHECK(engine);
+
+  if (cached_packet_) {
+    Output& output = outputs_[cached_packet_output_index_];
+    if (output.demand() != Demand::kNegative) {
+      // cached_packet_ is intended for an output which will accept packets.
+      output.SupplyPacket(std::move(cached_packet_), engine);
+    }
+  }
+
+  if (!cached_packet_ && HasPositiveDemand(outputs_)) {
+    // We have no cached packet and positive demand. Request a packet.
+    source_->RequestPacket();
+  }
+}
+
+void ActiveMultistreamSourceStage::FlushInput(
+    size_t index,
+    const DownstreamCallback& callback) {
+  CHECK(false) << "FlushInput called on source";
+}
+
+void ActiveMultistreamSourceStage::FlushOutput(size_t index) {
+  DCHECK(index < outputs_.size());
+  DCHECK(source_);
+  outputs_[index].Flush();
+  cached_packet_.reset(nullptr);
+  cached_packet_output_index_ = 0;
+  ended_streams_ = 0;
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/active_multistream_source_stage.h b/services/media/framework/stages/active_multistream_source_stage.h
new file mode 100644
index 0000000..625c4e3
--- /dev/null
+++ b/services/media/framework/stages/active_multistream_source_stage.h
@@ -0,0 +1,58 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_MULTISTREAM_SOURCE_STAGE_H_
+#define SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_MULTISTREAM_SOURCE_STAGE_H_
+
+#include <vector>
+
+#include "services/media/framework/models/active_multistream_source.h"
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+// A stage that hosts an ActiveMultistreamSource.
+class ActiveMultistreamSourceStage : public Stage {
+ public:
+  ActiveMultistreamSourceStage(std::shared_ptr<ActiveMultistreamSource> source);
+
+  ~ActiveMultistreamSourceStage() override;
+
+  // Stage implementation.
+  size_t input_count() const override;
+
+  Input& input(size_t index) override;
+
+  size_t output_count() const override;
+
+  Output& output(size_t index) override;
+
+  PayloadAllocator* PrepareInput(size_t index) override;
+
+  void PrepareOutput(size_t index,
+                     PayloadAllocator* allocator,
+                     const UpstreamCallback& callback) override;
+
+  void UnprepareOutput(size_t index, const UpstreamCallback& callback) override;
+
+  void Update(Engine* engine) override;
+
+  void FlushInput(size_t index, const DownstreamCallback& callback) override;
+
+  void FlushOutput(size_t index) override;
+
+ private:
+  std::vector<Output> outputs_;
+  std::shared_ptr<ActiveMultistreamSource> source_;
+  ActiveMultistreamSource::SupplyCallback supply_function_;
+  PacketPtr cached_packet_;
+  size_t cached_packet_output_index_;
+  size_t ended_streams_ = 0;
+};
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_STAGES_ACTIVE_MULTISTREAM_SOURCE_STAGE_H_
diff --git a/services/media/framework/stages/active_sink_stage.cc b/services/media/framework/stages/active_sink_stage.cc
index 7c1d90a..1c4f924 100644
--- a/services/media/framework/stages/active_sink_stage.cc
+++ b/services/media/framework/stages/active_sink_stage.cc
@@ -38,7 +38,7 @@
 
 Output& ActiveSinkStage::output(size_t index) {
   CHECK(false) << "output requested from sink";
-  return *(static_cast<Output*>(nullptr));
+  abort();
 }
 
 PayloadAllocator* ActiveSinkStage::PrepareInput(size_t index) {
diff --git a/services/media/framework/stages/active_source_stage.cc b/services/media/framework/stages/active_source_stage.cc
index 1ae91b8..a38200e 100644
--- a/services/media/framework/stages/active_source_stage.cc
+++ b/services/media/framework/stages/active_source_stage.cc
@@ -30,7 +30,7 @@
 
 Input& ActiveSourceStage::input(size_t index) {
   CHECK(false) << "input requested from source";
-  return *(static_cast<Input*>(nullptr));
+  abort();
 }
 
 size_t ActiveSourceStage::output_count() const {
diff --git a/services/media/framework/stages/multistream_source_stage.cc b/services/media/framework/stages/multistream_source_stage.cc
index 65f687f..7f28ba0 100644
--- a/services/media/framework/stages/multistream_source_stage.cc
+++ b/services/media/framework/stages/multistream_source_stage.cc
@@ -3,6 +3,7 @@
 // found in the LICENSE file.
 
 #include "services/media/framework/stages/multistream_source_stage.h"
+#include "services/media/framework/stages/util.h"
 
 namespace mojo {
 namespace media {
@@ -22,7 +23,7 @@
 
 Input& MultistreamSourceStage::input(size_t index) {
   CHECK(false) << "input requested from source";
-  return *(static_cast<Input*>(nullptr));
+  abort();
 }
 
 size_t MultistreamSourceStage::output_count() const {
@@ -61,16 +62,8 @@
 void MultistreamSourceStage::Update(Engine* engine) {
   DCHECK(engine);
 
-  bool has_positive_demand = false;
-  for (Output& output : outputs_) {
-    if (output.demand() == Demand::kPositive) {
-      has_positive_demand = true;
-      break;
-    }
-  }
-
   while (true) {
-    if (cached_packet_ && has_positive_demand) {
+    if (cached_packet_ && HasPositiveDemand(outputs_)) {
       DCHECK(cached_packet_output_index_ < outputs_.size());
       Output& output = outputs_[cached_packet_output_index_];
 
diff --git a/services/media/framework/stages/util.cc b/services/media/framework/stages/util.cc
new file mode 100644
index 0000000..0b3393a
--- /dev/null
+++ b/services/media/framework/stages/util.cc
@@ -0,0 +1,21 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "services/media/framework/stages/util.h"
+
+namespace mojo {
+namespace media {
+
+bool HasPositiveDemand(const std::vector<Output>& outputs) {
+  for (const Output& output : outputs) {
+    if (output.demand() == Demand::kPositive) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+}  // namespace media
+}  // namespace mojo
diff --git a/services/media/framework/stages/util.h b/services/media/framework/stages/util.h
new file mode 100644
index 0000000..d04b2df
--- /dev/null
+++ b/services/media/framework/stages/util.h
@@ -0,0 +1,20 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SERVICES_MEDIA_FRAMEWORK_STAGES_UTIL_H_
+#define SERVICES_MEDIA_FRAMEWORK_STAGES_UTIL_H_
+
+#include <vector>
+
+#include "services/media/framework/stages/stage.h"
+
+namespace mojo {
+namespace media {
+
+bool HasPositiveDemand(const std::vector<Output>& outputs);
+
+}  // namespace media
+}  // namespace mojo
+
+#endif  // SERVICES_MEDIA_FRAMEWORK_STAGES_UTIL_H_
diff --git a/services/media/framework_ffmpeg/ffmpeg_decoder_base.cc b/services/media/framework_ffmpeg/ffmpeg_decoder_base.cc
index 2c37acd..ce03f96 100644
--- a/services/media/framework_ffmpeg/ffmpeg_decoder_base.cc
+++ b/services/media/framework_ffmpeg/ffmpeg_decoder_base.cc
@@ -11,7 +11,7 @@
 
 FfmpegDecoderBase::FfmpegDecoderBase(AvCodecContextPtr av_codec_context)
     : av_codec_context_(std::move(av_codec_context)),
-      av_frame_ptr_(av_frame_alloc()) {
+      av_frame_ptr_(ffmpeg::AvFrame::Create()) {
   DCHECK(av_codec_context_);
 }
 
diff --git a/services/media/framework_mojo/mojo_producer.h b/services/media/framework_mojo/mojo_producer.h
index ba37aab..16865d1 100644
--- a/services/media/framework_mojo/mojo_producer.h
+++ b/services/media/framework_mojo/mojo_producer.h
@@ -7,7 +7,6 @@
 
 #include "base/single_thread_task_runner.h"
 #include "base/synchronization/lock.h"
-#include "base/task_runner.h"
 #include "mojo/common/binding_set.h"
 #include "mojo/services/media/common/interfaces/media_state.mojom.h"
 #include "mojo/services/media/common/interfaces/media_transport.mojom.h"