Add Python util to read from a data pipe into a bytearray
The implementation of this function is inspired by
//mojo/common/data_pipe_utils.cc. It will be used in an upcoming CL.
R=qsr@chromium.org
Review URL: https://codereview.chromium.org/952223002
diff --git a/mojo/python/BUILD.gn b/mojo/python/BUILD.gn
index 041dbc0..972e0c6 100644
--- a/mojo/python/BUILD.gn
+++ b/mojo/python/BUILD.gn
@@ -2,17 +2,40 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+import("//mojo/public/python/rules.gni")
import("//third_party/cython/rules.gni")
# GYP version: mojo/mojo.gyp:mojo_python
group("python") {
deps = [
":mojo_embedder",
+ ":packaged_utils",
+ ":utils",
":validation_util",
"//mojo/public/python",
]
}
+copy("utils") {
+ sources = [
+ "mojo_utils/__init__.py",
+ "mojo_utils/data_pipe_utils.py",
+ ]
+ outputs = [
+ "$root_out_dir/python/mojo_utils/{{source_file_part}}",
+ ]
+}
+
+python_package("packaged_utils") {
+ sources = [
+ "mojo_utils/__init__.py",
+ "mojo_utils/data_pipe_utils.py",
+ ]
+ datadeps = [
+ "//mojo/public/python:mojo_system",
+ ]
+}
+
# GYP version: mojo/mojo.gyp:mojo_python_embedder
python_binary_module("mojo_embedder") {
cython_sources = [ "system/mojo_embedder.pyx" ]
diff --git a/mojo/python/mojo_utils/__init__.py b/mojo/python/mojo_utils/__init__.py
new file mode 100644
index 0000000..50b23df
--- /dev/null
+++ b/mojo/python/mojo_utils/__init__.py
@@ -0,0 +1,3 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
diff --git a/mojo/python/mojo_utils/data_pipe_utils.py b/mojo/python/mojo_utils/data_pipe_utils.py
new file mode 100644
index 0000000..2a46ea5
--- /dev/null
+++ b/mojo/python/mojo_utils/data_pipe_utils.py
@@ -0,0 +1,76 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import itertools
+import sys
+
+import mojo_system
+from mojo_bindings import promise
+
+class DataPipeCopyException(Exception):
+ def __init__(self, *args, **kwargs):
+ Exception.__init__(self, *args, **kwargs)
+ self.__traceback__ = sys.exc_info()[2]
+
+
+def CopyFromDataPipe(data_pipe, deadline):
+ """
+ Returns a Promise that operates as follows:
+ - If |data_pipe| is successfully read from, the promise resolves with the
+ bytes that were read.
+ - Otherwise, the promise rejects with an exception whose message contains the
+ status from the attempted read.
+ """
+ class DataPipeCopyHelper():
+ def __init__(self, data_pipe, deadline, resolve, reject):
+ self.data_pipe = data_pipe
+ self.original_deadline = deadline
+ self.start_time = mojo_system.GetTimeTicksNow()
+ self.resolve = resolve
+ self.reject = reject
+ self.buffer_size = 1024
+ self.data = bytearray(self.buffer_size)
+ self.index = 0
+
+ def _ComputeCurrentDeadline(self):
+ if self.original_deadline == mojo_system.DEADLINE_INDEFINITE:
+ return self.original_deadline
+ elapsed_time = mojo_system.GetTimeTicksNow() - self.start_time
+ return max(0, self.original_deadline - elapsed_time)
+
+ def CopyFromDataPipeAsync(self, result):
+ while result == mojo_system.RESULT_OK:
+ assert self.index <= len(self.data)
+ if self.index == len(self.data):
+ self.buffer_size *= 2
+ self.data.extend(itertools.repeat(0, self.buffer_size))
+
+ # Careful! Have to construct a memoryview object here as otherwise the
+ # slice operation will create a copy of |data| and hence not write into
+ # |data| as desired.
+ result, read_bytes = self.data_pipe.ReadData(
+ memoryview(self.data)[self.index:])
+ if read_bytes:
+ self.index += len(read_bytes)
+ del read_bytes
+
+ if result == mojo_system.RESULT_SHOULD_WAIT:
+ data_pipe.AsyncWait(mojo_system.HANDLE_SIGNAL_READABLE,
+ self._ComputeCurrentDeadline(),
+ self.CopyFromDataPipeAsync)
+ return
+
+ # Treat a failed precondition as EOF.
+ if result == mojo_system.RESULT_FAILED_PRECONDITION:
+ self.resolve(self.data[:self.index])
+ return
+
+ self.reject(DataPipeCopyException("Result: %d" % result))
+
+
+ def GenerationMethod(resolve, reject):
+ helper = DataPipeCopyHelper(data_pipe, deadline, resolve, reject)
+ helper.CopyFromDataPipeAsync(mojo_system.RESULT_OK)
+
+ return promise.Promise(GenerationMethod)
diff --git a/mojo/python/tests/data_pipe_utils_unittest.py b/mojo/python/tests/data_pipe_utils_unittest.py
new file mode 100644
index 0000000..b6bc9fa
--- /dev/null
+++ b/mojo/python/tests/data_pipe_utils_unittest.py
@@ -0,0 +1,96 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import random
+
+import mojo_unittest
+from mojo_bindings import promise
+
+# pylint: disable=F0401
+import mojo_system as system
+
+# pylint: disable=F0401
+from mojo_utils import data_pipe_utils
+
+
+def _GetRandomBuffer(size):
+ random.seed(size)
+ return bytearray(''.join(chr(random.randint(0, 255)) for i in xrange(size)))
+
+
+class DataPipeCopyTest(mojo_unittest.MojoTestCase):
+ def setUp(self):
+ super(DataPipeCopyTest, self).setUp()
+ self.handles = system.DataPipe()
+ self.error = None
+
+ def tearDown(self):
+ self.handles = None
+ super(DataPipeCopyTest, self).tearDown()
+
+ def _writeDataAndClose(self, handle, data):
+ status, num_bytes_written = handle.WriteData(data)
+ handle.Close()
+ self.assertEquals(system.RESULT_OK, status)
+ self.assertEquals(len(data), num_bytes_written)
+
+ def _copyDataFromPipe(self, handle, expected_data,
+ deadline=system.DEADLINE_INDEFINITE):
+ self._VerifyDataCopied(data_pipe_utils.CopyFromDataPipe(
+ handle, deadline), expected_data).Catch(self._CatchError)
+
+ def _CatchError(self, error):
+ if self.loop:
+ self.loop.Quit()
+ self.error = error
+
+ @promise.async
+ def _VerifyDataCopied(self, data, expected_data):
+ self.assertEquals(expected_data, data)
+ self.loop.Quit()
+
+ def _runAndCheckError(self):
+ self.loop.Run()
+ if self.error:
+ # pylint: disable=E0702
+ raise self.error
+
+ def _testEagerWrite(self, data):
+ self._writeDataAndClose(self.handles.producer_handle, data)
+ self._copyDataFromPipe(self.handles.consumer_handle, data)
+ self._runAndCheckError()
+
+ def _testDelayedWrite(self, data):
+ self._copyDataFromPipe(self.handles.consumer_handle, data)
+ self._writeDataAndClose(self.handles.producer_handle, data)
+ self._runAndCheckError()
+
+ def testTimeout(self):
+ self._copyDataFromPipe(self.handles.consumer_handle, bytearray(),
+ deadline=100)
+ with self.assertRaises(data_pipe_utils.DataPipeCopyException):
+ self._runAndCheckError()
+
+ def testCloseProducerWithoutWriting(self):
+ self._copyDataFromPipe(self.handles.consumer_handle, bytearray())
+ self.handles.producer_handle.Close()
+ self._runAndCheckError()
+
+ def testEagerWriteOfEmptyData(self):
+ self._testEagerWrite(bytearray())
+
+ def testDelayedWriteOfEmptyData(self):
+ self._testDelayedWrite(bytearray())
+
+ def testEagerWriteOfNonEmptyData(self):
+ self._testEagerWrite(_GetRandomBuffer(1024))
+
+ def testDelayedWriteOfNonEmptyData(self):
+ self._testDelayedWrite(_GetRandomBuffer(1024))
+
+ def testEagerWriteOfLargeBuffer(self):
+ self._testEagerWrite(_GetRandomBuffer(32 * 1024))
+
+ def testDelayedWriteOfLargeBuffer(self):
+ self._testDelayedWrite(_GetRandomBuffer(32 * 1024))