go/system: adding two-phase methods of data pipe
R=gautham.dorai@gmail.com, jamesr@chromium.org
Review URL: https://codereview.chromium.org/860193002
diff --git a/mojo/go/tests/system_test.go b/mojo/go/tests/system_test.go
index f3581f0..b662ec7 100644
--- a/mojo/go/tests/system_test.go
+++ b/mojo/go/tests/system_test.go
@@ -95,7 +95,7 @@
t.Fatalf("state should allow all signals after CreateMessagePipe:%v", state.SatisfiableSignals)
}
- r, state = h0.Wait(system.MOJO_HANDLE_SIGNAL_WRITABLE, 0)
+ r, state = h0.Wait(system.MOJO_HANDLE_SIGNAL_WRITABLE, system.MOJO_DEADLINE_INDEFINITE)
if r != system.MOJO_RESULT_OK {
t.Fatalf("h0 should be writable:%v", r)
}
@@ -157,7 +157,7 @@
t.Fatalf("Close on h0 failed:%v", r)
}
- r, state = h1.Wait(MOJO_HANDLE_SIGNAL_READWRITABLE, 100)
+ r, state = h1.Wait(MOJO_HANDLE_SIGNAL_READWRITABLE, system.MOJO_DEADLINE_INDEFINITE)
if r != system.MOJO_RESULT_FAILED_PRECONDITION {
t.Fatalf("h1 should not be readable/writable after Close(h0):%v", r)
}
@@ -187,15 +187,19 @@
if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, 0); r != system.MOJO_RESULT_DEADLINE_EXCEEDED {
t.Fatalf("hc should not be readable:%v", r)
}
- if r, _ = hp.Wait(system.MOJO_HANDLE_SIGNAL_WRITABLE, 0); r != system.MOJO_RESULT_OK {
+ if r, _ = hp.Wait(system.MOJO_HANDLE_SIGNAL_WRITABLE, system.MOJO_DEADLINE_INDEFINITE); r != system.MOJO_RESULT_OK {
t.Fatalf("hp should be writeable:%v", r)
}
+
+ // Test one-phase read/write.
+ // Writing.
kHello := []byte("hello")
r, numBytes := hp.WriteData(kHello, system.MOJO_WRITE_DATA_FLAG_NONE)
if r != system.MOJO_RESULT_OK || numBytes != len(kHello) {
t.Fatalf("Failed WriteData on hp:%v numBytes:%d", r, numBytes)
}
- if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, 1000); r != system.MOJO_RESULT_OK {
+ // Reading.
+ if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, system.MOJO_DEADLINE_INDEFINITE); r != system.MOJO_RESULT_OK {
t.Fatalf("hc should be readable after WriteData on hp:%v", r)
}
r, data := hc.ReadData(system.MOJO_READ_DATA_FLAG_NONE)
@@ -205,10 +209,51 @@
if !bytes.Equal(data, kHello) {
t.Fatalf("Invalid data expected:%s, got:%s", kHello, data)
}
+
+ // Test two-phase read/write.
+ // Writing.
+ kHello = []byte("Hello, world!")
+ r, buf := hp.BeginWriteData(len(kHello), system.MOJO_WRITE_DATA_FLAG_ALL_OR_NONE)
+ if r != system.MOJO_RESULT_OK {
+ t.Fatalf("Failed BeginWriteData on hp:%v numBytes:%d", r, len(kHello))
+ }
+ if len(buf) < len(kHello) {
+ t.Fatalf("Buffer size(%d) should be at least %d", len(buf), len(kHello))
+ }
+ copy(buf, kHello)
+ if r, _ := hp.WriteData(kHello, system.MOJO_WRITE_DATA_FLAG_NONE); r != system.MOJO_RESULT_BUSY {
+ t.Fatalf("hp should be busy during a two-phase write: %v", r)
+ }
+ if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, 0); r != system.MOJO_RESULT_DEADLINE_EXCEEDED {
+ t.Fatalf("hc shouldn't be readable before EndWriteData on hp:%v", r)
+ }
+ if r := hp.EndWriteData(len(kHello)); r != system.MOJO_RESULT_OK {
+ t.Fatalf("Failed EndWriteData on hp:%v", r)
+ }
+ // Reading.
+ if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, system.MOJO_DEADLINE_INDEFINITE); r != system.MOJO_RESULT_OK {
+ t.Fatalf("hc should be readable after EndWriteData on hp:%v", r)
+ }
+ if r, buf = hc.BeginReadData(len(kHello), system.MOJO_READ_DATA_FLAG_ALL_OR_NONE); r != system.MOJO_RESULT_OK {
+ t.Fatalf("Failed BeginReadData on hc:%v numBytes:%d", r, len(kHello))
+ }
+ if len(buf) != len(kHello) {
+ t.Fatalf("Buffer size(%d) should be equal to %d", len(buf), len(kHello))
+ }
+ if r, _ := hc.ReadData(system.MOJO_READ_DATA_FLAG_NONE); r != system.MOJO_RESULT_BUSY {
+ t.Fatalf("hc should be busy during a two-phase read: %v", r)
+ }
+ if !bytes.Equal(buf, kHello) {
+ t.Fatalf("Invalid data expected:%s, got:%s", kHello, buf)
+ }
+ if r := hc.EndReadData(len(buf)); r != system.MOJO_RESULT_OK {
+ t.Fatalf("Failed EndReadData on hc:%v", r)
+ }
+
if r = hp.Close(); r != system.MOJO_RESULT_OK {
t.Fatalf("Close on hp failed:%v", r)
}
- if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, 100); r != system.MOJO_RESULT_FAILED_PRECONDITION {
+ if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, system.MOJO_DEADLINE_INDEFINITE); r != system.MOJO_RESULT_FAILED_PRECONDITION {
t.Fatalf("hc should not be readable after hp closed:%v", r)
}
if r = hc.Close(); r != system.MOJO_RESULT_OK {
diff --git a/mojo/public/go/system/c_allocators.c b/mojo/public/go/system/c_allocators.c
index 7af7bb2..2f93908 100644
--- a/mojo/public/go/system/c_allocators.c
+++ b/mojo/public/go/system/c_allocators.c
@@ -157,3 +157,15 @@
free(p.num_bytes);
free(p.elements);
}
+
+struct TwoPhaseActionParams MallocTwoPhaseActionParams() {
+ struct TwoPhaseActionParams p;
+ p.buffer = (void**)malloc(sizeof(void*));
+ p.num_bytes = (uint32_t*)malloc(sizeof(uint32_t));
+ return p;
+}
+
+void FreeTwoPhaseActionParams(struct TwoPhaseActionParams p) {
+ free(p.buffer);
+ free(p.num_bytes);
+}
diff --git a/mojo/public/go/system/c_allocators.h b/mojo/public/go/system/c_allocators.h
index f915e6b..6050507 100644
--- a/mojo/public/go/system/c_allocators.h
+++ b/mojo/public/go/system/c_allocators.h
@@ -126,4 +126,11 @@
struct WriteDataParams MallocWriteDataParams(uint32_t length);
void FreeWriteDataParams(struct WriteDataParams p);
+struct TwoPhaseActionParams {
+ void** buffer;
+ uint32_t* num_bytes;
+};
+struct TwoPhaseActionParams MallocTwoPhaseActionParams();
+void FreeTwoPhaseActionParams(struct TwoPhaseActionParams p);
+
#endif // MOJO_PUBLIC_GO_SYSTEM_C_ALLOCATORS_H_
diff --git a/mojo/public/go/system/data_pipe.go b/mojo/public/go/system/data_pipe.go
index 7de27d1..b79a1e0 100644
--- a/mojo/public/go/system/data_pipe.go
+++ b/mojo/public/go/system/data_pipe.go
@@ -16,6 +16,31 @@
// ReadData reads data from the data pipe consumer handle with the
// given flags. On success, returns the data that was read.
ReadData(flags MojoReadDataFlags) (MojoResult, []byte)
+
+ // BeginReadData begins a two-phase read from the data pipe consumer.
+ // On success, returns a slice from which the caller can read up to its
+ // length bytes of data. If flags has |MOJO_READ_DATA_FLAG_ALL_OR_NONE|
+ // set, then the slice length will be at least as large as |numBytes|,
+ // which must also be a multiple of the element size (otherwise the
+ // caller must check the length of the slice).
+ //
+ // During a two-phase read, this handle is *not* readable. E.g., read
+ // from this handle will return |MOJO_RESULT_BUSY|.
+ //
+ // Once the caller has finished reading data from the slice, it should
+ // call |EndReadData()| to specify the amount read and to complete the
+ // two-phase read.
+ BeginReadData(numBytes int, flags MojoReadDataFlags) (MojoResult, []byte)
+
+ // EndReadData ends a two-phase read from the data pipe consumer that
+ // was begun by a call to |BeginReadData()| on the same handle.
+ // |numBytesRead| should indicate the amount of data actually read; it
+ // must be less than or equal to the length of the slice returned by
+ // |BeginReadData()| and must be a multiple of the element size.
+ //
+ // On failure, the two-phase read (if any) is ended (so the handle may
+ // become readable again) but no data is "removed" from the data pipe.
+ EndReadData(numBytesRead int) MojoResult
}
// ProducerHandle is a handle for the producer part of a data pipe.
@@ -26,6 +51,34 @@
// given flags. On success, returns the number of bytes that were
// actually written.
WriteData(data []byte, flags MojoWriteDataFlags) (MojoResult, int)
+
+ // BeginWriteData begins a two-phase write to the data pipe producer.
+ // On success, returns a slice to which the caller can write. If flags
+ // has |MOJO_READ_DATA_FLAG_ALL_OR_NONE| set, then the slice length will
+ // be at least as large as |numBytes|, which must also be a multiple of
+ // the element size (otherwise the caller must check the length of the
+ // slice).
+ //
+ // During a two-phase write, this handle is *not* writable. E.g., write
+ // to this handle will return |MOJO_RESULT_BUSY|.
+ //
+ // Once the caller has finished writing data to the buffer, it should
+ // call |EndWriteData()| to specify the amount written and to complete
+ // the two-phase write.
+ BeginWriteData(numBytes int, flags MojoWriteDataFlags) (MojoResult, []byte)
+
+ // EndWriteData ends a two-phase write to the data pipe producer that
+ // was begun by a call to |BeginWriteData()| on the same handle.
+ // |numBytesWritten| should indicate the amount of data actually
+ // written; it must be less than or equal to the length of the slice
+ // returned by |BeginWriteData()| and must be a multiple of the element
+ // size. The slice returned from |BeginWriteData()| must have been
+ // filled with exactly |numBytesWritten| bytes of data.
+ //
+ // On failure, the two-phase write (if any) is ended (so the handle may
+ // become writable again, if there's space available) but no data
+ // written to the slice is "put into" the data pipe.
+ EndWriteData(numBytesWritten int) MojoResult
}
type dataPipeConsumer struct {
@@ -49,6 +102,19 @@
return MojoResult(result), data
}
+func (h *dataPipeConsumer) BeginReadData(numBytes int, flags MojoReadDataFlags) (MojoResult, []byte) {
+ cParams := C.MallocTwoPhaseActionParams()
+ defer C.FreeTwoPhaseActionParams(cParams)
+ *cParams.num_bytes = C.uint32_t(numBytes)
+ result := C.MojoBeginReadData(h.mojoHandle.cValue(), cParams.buffer, cParams.num_bytes, flags.cValue())
+ buffer := unsafeByteSlice(unsafe.Pointer(*cParams.buffer), int(*cParams.num_bytes))
+ return MojoResult(result), buffer
+}
+
+func (h *dataPipeConsumer) EndReadData(numBytesRead int) MojoResult {
+ return MojoResult(C.MojoEndReadData(h.mojoHandle.cValue(), C.uint32_t(numBytesRead)))
+}
+
type dataPipeProducer struct {
baseHandle
}
@@ -62,3 +128,16 @@
result := C.MojoWriteData(h.mojoHandle.cValue(), cParams.elements, cParams.num_bytes, flags.cValue())
return MojoResult(result), int(*cParams.num_bytes)
}
+
+func (h *dataPipeProducer) BeginWriteData(numBytes int, flags MojoWriteDataFlags) (MojoResult, []byte) {
+ cParams := C.MallocTwoPhaseActionParams()
+ defer C.FreeTwoPhaseActionParams(cParams)
+ *cParams.num_bytes = C.uint32_t(numBytes)
+ result := C.MojoBeginWriteData(h.mojoHandle.cValue(), cParams.buffer, cParams.num_bytes, flags.cValue())
+ buffer := unsafeByteSlice(unsafe.Pointer(*cParams.buffer), int(*cParams.num_bytes))
+ return MojoResult(result), buffer
+}
+
+func (h *dataPipeProducer) EndWriteData(numBytesWritten int) MojoResult {
+ return MojoResult(C.MojoEndWriteData(h.mojoHandle.cValue(), C.uint32_t(numBytesWritten)))
+}