go/system: adding two-phase methods of data pipe

R=gautham.dorai@gmail.com, jamesr@chromium.org

Review URL: https://codereview.chromium.org/860193002
diff --git a/mojo/go/tests/system_test.go b/mojo/go/tests/system_test.go
index f3581f0..b662ec7 100644
--- a/mojo/go/tests/system_test.go
+++ b/mojo/go/tests/system_test.go
@@ -95,7 +95,7 @@
 		t.Fatalf("state should allow all signals after CreateMessagePipe:%v", state.SatisfiableSignals)
 	}
 
-	r, state = h0.Wait(system.MOJO_HANDLE_SIGNAL_WRITABLE, 0)
+	r, state = h0.Wait(system.MOJO_HANDLE_SIGNAL_WRITABLE, system.MOJO_DEADLINE_INDEFINITE)
 	if r != system.MOJO_RESULT_OK {
 		t.Fatalf("h0 should be writable:%v", r)
 	}
@@ -157,7 +157,7 @@
 		t.Fatalf("Close on h0 failed:%v", r)
 	}
 
-	r, state = h1.Wait(MOJO_HANDLE_SIGNAL_READWRITABLE, 100)
+	r, state = h1.Wait(MOJO_HANDLE_SIGNAL_READWRITABLE, system.MOJO_DEADLINE_INDEFINITE)
 	if r != system.MOJO_RESULT_FAILED_PRECONDITION {
 		t.Fatalf("h1 should not be readable/writable after Close(h0):%v", r)
 	}
@@ -187,15 +187,19 @@
 	if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, 0); r != system.MOJO_RESULT_DEADLINE_EXCEEDED {
 		t.Fatalf("hc should not be readable:%v", r)
 	}
-	if r, _ = hp.Wait(system.MOJO_HANDLE_SIGNAL_WRITABLE, 0); r != system.MOJO_RESULT_OK {
+	if r, _ = hp.Wait(system.MOJO_HANDLE_SIGNAL_WRITABLE, system.MOJO_DEADLINE_INDEFINITE); r != system.MOJO_RESULT_OK {
 		t.Fatalf("hp should be writeable:%v", r)
 	}
+
+	// Test one-phase read/write.
+	// Writing.
 	kHello := []byte("hello")
 	r, numBytes := hp.WriteData(kHello, system.MOJO_WRITE_DATA_FLAG_NONE)
 	if r != system.MOJO_RESULT_OK || numBytes != len(kHello) {
 		t.Fatalf("Failed WriteData on hp:%v numBytes:%d", r, numBytes)
 	}
-	if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, 1000); r != system.MOJO_RESULT_OK {
+	// Reading.
+	if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, system.MOJO_DEADLINE_INDEFINITE); r != system.MOJO_RESULT_OK {
 		t.Fatalf("hc should be readable after WriteData on hp:%v", r)
 	}
 	r, data := hc.ReadData(system.MOJO_READ_DATA_FLAG_NONE)
@@ -205,10 +209,51 @@
 	if !bytes.Equal(data, kHello) {
 		t.Fatalf("Invalid data expected:%s, got:%s", kHello, data)
 	}
+
+	// Test two-phase read/write.
+	// Writing.
+	kHello = []byte("Hello, world!")
+	r, buf := hp.BeginWriteData(len(kHello), system.MOJO_WRITE_DATA_FLAG_ALL_OR_NONE)
+	if r != system.MOJO_RESULT_OK {
+		t.Fatalf("Failed BeginWriteData on hp:%v numBytes:%d", r, len(kHello))
+	}
+	if len(buf) < len(kHello) {
+		t.Fatalf("Buffer size(%d) should be at least %d", len(buf), len(kHello))
+	}
+	copy(buf, kHello)
+	if r, _ := hp.WriteData(kHello, system.MOJO_WRITE_DATA_FLAG_NONE); r != system.MOJO_RESULT_BUSY {
+		t.Fatalf("hp should be busy during a two-phase write: %v", r)
+	}
+	if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, 0); r != system.MOJO_RESULT_DEADLINE_EXCEEDED {
+		t.Fatalf("hc shouldn't be readable before EndWriteData on hp:%v", r)
+	}
+	if r := hp.EndWriteData(len(kHello)); r != system.MOJO_RESULT_OK {
+		t.Fatalf("Failed EndWriteData on hp:%v", r)
+	}
+	// Reading.
+	if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, system.MOJO_DEADLINE_INDEFINITE); r != system.MOJO_RESULT_OK {
+		t.Fatalf("hc should be readable after EndWriteData on hp:%v", r)
+	}
+	if r, buf = hc.BeginReadData(len(kHello), system.MOJO_READ_DATA_FLAG_ALL_OR_NONE); r != system.MOJO_RESULT_OK {
+		t.Fatalf("Failed BeginReadData on hc:%v numBytes:%d", r, len(kHello))
+	}
+	if len(buf) != len(kHello) {
+		t.Fatalf("Buffer size(%d) should be equal to %d", len(buf), len(kHello))
+	}
+	if r, _ := hc.ReadData(system.MOJO_READ_DATA_FLAG_NONE); r != system.MOJO_RESULT_BUSY {
+		t.Fatalf("hc should be busy during a two-phase read: %v", r)
+	}
+	if !bytes.Equal(buf, kHello) {
+		t.Fatalf("Invalid data expected:%s, got:%s", kHello, buf)
+	}
+	if r := hc.EndReadData(len(buf)); r != system.MOJO_RESULT_OK {
+		t.Fatalf("Failed EndReadData on hc:%v", r)
+	}
+
 	if r = hp.Close(); r != system.MOJO_RESULT_OK {
 		t.Fatalf("Close on hp failed:%v", r)
 	}
-	if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, 100); r != system.MOJO_RESULT_FAILED_PRECONDITION {
+	if r, _ = hc.Wait(system.MOJO_HANDLE_SIGNAL_READABLE, system.MOJO_DEADLINE_INDEFINITE); r != system.MOJO_RESULT_FAILED_PRECONDITION {
 		t.Fatalf("hc should not be readable after hp closed:%v", r)
 	}
 	if r = hc.Close(); r != system.MOJO_RESULT_OK {
diff --git a/mojo/public/go/system/c_allocators.c b/mojo/public/go/system/c_allocators.c
index 7af7bb2..2f93908 100644
--- a/mojo/public/go/system/c_allocators.c
+++ b/mojo/public/go/system/c_allocators.c
@@ -157,3 +157,15 @@
   free(p.num_bytes);
   free(p.elements);
 }
+
+struct TwoPhaseActionParams MallocTwoPhaseActionParams() {
+  struct TwoPhaseActionParams p;
+  p.buffer = (void**)malloc(sizeof(void*));
+  p.num_bytes = (uint32_t*)malloc(sizeof(uint32_t));
+  return p;
+}
+
+void FreeTwoPhaseActionParams(struct TwoPhaseActionParams p) {
+  free(p.buffer);
+  free(p.num_bytes);
+}
diff --git a/mojo/public/go/system/c_allocators.h b/mojo/public/go/system/c_allocators.h
index f915e6b..6050507 100644
--- a/mojo/public/go/system/c_allocators.h
+++ b/mojo/public/go/system/c_allocators.h
@@ -126,4 +126,11 @@
 struct WriteDataParams MallocWriteDataParams(uint32_t length);
 void FreeWriteDataParams(struct WriteDataParams p);
 
+struct TwoPhaseActionParams {
+  void** buffer;
+  uint32_t* num_bytes;
+};
+struct TwoPhaseActionParams MallocTwoPhaseActionParams();
+void FreeTwoPhaseActionParams(struct TwoPhaseActionParams p);
+
 #endif  // MOJO_PUBLIC_GO_SYSTEM_C_ALLOCATORS_H_
diff --git a/mojo/public/go/system/data_pipe.go b/mojo/public/go/system/data_pipe.go
index 7de27d1..b79a1e0 100644
--- a/mojo/public/go/system/data_pipe.go
+++ b/mojo/public/go/system/data_pipe.go
@@ -16,6 +16,31 @@
 	// ReadData reads data from the data pipe consumer handle with the
 	// given flags. On success, returns the data that was read.
 	ReadData(flags MojoReadDataFlags) (MojoResult, []byte)
+
+	// BeginReadData begins a two-phase read from the data pipe consumer.
+	// On success, returns a slice from which the caller can read up to its
+	// length bytes of data. If flags has |MOJO_READ_DATA_FLAG_ALL_OR_NONE|
+	// set, then the slice length will be at least as large as |numBytes|,
+	// which must also be a multiple of the element size (otherwise the
+	// caller must check the length of the slice).
+	//
+	// During a two-phase read, this handle is *not* readable. E.g., read
+	// from this handle will return |MOJO_RESULT_BUSY|.
+	//
+	// Once the caller has finished reading data from the slice, it should
+	// call |EndReadData()| to specify the amount read and to complete the
+	// two-phase read.
+	BeginReadData(numBytes int, flags MojoReadDataFlags) (MojoResult, []byte)
+
+	// EndReadData ends a two-phase read from the data pipe consumer that
+	// was begun by a call to |BeginReadData()| on the same handle.
+	// |numBytesRead| should indicate the amount of data actually read; it
+	// must be less than or equal to the length of the slice returned by
+	// |BeginReadData()| and must be a multiple of the element size.
+	//
+	// On failure, the two-phase read (if any) is ended (so the handle may
+	// become readable again) but no data is "removed" from the data pipe.
+	EndReadData(numBytesRead int) MojoResult
 }
 
 // ProducerHandle is a handle for the producer part of a data pipe.
@@ -26,6 +51,34 @@
 	// given flags. On success, returns the number of bytes that were
 	// actually written.
 	WriteData(data []byte, flags MojoWriteDataFlags) (MojoResult, int)
+
+	// BeginWriteData begins a two-phase write to the data pipe producer.
+	// On success, returns a slice to which the caller can write. If flags
+	// has |MOJO_READ_DATA_FLAG_ALL_OR_NONE| set, then the slice length will
+	// be at least as large as |numBytes|, which must also be a multiple of
+	// the element size (otherwise the caller must check the length of the
+	// slice).
+	//
+	// During a two-phase write, this handle is *not* writable. E.g., write
+	// to this handle will return |MOJO_RESULT_BUSY|.
+	//
+	// Once the caller has finished writing data to the buffer, it should
+	// call |EndWriteData()| to specify the amount written and to complete
+	// the two-phase write.
+	BeginWriteData(numBytes int, flags MojoWriteDataFlags) (MojoResult, []byte)
+
+	// EndWriteData ends a two-phase write to the data pipe producer that
+	// was begun by a call to |BeginWriteData()| on the same handle.
+	// |numBytesWritten| should indicate the amount of data actually
+	// written; it must be less than or equal to the length of the slice
+	// returned by |BeginWriteData()| and must be a multiple of the element
+	// size. The slice returned from |BeginWriteData()| must have been
+	// filled with exactly |numBytesWritten| bytes of data.
+	//
+	// On failure, the two-phase write (if any) is ended (so the handle may
+	// become writable again, if there's space available) but no data
+	// written to the slice is "put into" the data pipe.
+	EndWriteData(numBytesWritten int) MojoResult
 }
 
 type dataPipeConsumer struct {
@@ -49,6 +102,19 @@
 	return MojoResult(result), data
 }
 
+func (h *dataPipeConsumer) BeginReadData(numBytes int, flags MojoReadDataFlags) (MojoResult, []byte) {
+	cParams := C.MallocTwoPhaseActionParams()
+	defer C.FreeTwoPhaseActionParams(cParams)
+	*cParams.num_bytes = C.uint32_t(numBytes)
+	result := C.MojoBeginReadData(h.mojoHandle.cValue(), cParams.buffer, cParams.num_bytes, flags.cValue())
+	buffer := unsafeByteSlice(unsafe.Pointer(*cParams.buffer), int(*cParams.num_bytes))
+	return MojoResult(result), buffer
+}
+
+func (h *dataPipeConsumer) EndReadData(numBytesRead int) MojoResult {
+	return MojoResult(C.MojoEndReadData(h.mojoHandle.cValue(), C.uint32_t(numBytesRead)))
+}
+
 type dataPipeProducer struct {
 	baseHandle
 }
@@ -62,3 +128,16 @@
 	result := C.MojoWriteData(h.mojoHandle.cValue(), cParams.elements, cParams.num_bytes, flags.cValue())
 	return MojoResult(result), int(*cParams.num_bytes)
 }
+
+func (h *dataPipeProducer) BeginWriteData(numBytes int, flags MojoWriteDataFlags) (MojoResult, []byte) {
+	cParams := C.MallocTwoPhaseActionParams()
+	defer C.FreeTwoPhaseActionParams(cParams)
+	*cParams.num_bytes = C.uint32_t(numBytes)
+	result := C.MojoBeginWriteData(h.mojoHandle.cValue(), cParams.buffer, cParams.num_bytes, flags.cValue())
+	buffer := unsafeByteSlice(unsafe.Pointer(*cParams.buffer), int(*cParams.num_bytes))
+	return MojoResult(result), buffer
+}
+
+func (h *dataPipeProducer) EndWriteData(numBytesWritten int) MojoResult {
+	return MojoResult(C.MojoEndWriteData(h.mojoHandle.cValue(), C.uint32_t(numBytesWritten)))
+}