Add Python util to read from a data pipe into a bytearray

The implementation of this function is inspired by
//mojo/common/data_pipe_utils.cc. It will be used in an upcoming CL.

R=qsr@chromium.org

Review URL: https://codereview.chromium.org/952223002
diff --git a/mojo/python/BUILD.gn b/mojo/python/BUILD.gn
index 041dbc0..972e0c6 100644
--- a/mojo/python/BUILD.gn
+++ b/mojo/python/BUILD.gn
@@ -2,17 +2,40 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+import("//mojo/public/python/rules.gni")
 import("//third_party/cython/rules.gni")
 
 # GYP version: mojo/mojo.gyp:mojo_python
 group("python") {
   deps = [
     ":mojo_embedder",
+    ":packaged_utils",
+    ":utils",
     ":validation_util",
     "//mojo/public/python",
   ]
 }
 
+copy("utils") {
+  sources = [
+    "mojo_utils/__init__.py",
+    "mojo_utils/data_pipe_utils.py",
+  ]
+  outputs = [
+    "$root_out_dir/python/mojo_utils/{{source_file_part}}",
+  ]
+}
+
+python_package("packaged_utils") {
+  sources = [
+    "mojo_utils/__init__.py",
+    "mojo_utils/data_pipe_utils.py",
+  ]
+  datadeps = [
+    "//mojo/public/python:mojo_system",
+  ]
+}
+
 # GYP version: mojo/mojo.gyp:mojo_python_embedder
 python_binary_module("mojo_embedder") {
   cython_sources = [ "system/mojo_embedder.pyx" ]
diff --git a/mojo/python/mojo_utils/__init__.py b/mojo/python/mojo_utils/__init__.py
new file mode 100644
index 0000000..50b23df
--- /dev/null
+++ b/mojo/python/mojo_utils/__init__.py
@@ -0,0 +1,3 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
diff --git a/mojo/python/mojo_utils/data_pipe_utils.py b/mojo/python/mojo_utils/data_pipe_utils.py
new file mode 100644
index 0000000..2a46ea5
--- /dev/null
+++ b/mojo/python/mojo_utils/data_pipe_utils.py
@@ -0,0 +1,76 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import itertools
+import sys
+
+import mojo_system
+from mojo_bindings import promise
+
+class DataPipeCopyException(Exception):
+  def __init__(self, *args, **kwargs):
+    Exception.__init__(self, *args, **kwargs)
+    self.__traceback__ = sys.exc_info()[2]
+
+
+def CopyFromDataPipe(data_pipe, deadline):
+  """
+  Returns a Promise that operates as follows:
+  - If |data_pipe| is successfully read from, the promise resolves with the
+    bytes that were read.
+  - Otherwise, the promise rejects with an exception whose message contains the
+    status from the attempted read.
+  """
+  class DataPipeCopyHelper():
+    def __init__(self, data_pipe, deadline, resolve, reject):
+      self.data_pipe = data_pipe
+      self.original_deadline = deadline
+      self.start_time = mojo_system.GetTimeTicksNow()
+      self.resolve = resolve
+      self.reject = reject
+      self.buffer_size = 1024
+      self.data = bytearray(self.buffer_size)
+      self.index = 0
+
+    def _ComputeCurrentDeadline(self):
+      if self.original_deadline == mojo_system.DEADLINE_INDEFINITE:
+        return self.original_deadline
+      elapsed_time = mojo_system.GetTimeTicksNow() - self.start_time
+      return max(0, self.original_deadline - elapsed_time)
+
+    def CopyFromDataPipeAsync(self, result):
+      while result == mojo_system.RESULT_OK:
+        assert self.index <= len(self.data)
+        if self.index == len(self.data):
+          self.buffer_size *= 2
+          self.data.extend(itertools.repeat(0, self.buffer_size))
+
+        # Careful! Have to construct a memoryview object here as otherwise the
+        # slice operation will create a copy of |data| and hence not write into
+        # |data| as desired.
+        result, read_bytes = self.data_pipe.ReadData(
+            memoryview(self.data)[self.index:])
+        if read_bytes:
+          self.index += len(read_bytes)
+        del read_bytes
+
+      if result == mojo_system.RESULT_SHOULD_WAIT:
+        data_pipe.AsyncWait(mojo_system.HANDLE_SIGNAL_READABLE,
+                            self._ComputeCurrentDeadline(),
+                            self.CopyFromDataPipeAsync)
+        return
+
+      # Treat a failed precondition as EOF.
+      if result == mojo_system.RESULT_FAILED_PRECONDITION:
+        self.resolve(self.data[:self.index])
+        return
+
+      self.reject(DataPipeCopyException("Result: %d" % result))
+
+
+  def GenerationMethod(resolve, reject):
+    helper = DataPipeCopyHelper(data_pipe, deadline, resolve, reject)
+    helper.CopyFromDataPipeAsync(mojo_system.RESULT_OK)
+
+  return promise.Promise(GenerationMethod)
diff --git a/mojo/python/tests/data_pipe_utils_unittest.py b/mojo/python/tests/data_pipe_utils_unittest.py
new file mode 100644
index 0000000..b6bc9fa
--- /dev/null
+++ b/mojo/python/tests/data_pipe_utils_unittest.py
@@ -0,0 +1,96 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import random
+
+import mojo_unittest
+from mojo_bindings import promise
+
+# pylint: disable=F0401
+import mojo_system as system
+
+# pylint: disable=F0401
+from mojo_utils import data_pipe_utils
+
+
+def _GetRandomBuffer(size):
+  random.seed(size)
+  return bytearray(''.join(chr(random.randint(0, 255)) for i in xrange(size)))
+
+
+class DataPipeCopyTest(mojo_unittest.MojoTestCase):
+  def setUp(self):
+    super(DataPipeCopyTest, self).setUp()
+    self.handles = system.DataPipe()
+    self.error = None
+
+  def tearDown(self):
+    self.handles = None
+    super(DataPipeCopyTest, self).tearDown()
+
+  def _writeDataAndClose(self, handle, data):
+    status, num_bytes_written = handle.WriteData(data)
+    handle.Close()
+    self.assertEquals(system.RESULT_OK, status)
+    self.assertEquals(len(data), num_bytes_written)
+
+  def _copyDataFromPipe(self, handle, expected_data,
+                        deadline=system.DEADLINE_INDEFINITE):
+    self._VerifyDataCopied(data_pipe_utils.CopyFromDataPipe(
+         handle, deadline), expected_data).Catch(self._CatchError)
+
+  def _CatchError(self, error):
+    if self.loop:
+      self.loop.Quit()
+    self.error = error
+
+  @promise.async
+  def _VerifyDataCopied(self, data, expected_data):
+    self.assertEquals(expected_data, data)
+    self.loop.Quit()
+
+  def _runAndCheckError(self):
+    self.loop.Run()
+    if self.error:
+      # pylint: disable=E0702
+      raise self.error
+
+  def _testEagerWrite(self, data):
+    self._writeDataAndClose(self.handles.producer_handle, data)
+    self._copyDataFromPipe(self.handles.consumer_handle, data)
+    self._runAndCheckError()
+
+  def _testDelayedWrite(self, data):
+    self._copyDataFromPipe(self.handles.consumer_handle, data)
+    self._writeDataAndClose(self.handles.producer_handle, data)
+    self._runAndCheckError()
+
+  def testTimeout(self):
+    self._copyDataFromPipe(self.handles.consumer_handle, bytearray(),
+                           deadline=100)
+    with self.assertRaises(data_pipe_utils.DataPipeCopyException):
+      self._runAndCheckError()
+
+  def testCloseProducerWithoutWriting(self):
+    self._copyDataFromPipe(self.handles.consumer_handle, bytearray())
+    self.handles.producer_handle.Close()
+    self._runAndCheckError()
+
+  def testEagerWriteOfEmptyData(self):
+    self._testEagerWrite(bytearray())
+
+  def testDelayedWriteOfEmptyData(self):
+    self._testDelayedWrite(bytearray())
+
+  def testEagerWriteOfNonEmptyData(self):
+    self._testEagerWrite(_GetRandomBuffer(1024))
+
+  def testDelayedWriteOfNonEmptyData(self):
+    self._testDelayedWrite(_GetRandomBuffer(1024))
+
+  def testEagerWriteOfLargeBuffer(self):
+    self._testEagerWrite(_GetRandomBuffer(32 * 1024))
+
+  def testDelayedWriteOfLargeBuffer(self):
+    self._testDelayedWrite(_GetRandomBuffer(32 * 1024))