Dart: Use a RawReceivePort to receive events for Mojo handles.

This makes MojoEventStream more of an event handler, and
MojoEventStreamListener more of the controller of the event handler, so
I've changed the names accordingly.

Since handlers on RawReceivePorts don't throw exceptions into the current
Zone, but rather into the root Zone, errors thrown by event handlers are
now propagated to the event controller's onError function rather than
relying on clients using Zones.

This change has a noticible improvement in startup time due to avoiding
compiling Stream related functions. ~130ms -> ~120ms on my machine, and
so hopeuflly ~150ms -> 130-140ms on the perf bot.

BUG=
R=johnmccutchan@google.com

Review URL: https://codereview.chromium.org/1414483010 .
diff --git a/benchmarks/mojo_rtt_benchmark/lib/echo_server.dart b/benchmarks/mojo_rtt_benchmark/lib/echo_server.dart
index 2608361..5134158 100644
--- a/benchmarks/mojo_rtt_benchmark/lib/echo_server.dart
+++ b/benchmarks/mojo_rtt_benchmark/lib/echo_server.dart
@@ -23,7 +23,7 @@
 
   Future close() => _stub.close();
 
-  _errorHandler() => _application.removeService(this);
+  _errorHandler(Object e) => _application.removeService(this);
 }
 
 class EchoApplication extends Application {
@@ -59,7 +59,7 @@
     return echoService;
   }
 
-  _errorHandler() async {
+  _errorHandler(Object e) async {
     _closing = true;
     for (var service in _echoServices) {
       await service.close();
diff --git a/benchmarks/mojo_rtt_benchmark/lib/isolate.dart b/benchmarks/mojo_rtt_benchmark/lib/isolate.dart
index 668012e..5df9cf9 100644
--- a/benchmarks/mojo_rtt_benchmark/lib/isolate.dart
+++ b/benchmarks/mojo_rtt_benchmark/lib/isolate.dart
@@ -95,7 +95,7 @@
     }
   }
 
-  void _errorHandler() {
+  void _errorHandler(Object e) {
     _doEcho = false;
     _receivePort.close();
     MojoHandle.reportLeakedHandles();
diff --git a/benchmarks/mojo_rtt_benchmark/lib/main.dart b/benchmarks/mojo_rtt_benchmark/lib/main.dart
index c553724..7a2f873 100644
--- a/benchmarks/mojo_rtt_benchmark/lib/main.dart
+++ b/benchmarks/mojo_rtt_benchmark/lib/main.dart
@@ -54,7 +54,7 @@
     for (int i = 0; i < _numClients; i++) {
       var newProxy = new EchoProxy.unbound();
       newProxy.errorFuture.then((e) {
-        _errorHandler();
+        _errorHandler(e);
       });
       connectToService(echoUrl, newProxy);
       _echoProxies.add(newProxy);
@@ -99,7 +99,7 @@
     return _echoProxies[idx].ptr.echoString(s);
   }
 
-  _errorHandler() {
+  _errorHandler(Object e) {
     _doEcho = false;
     return Future.wait(_echoProxies.map((p) => p.close())).then((_) {
       MojoHandle.reportLeakedHandles();
diff --git a/examples/dart/netcat/lib/main.dart b/examples/dart/netcat/lib/main.dart
index 9e5cc20..4cef6a6 100644
--- a/examples/dart/netcat/lib/main.dart
+++ b/examples/dart/netcat/lib/main.dart
@@ -49,7 +49,7 @@
   TcpConnectedSocketProxy _socket;
   MojoDataPipeProducer _socketSender;
   MojoDataPipeConsumer _socketReceiver;
-  MojoEventStream _socketReceiverEventStream;
+  MojoEventSubscription _socketReceiverEventSubscription;
   final ByteData _readBuffer;
   final ByteData _writeBuffer;
 
@@ -81,8 +81,9 @@
       _startReadingFromTerminal();
 
       // Set up reading from the socket.
-      _socketReceiverEventStream = new MojoEventStream(_socketReceiver.handle);
-      _socketReceiverEventStream.listen(_onSocketReceiverEvent);
+      _socketReceiverEventSubscription =
+          new MojoEventSubscription(_socketReceiver.handle);
+      _socketReceiverEventSubscription.subscribe(_onSocketReceiverEvent);
     } catch (e) {
       _shutDown();
     }
@@ -93,7 +94,9 @@
     _terminal.ptr
         .read(_writeBuffer.lengthInBytes, 0, files.Whence.FROM_CURRENT)
         .then(_onReadFromTerminal)
-        .catchError((e) { _shutDown(); });
+        .catchError((e) {
+      _shutDown();
+    });
   }
 
   void _onReadFromTerminal(files.FileReadResponseParams p) {
@@ -127,10 +130,13 @@
       var numBytesRead = _socketReceiver.read(_readBuffer);
       if (_socketReceiver.status.isOk) {
         assert(numBytesRead > 0);
-        _terminal.ptr.write(_readBuffer.buffer.asUint8List(0, numBytesRead), 0,
-                            files.Whence.FROM_CURRENT)
-            .catchError((e) { _shutDown(); });
-        _socketReceiverEventStream.enableReadEvents();
+        _terminal.ptr
+            .write(_readBuffer.buffer.asUint8List(0, numBytesRead), 0,
+                files.Whence.FROM_CURRENT)
+            .catchError((e) {
+          _shutDown();
+        });
+        _socketReceiverEventSubscription.enableReadEvents();
       } else {
         shouldShutDown = true;
       }
@@ -145,18 +151,16 @@
   }
 
   void _shutDown() {
-    if (_socketReceiverEventStream != null) {
-      ignoreFuture(_socketReceiverEventStream.close());
-      _socketReceiverEventStream = null;
+    if (_socketReceiverEventSubscription != null) {
+      ignoreFuture(_socketReceiverEventSubscription.close());
+      _socketReceiverEventSubscription = null;
     }
     if (_socketSender != null) {
-      if (_socketSender.handle.isValid)
-        _socketSender.handle.close();
+      if (_socketSender.handle.isValid) _socketSender.handle.close();
       _socketSender = null;
     }
     if (_socketReceiver != null) {
-      if (_socketReceiver.handle.isValid)
-        _socketReceiver.handle.close();
+      if (_socketReceiver.handle.isValid) _socketReceiver.handle.close();
       _socketReceiver = null;
     }
     if (_terminal != null) {
@@ -183,22 +187,28 @@
     try {
       remote_address = _getNetAddressFromUrl(url);
     } catch (e) {
-      fputs(terminal.ptr, 'HALP: Add a query: ?host=<host>&port=<port>\n'
-          '(<host> must be "localhost" or n1.n2.n3.n4)\n\n'
-          'Got query parameters:\n' + url.queryParameters.toString());
+      fputs(
+          terminal.ptr,
+          'HALP: Add a query: ?host=<host>&port=<port>\n'
+              '(<host> must be "localhost" or n1.n2.n3.n4)\n\n'
+              'Got query parameters:\n' +
+              url.queryParameters.toString());
       ignoreFuture(terminal.close());
       return;
     }
 
     // TODO(vtl): Currently, we only do IPv4, so this should work.
-    fputs(terminal.ptr,
-          'Connecting to: ' + remote_address.ipv4.addr.join('.') + ':' +
-              remote_address.ipv4.port.toString() + '...');
+    fputs(
+        terminal.ptr,
+        'Connecting to: ' +
+            remote_address.ipv4.addr.join('.') +
+            ':' +
+            remote_address.ipv4.port.toString() +
+            '...');
 
     var connector = new Connector(_application, terminal);
     // TODO(vtl): Do we have to do something on error?
-    connector.connect(remote_address)
-        .catchError((e) {});
+    connector.connect(remote_address).catchError((e) {});
   }
 
   // Note: May throw all sorts of things.
@@ -226,7 +236,7 @@
   MojoHandle appHandle = new MojoHandle(args[0]);
   String url = args[1];
   new NetcatApplication.fromHandle(appHandle)
-    ..onError = (() {
+    ..onError = ((Object e) {
       MojoHandle.reportLeakedHandles();
     });
 }
diff --git a/mojo/dart/embedder/io/socket_patch.dart b/mojo/dart/embedder/io/socket_patch.dart
index d8a80fd..80b172c 100644
--- a/mojo/dart/embedder/io/socket_patch.dart
+++ b/mojo/dart/embedder/io/socket_patch.dart
@@ -34,8 +34,8 @@
   bool _inClosed = false;
   bool _readEventsEnabled = true;
   bool _writeEventsEnabled = true;
-  MojoEventStream _pipeOutEvents;
-  MojoEventStream _pipeInEvents;
+  MojoEventSubscription _pipeOutEvents;
+  MojoEventSubscription _pipeInEvents;
   InternetAddress _localAddress;
   int _localPort;
   InternetAddress _remoteAddress;
@@ -267,22 +267,6 @@
     }
   }
 
-  _onInputError(e, st) {
-    _controller.addError(e);
-    _onInputDone();
-  }
-
-  _onInputDone() {
-    if (_inClosed) {
-      return;
-    }
-    if (_trace) {
-      _tracePrint('<- READ_CLOSED (done)');
-    }
-    _controller.add(RawSocketEvent.READ_CLOSED);
-    _inClosed = true;
-  }
-
   _onOutputData(List<int> event) {
     if (_outClosed) {
       return;
@@ -309,40 +293,20 @@
     }
   }
 
-  _onOutputError(e, st) {
-    _controller.addError(e);
-    _onOutputDone();
-  }
-
-  _onOutputDone() {
-    if (_outClosed) {
-      return;
-    }
-    if (_trace) {
-      _tracePrint('<- CLOSED (done)');
-    }
-    _controller.add(RawSocketEvent.CLOSED);
-    _outClosed = true;
-  }
-
   _setupIn() {
     assert(_pipeInEvents == null);
-    _pipeInEvents = new MojoEventStream(_pipeIn.consumer.handle,
+    _pipeInEvents = new MojoEventSubscription(_pipeIn.consumer.handle,
                                         MojoHandleSignals.READABLE +
                                         MojoHandleSignals.PEER_CLOSED);
-    _pipeInEvents.listen(_onInputData,
-                         onError: _onInputError,
-                         onDone: _onInputDone);
+    _pipeInEvents.subscribe(_onInputData);
   }
 
   _setupOut() {
     assert(_pipeOutEvents == null);
-    _pipeOutEvents = new MojoEventStream(_pipeOut.producer.handle,
+    _pipeOutEvents = new MojoEventSubscription(_pipeOut.producer.handle,
                                          MojoHandleSignals.WRITABLE +
                                          MojoHandleSignals.PEER_CLOSED);
-    _pipeOutEvents.listen(_onOutputData,
-                          onError: _onOutputError,
-                          onDone: _onOutputDone);
+    _pipeOutEvents.subscribe(_onOutputData);
   }
 
   _shutdownIn([bool force = false]) {
@@ -498,27 +462,27 @@
   }
 
 
-  static _enableReadEvents(MojoEventStream stream) {
-    if (stream == null) {
+  static _enableReadEvents(MojoEventSubscription subscription) {
+    if (subscription == null) {
       return;
     }
-    stream.enableSignals(MojoHandleSignals.PEER_CLOSED +
-                         MojoHandleSignals.READABLE);
+    subscription.enableSignals(MojoHandleSignals.PEER_CLOSED +
+                               MojoHandleSignals.READABLE);
   }
 
-  static _enableWriteEvents(MojoEventStream stream) {
-    if (stream == null) {
+  static _enableWriteEvents(MojoEventSubscription subscription) {
+    if (subscription == null) {
       return;
     }
-    stream.enableSignals(MojoHandleSignals.PEER_CLOSED +
-                         MojoHandleSignals.WRITABLE);
+    subscription.enableSignals(MojoHandleSignals.PEER_CLOSED +
+                               MojoHandleSignals.WRITABLE);
   }
 
-  static _disableEvents(MojoEventStream stream) {
-    if (stream == null) {
+  static _disableEvents(MojoEventSubscription subscription) {
+    if (subscription == null) {
       return;
     }
-    stream.enableSignals(MojoHandleSignals.PEER_CLOSED);
+    subscription.enableSignals(MojoHandleSignals.PEER_CLOSED);
   }
 
   _pause() {
diff --git a/mojo/dart/embedder/test/run_dart_tests.cc b/mojo/dart/embedder/test/run_dart_tests.cc
index d58f942..c6315af 100644
--- a/mojo/dart/embedder/test/run_dart_tests.cc
+++ b/mojo/dart/embedder/test/run_dart_tests.cc
@@ -117,10 +117,6 @@
   RunTest("codec_test.dart");
 }
 
-TEST(DartTest, handle_watcher_test) {
-  RunTest("handle_watcher_test.dart");
-}
-
 TEST(DartTest, bindings_generation_test) {
   RunTest("bindings_generation_test.dart");
 }
diff --git a/mojo/dart/test/bindings_generation_test.dart b/mojo/dart/test/bindings_generation_test.dart
index 71559f2..293ea98 100644
--- a/mojo/dart/test/bindings_generation_test.dart
+++ b/mojo/dart/test/bindings_generation_test.dart
@@ -14,8 +14,10 @@
 import 'package:mojom/mojo/test/test_structs.mojom.dart' as structs;
 import 'package:mojom/mojo/test/test_unions.mojom.dart' as unions;
 import 'package:mojom/mojo/test/rect.mojom.dart' as rect;
-import 'package:mojom/mojo/test/serialization_test_structs.mojom.dart' as serialization;
-import 'package:mojom/regression_tests/regression_tests.mojom.dart' as regression;
+import 'package:mojom/mojo/test/serialization_test_structs.mojom.dart'
+    as serialization;
+import 'package:mojom/regression_tests/regression_tests.mojom.dart'
+    as regression;
 
 class ProviderImpl implements sample.Provider {
   sample.ProviderStub _stub;
@@ -126,8 +128,8 @@
 testSerializeHandleToJSON() {
   var s = new serialization.Struct2();
 
-  Expect.throws(() => JSON.encode(s),
-    (e) => e.cause is bindings.MojoCodecError);
+  Expect.throws(
+      () => JSON.encode(s), (e) => e.cause is bindings.MojoCodecError);
 }
 
 testSerializeStructs() {
@@ -138,8 +140,7 @@
 }
 
 testSerializePodUnions() {
-  var s = new unions.WrapperStruct()
-    ..podUnion = new unions.PodUnion();
+  var s = new unions.WrapperStruct()..podUnion = new unions.PodUnion();
   s.podUnion.fUint32 = 32;
 
   Expect.equals(unions.PodUnionTag.fUint32, s.podUnion.tag);
@@ -152,10 +153,8 @@
 }
 
 testSerializeStructInUnion() {
-  var s = new unions.WrapperStruct()
-    ..objectUnion = new unions.ObjectUnion();
-  s.objectUnion.fDummy = new unions.DummyStruct()
-    ..fInt8 = 8;
+  var s = new unions.WrapperStruct()..objectUnion = new unions.ObjectUnion();
+  s.objectUnion.fDummy = new unions.DummyStruct()..fInt8 = 8;
 
   var message = messageOfStruct(s);
   var s2 = unions.WrapperStruct.deserialize(message.payload);
@@ -164,8 +163,7 @@
 }
 
 testSerializeArrayInUnion() {
-  var s = new unions.WrapperStruct()
-    ..objectUnion = new unions.ObjectUnion();
+  var s = new unions.WrapperStruct()..objectUnion = new unions.ObjectUnion();
   s.objectUnion.fArrayInt8 = [1, 2, 3];
 
   var message = messageOfStruct(s);
@@ -175,12 +173,8 @@
 }
 
 testSerializeMapInUnion() {
-  var s = new unions.WrapperStruct()
-    ..objectUnion = new unions.ObjectUnion();
-  s.objectUnion.fMapInt8 = {
-    "one": 1,
-    "two": 2,
-  };
+  var s = new unions.WrapperStruct()..objectUnion = new unions.ObjectUnion();
+  s.objectUnion.fMapInt8 = {"one": 1, "two": 2,};
 
   var message = messageOfStruct(s);
   var s2 = unions.WrapperStruct.deserialize(message.payload);
@@ -192,10 +186,8 @@
 testSerializeUnionInArray() {
   var s = new unions.SmallStruct()
     ..podUnionArray = [
-      new unions.PodUnion()
-        ..fUint16 = 16,
-      new unions.PodUnion()
-        ..fUint32 = 32,
+      new unions.PodUnion()..fUint16 = 16,
+      new unions.PodUnion()..fUint32 = 32,
     ];
 
   var message = messageOfStruct(s);
@@ -209,10 +201,8 @@
 testSerializeUnionInMap() {
   var s = new unions.SmallStruct()
     ..podUnionMap = {
-      'one': new unions.PodUnion()
-        ..fUint16 = 16,
-      'two': new unions.PodUnion()
-        ..fUint32 = 32,
+      'one': new unions.PodUnion()..fUint16 = 16,
+      'two': new unions.PodUnion()..fUint32 = 32,
     };
 
   var message = messageOfStruct(s);
@@ -224,10 +214,8 @@
 }
 
 testSerializeUnionInUnion() {
-  var s = new unions.WrapperStruct()
-    ..objectUnion = new unions.ObjectUnion();
-    s.objectUnion.fPodUnion = new unions.PodUnion()
-        ..fUint32 = 32;
+  var s = new unions.WrapperStruct()..objectUnion = new unions.ObjectUnion();
+  s.objectUnion.fPodUnion = new unions.PodUnion()..fUint32 = 32;
 
   var message = messageOfStruct(s);
   var s2 = unions.WrapperStruct.deserialize(message.payload);
@@ -303,7 +291,7 @@
   var testCompleter = new Completer();
   var pipe = new core.MojoMessagePipe();
   var proxy = new sample.ProviderProxy.fromEndpoint(pipe.endpoints[0]);
-  proxy.impl.onError = () => testCompleter.complete(true);
+  proxy.impl.onError = (_) => testCompleter.complete(true);
   Isolate.spawn(closingProviderIsolate, pipe.endpoints[1]);
   return testCompleter.future.then((b) {
     Expect.isTrue(b);
diff --git a/mojo/dart/test/compile_all_interfaces_test.dart b/mojo/dart/test/compile_all_interfaces_test.dart
index 8761879..c169fa3 100644
--- a/mojo/dart/test/compile_all_interfaces_test.dart
+++ b/mojo/dart/test/compile_all_interfaces_test.dart
@@ -14,9 +14,9 @@
 import 'package:mojo/application.dart';
 import 'package:mojo/bindings.dart';
 import 'package:mojo/core.dart';
-import 'package:mojom/mojo/application.mojom.dart';
-import 'package:mojom/mojo/service_provider.mojom.dart';
-import 'package:mojom/mojo/shell.mojom.dart';
+import 'package:mojo/mojo/application.mojom.dart';
+import 'package:mojo/mojo/service_provider.mojom.dart';
+import 'package:mojo/mojo/shell.mojom.dart';
 import 'package:mojom/math/math_calculator.mojom.dart';
 import 'package:mojom/no_module.mojom.dart';
 import 'package:mojom/mojo/test/rect.mojom.dart';
diff --git a/mojo/dart/test/handle_finalizer_test.dart b/mojo/dart/test/handle_finalizer_test.dart
index 39a1522..3c1e39f 100644
--- a/mojo/dart/test/handle_finalizer_test.dart
+++ b/mojo/dart/test/handle_finalizer_test.dart
@@ -16,10 +16,10 @@
   var endpoint = pipe.endpoints[0];
   Expect.isTrue(endpoint.handle.isValid);
 
-  var eventStream = new MojoEventStream(endpoint.handle);
-  // After making a MojoEventStream, the underlying mojo handle will have
-  // the native MojoClose called on it when the MojoEventStream is GC'd or the
-  // VM shuts down.
+  var eventSubscription = new MojoEventSubscription(endpoint.handle);
+  // After making a MojoEventSubscription, the underlying mojo handle will have
+  // the native MojoClose called on it when the MojoEventSubscription is GC'd or
+  // the VM shuts down.
 
   return endpoint.handle;
 }
diff --git a/mojo/dart/test/handle_watcher_test.dart b/mojo/dart/test/handle_watcher_test.dart
deleted file mode 100644
index 9ae2b6c..0000000
--- a/mojo/dart/test/handle_watcher_test.dart
+++ /dev/null
@@ -1,142 +0,0 @@
-// Copyright 2014 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-import 'dart:async';
-import 'dart:isolate';
-import 'dart:typed_data';
-
-import 'package:_testing/expect.dart';
-import 'package:mojo/core.dart';
-
-void simpleTest() {
-  var pipe = new MojoMessagePipe();
-  Expect.isNotNull(pipe);
-
-  var endpoint = pipe.endpoints[0];
-  Expect.isTrue(endpoint.handle.isValid);
-
-  var eventStream = new MojoEventStream(endpoint.handle);
-  var completer = new Completer();
-  int numEvents = 0;
-
-  eventStream.listen((_) {
-    numEvents++;
-    eventStream.close();
-  }, onDone: () {
-    completer.complete(numEvents);
-  });
-  eventStream.enableWriteEvents();
-
-  completer.future.then((int numEvents) {
-    Expect.equals(1, numEvents);
-  });
-}
-
-Future simpleAsyncAwaitTest() async {
-  var pipe = new MojoMessagePipe();
-  Expect.isNotNull(pipe);
-
-  var endpoint = pipe.endpoints[0];
-  Expect.isTrue(endpoint.handle.isValid);
-
-  var eventStream =
-      new MojoEventStream(endpoint.handle, MojoHandleSignals.READWRITE);
-
-  int numEvents = 0;
-  await for (List<int> event in eventStream) {
-    numEvents++;
-    eventStream.close();
-  }
-  Expect.equals(1, numEvents);
-}
-
-ByteData byteDataOfString(String s) {
-  return new ByteData.view((new Uint8List.fromList(s.codeUnits)).buffer);
-}
-
-String stringOfByteData(ByteData bytes) {
-  return new String.fromCharCodes(bytes.buffer.asUint8List().toList());
-}
-
-void expectStringFromEndpoint(
-    String expected, MojoMessagePipeEndpoint endpoint) {
-  // Query how many bytes are available.
-  var result = endpoint.query();
-  Expect.isNotNull(result);
-  int size = result.bytesRead;
-  Expect.isTrue(size > 0);
-
-  // Read the data.
-  ByteData bytes = new ByteData(size);
-  result = endpoint.read(bytes);
-  Expect.isNotNull(result);
-  Expect.equals(size, result.bytesRead);
-
-  // Convert to a string and check.
-  String msg = stringOfByteData(bytes);
-  Expect.equals(expected, msg);
-}
-
-Future pingPongIsolate(MojoMessagePipeEndpoint endpoint) async {
-  int pings = 0;
-  int pongs = 0;
-  var eventStream = new MojoEventStream(endpoint.handle);
-  await for (List<int> event in eventStream) {
-    var mojoSignals = new MojoHandleSignals(event[1]);
-    if (mojoSignals.isReadWrite) {
-      // We are either sending or receiving.
-      throw new Exception("Unexpected event");
-    } else if (mojoSignals.isReadable) {
-      expectStringFromEndpoint("Ping", endpoint);
-      pings++;
-      eventStream.enableWriteEvents();
-    } else if (mojoSignals.isWritable) {
-      endpoint.write(byteDataOfString("Pong"));
-      pongs++;
-      eventStream.enableReadEvents();
-    }
-  }
-  eventStream.close();
-  Expect.equals(10, pings);
-  Expect.equals(10, pongs);
-}
-
-Future pingPongTest() async {
-  var pipe = new MojoMessagePipe();
-  var isolate = await Isolate.spawn(pingPongIsolate, pipe.endpoints[0]);
-  var endpoint = pipe.endpoints[1];
-  var eventStream =
-      new MojoEventStream(endpoint.handle, MojoHandleSignals.READWRITE);
-
-  int pings = 0;
-  int pongs = 0;
-  await for (List<int> event in eventStream) {
-    var mojoSignals = new MojoHandleSignals(event[1]);
-    if (mojoSignals.isReadWrite) {
-      // We are either sending or receiving.
-      throw new Exception("Unexpected event");
-    } else if (mojoSignals.isReadable) {
-      expectStringFromEndpoint("Pong", endpoint);
-      pongs++;
-      if (pongs == 10) {
-        eventStream.close();
-      }
-      eventStream.enableWriteEvents(); // Now it is our turn to send.
-    } else if (mojoSignals.isWritable) {
-      if (pings < 10) {
-        endpoint.write(byteDataOfString("Ping"));
-        pings++;
-      }
-      eventStream.enableReadEvents(); // Don't send while waiting for reply.
-    }
-  }
-  Expect.equals(10, pings);
-  Expect.equals(10, pongs);
-}
-
-main() async {
-  simpleTest();
-  await simpleAsyncAwaitTest();
-  await pingPongTest();
-}
diff --git a/mojo/dart/test/ping_pong_test.dart b/mojo/dart/test/ping_pong_test.dart
index 6b8d958..7701419 100644
--- a/mojo/dart/test/ping_pong_test.dart
+++ b/mojo/dart/test/ping_pong_test.dart
@@ -36,19 +36,19 @@
 }
 
 void pipeTestIsolate(core.MojoMessagePipeEndpoint endpoint) {
-  var eventStream = new core.MojoEventStream(endpoint.handle);
-  eventStream.listen((List<int> event) {
+  var eventSubscription = new core.MojoEventSubscription(endpoint.handle);
+  eventSubscription.subscribe((List<int> event) {
     var mojoSignals = new core.MojoHandleSignals(event[1]);
     if (mojoSignals.isReadWrite) {
       throw 'We should only be reading or writing, not both.';
     } else if (mojoSignals.isReadable) {
       expectStringFromEndpoint("Ping", endpoint);
-      eventStream.enableWriteEvents();
+      eventSubscription.enableWriteEvents();
     } else if (mojoSignals.isWritable) {
       endpoint.write(byteDataOfString("Pong"));
-      eventStream.enableReadEvents();
+      eventSubscription.enableReadEvents();
     } else if (mojoSignals.isPeerClosed) {
-      eventStream.close();
+      eventSubscription.close();
     } else {
       throw 'Unexpected event.';
     }
@@ -58,24 +58,24 @@
 main() {
   var pipe = new core.MojoMessagePipe();
   var endpoint = pipe.endpoints[0];
-  var eventStream = new core.MojoEventStream(endpoint.handle);
+  var eventSubscription = new core.MojoEventSubscription(endpoint.handle);
   Isolate.spawn(pipeTestIsolate, pipe.endpoints[1]).then((_) {
-    eventStream.listen((List<int> event) {
+    eventSubscription.subscribe((List<int> event) {
       var mojoSignals = new core.MojoHandleSignals(event[1]);
       if (mojoSignals.isReadWrite) {
         throw 'We should only be reading or writing, not both.';
       } else if (mojoSignals.isReadable) {
         expectStringFromEndpoint("Pong", endpoint);
-        eventStream.close();
+        eventSubscription.close();
       } else if (mojoSignals.isWritable) {
         endpoint.write(byteDataOfString("Ping"));
-        eventStream.enableReadEvents();
+        eventSubscription.enableReadEvents();
       } else if (mojoSignals.isPeerClosed) {
         throw 'This end should close first.';
       } else {
         throw 'Unexpected event.';
       }
     });
-    eventStream.enableWriteEvents();
+    eventSubscription.enableWriteEvents();
   });
 }
diff --git a/mojo/dart/test/simple_handle_watcher_test.dart b/mojo/dart/test/simple_handle_watcher_test.dart
index d8456de..b0c0401 100644
--- a/mojo/dart/test/simple_handle_watcher_test.dart
+++ b/mojo/dart/test/simple_handle_watcher_test.dart
@@ -14,17 +14,17 @@
   var endpoint = pipe.endpoints[0];
   assert(endpoint.handle.isValid);
 
-  var eventStream = new core.MojoEventStream(endpoint.handle);
+  var eventSubscription = new core.MojoEventSubscription(endpoint.handle);
   var completer = new Completer();
   int numEvents = 0;
 
-  eventStream.listen((_) {
+  eventSubscription.subscribe((_) {
     numEvents++;
-    eventStream.close();
-  }, onDone: () {
-    completer.complete(numEvents);
+    eventSubscription.close().then((_) {
+      completer.complete(numEvents);
+    });
   });
-  eventStream.enableWriteEvents();
+  eventSubscription.enableWriteEvents();
 
   completer.future.then((int numEvents) {
     assert(numEvents == 1);
diff --git a/mojo/dart/test/validation_test.dart b/mojo/dart/test/validation_test.dart
index 4de347a..0d344e2 100644
--- a/mojo/dart/test/validation_test.dart
+++ b/mojo/dart/test/validation_test.dart
@@ -22,6 +22,10 @@
     _stub = new ConformanceTestInterfaceStub.fromEndpoint(endpoint, this);
   }
 
+  set onError(Function f) {
+    _stub.onError = f;
+  }
+
   void _complete() => _completer.complete(null);
 
   method0(double param0) => _complete();
@@ -35,17 +39,19 @@
     param0.structD.messagePipes.forEach((h) => h.close());
     _complete();
   }
+
   method6(List<List<int>> param0) => _complete();
   method7(StructF param0, List<List<int>> param1) => _complete();
   method8(List<List<String>> param0) => _complete();
   method9(List<List<MojoHandle>> param0) {
     if (param0 != null) {
       param0.forEach((l) => l.forEach((h) {
-        if (h != null) h.close();
-      }));
+            if (h != null) h.close();
+          }));
     }
     _complete();
   }
+
   method10(Map<String, int> param0) => _complete();
   method11(StructG param0) => _complete();
   method12(double param0, [Function responseFactory]) {
@@ -55,11 +61,13 @@
     _complete();
     return new Future.value(responseFactory(0.0));
   }
+
   method13(InterfaceAProxy param0, int param1, InterfaceAProxy param2) {
     if (param0 != null) param0.close(immediate: true);
     if (param2 != null) param2.close(immediate: true);
     _complete();
   }
+
   method14(UnionA param0) => _complete();
   method15(StructH param0) => _complete();
 
@@ -74,14 +82,13 @@
   var completer = new Completer();
   var conformanceImpl;
 
-  runZoned(() {
-    conformanceImpl =
-        new ConformanceTestInterfaceImpl(completer, pipe.endpoints[0]);
-  }, onError: (e, stackTrace) {
+  conformanceImpl =
+      new ConformanceTestInterfaceImpl(completer, pipe.endpoints[0]);
+  conformanceImpl.onError = ((e) {
     assert(e is MojoCodecError);
     // TODO(zra): Make the error messages conform?
     // assert(e == expected);
-    conformanceImpl.close(immediate: true);
+    conformanceImpl.close();
     pipe.endpoints[0].close();
     pipe.endpoints[1].close();
     handles.forEach((h) => h.close());
@@ -94,10 +101,11 @@
 
   return completer.future.then((_) {
     assert(expected == "PASS");
-    conformanceImpl.close();
-    pipe.endpoints[0].close();
-    pipe.endpoints[1].close();
-    handles.forEach((h) => h.close());
+    return conformanceImpl.close().then((_) {
+      pipe.endpoints[0].close();
+      pipe.endpoints[1].close();
+      handles.forEach((h) => h.close());
+    });
   }, onError: (e) {
     // Do nothing.
   });
diff --git a/mojo/public/dart/mojo/lib/src/application.dart b/mojo/public/dart/mojo/lib/src/application.dart
index 7d6cfbb..01380a7 100644
--- a/mojo/public/dart/mojo/lib/src/application.dart
+++ b/mojo/public/dart/mojo/lib/src/application.dart
@@ -13,13 +13,13 @@
       Application application, core.MojoMessagePipeEndpoint endpoint) {
     _application = application;
     _stub = new application_mojom.ApplicationStub.fromEndpoint(endpoint, this);
-    _stub.onError = close;
+    _stub.onError = ((_) => close());
   }
 
   _ApplicationImpl.fromHandle(Application application, core.MojoHandle handle) {
     _application = application;
     _stub = new application_mojom.ApplicationStub.fromHandle(handle, this);
-    _stub.onError = close;
+    _stub.onError = ((_) => close());
   }
 
   set onError(core.ErrorHandler f) {
@@ -101,9 +101,9 @@
     close();
   }
 
-  void _errorHandler() {
+  void _errorHandler(Object e) {
     close().then((_) {
-      if (onError != null) onError();
+      if (onError != null) onError(e);
     });
   }
 
diff --git a/mojo/public/dart/mojo/lib/src/application_connection.dart b/mojo/public/dart/mojo/lib/src/application_connection.dart
index 2e80fc1..6106bf4 100644
--- a/mojo/public/dart/mojo/lib/src/application_connection.dart
+++ b/mojo/public/dart/mojo/lib/src/application_connection.dart
@@ -95,9 +95,9 @@
     _nameToServiceFactory[interfaceName] = factory;
   }
 
-  void _errorHandler() {
+  void _errorHandler(Object e) {
     close().then((_) {
-      if (onError != null) onError();
+      if (onError != null) onError(e);
     });
   }
 
diff --git a/mojo/public/dart/mojo/lib/src/codec.dart b/mojo/public/dart/mojo/lib/src/codec.dart
index 66b7464..b53ec4c 100644
--- a/mojo/public/dart/mojo/lib/src/codec.dart
+++ b/mojo/public/dart/mojo/lib/src/codec.dart
@@ -203,7 +203,7 @@
       encodeHandle(value != null ? value.handle : null, offset, nullable);
 
   void encodeInterface(
-      core.MojoEventStreamListener interface, int offset, bool nullable) {
+      core.MojoEventHandler interface, int offset, bool nullable) {
     if (interface == null) {
       encodeInvalideHandle(offset, nullable);
       // Set the version field to 0.
@@ -214,7 +214,7 @@
       assert(!interface.isBound);
       var pipe = new core.MojoMessagePipe();
       interface.bind(pipe.endpoints[0]);
-      interface.listen();
+      interface.beginHandlingEvents();
       encodeMessagePipeHandle(pipe.endpoints[1], offset, nullable);
       // Set the version to the version in the stub.
       encodeUint32(interface.version, offset + kSerializedHandleSize);
@@ -223,14 +223,13 @@
       if (!interface.isOpen) {
         // Make sure that we are listening so that state for the proxy is
         // cleaned up when the message is sent and the handle is closed.
-        interface.listen();
+        interface.beginHandlingEvents();
       }
       encodeMessagePipeHandle(interface.endpoint, offset, nullable);
       // Set the version to the current version of the proxy.
       encodeUint32(interface.version, offset + kSerializedHandleSize);
     } else {
-      throw new MojoCodecError(
-          'Trying to encode an unknown MojoEventStreamListener');
+      throw new MojoCodecError('Trying to encode an unknown MojoEventHandler');
     }
   }
 
@@ -241,7 +240,7 @@
     }
     var pipe = new core.MojoMessagePipe();
     client.impl.bind(pipe.endpoints[0]);
-    client.impl.listen();
+    client.impl.beginHandlingEvents();
     encodeMessagePipeHandle(pipe.endpoints[1], offset, nullable);
   }
 
diff --git a/mojo/public/dart/mojo/lib/src/drain_data.dart b/mojo/public/dart/mojo/lib/src/drain_data.dart
index b5929e5..dc630bf 100644
--- a/mojo/public/dart/mojo/lib/src/drain_data.dart
+++ b/mojo/public/dart/mojo/lib/src/drain_data.dart
@@ -6,19 +6,18 @@
 
 class DataPipeDrainer {
   MojoDataPipeConsumer _consumer;
-  MojoEventStream _eventStream;
+  MojoEventSubscription _eventSubscription;
   List<ByteData> _dataList;
   int _dataSize;
 
   DataPipeDrainer(this._consumer) {
-    _eventStream = new MojoEventStream(_consumer.handle);
+    _eventSubscription = new MojoEventSubscription(_consumer.handle);
     _dataList = new List();
     _dataSize = 0;
   }
 
-  ByteData _copy(ByteData byteData) =>
-      new ByteData.view(
-          new Uint8List.fromList(byteData.buffer.asUint8List()).buffer);
+  ByteData _copy(ByteData byteData) => new ByteData.view(
+      new Uint8List.fromList(byteData.buffer.asUint8List()).buffer);
 
   MojoResult _doRead() {
     ByteData thisRead = _consumer.beginRead();
@@ -34,8 +33,9 @@
     var data = new ByteData(_dataSize);
     int end = 0;
     for (var chunk in _dataList) {
-      data.buffer.asUint8List().setRange(
-          end, end + chunk.lengthInBytes, chunk.buffer.asUint8List());
+      data.buffer
+          .asUint8List()
+          .setRange(end, end + chunk.lengthInBytes, chunk.buffer.asUint8List());
       end += chunk.lengthInBytes;
     }
     return data;
@@ -43,20 +43,20 @@
 
   Future<ByteData> drain() {
     var completer = new Completer();
-    _eventStream.listen((List<int> event) {
+    _eventSubscription.subscribe((List<int> event) {
       var mojoSignals = new MojoHandleSignals(event[1]);
       if (mojoSignals.isReadable) {
         var result = _doRead();
         if (!result.isOk) {
-          _eventStream.close();
-          _eventStream = null;
+          _eventSubscription.close();
+          _eventSubscription = null;
           completer.complete(_concatData());
         } else {
-          _eventStream.enableReadEvents();
+          _eventSubscription.enableReadEvents();
         }
       } else if (mojoSignals.isPeerClosed) {
-        _eventStream.close();
-        _eventStream = null;
+        _eventSubscription.close();
+        _eventSubscription = null;
         completer.complete(_concatData());
       } else {
         throw 'Unexpected handle event: $mojoSignals';
diff --git a/mojo/public/dart/mojo/lib/src/event_stream.dart b/mojo/public/dart/mojo/lib/src/event_stream.dart
index 0ff091e..ceb0e0e 100644
--- a/mojo/public/dart/mojo/lib/src/event_stream.dart
+++ b/mojo/public/dart/mojo/lib/src/event_stream.dart
@@ -4,33 +4,29 @@
 
 part of core;
 
-class MojoEventStream extends Stream<List<int>> {
+class MojoEventSubscription {
   // The underlying Mojo handle.
   MojoHandle _handle;
 
-  // Providing our own stream controller allows us to take custom actions when
-  // listeners pause/resume/etc. their StreamSubscription.
-  StreamController _controller;
-
   // The send port that we give to the handle watcher to notify us of handle
   // events.
   SendPort _sendPort;
 
   // The receive port on which we listen and receive events from the handle
   // watcher.
-  ReceivePort _receivePort;
+  RawReceivePort _receivePort;
 
   // The signals on this handle that we're interested in.
   MojoHandleSignals _signals;
 
-  // Whether listen has been called.
-  bool _isListening;
+  // Whether subscribe() has been called.
+  bool _isSubscribed;
 
-  MojoEventStream(MojoHandle handle,
+  MojoEventSubscription(MojoHandle handle,
       [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE])
       : _handle = handle,
         _signals = signals,
-        _isListening = false {
+        _isSubscribed = false {
     MojoResult result = MojoHandle.registerFinalizer(this);
     if (!result.isOk) {
       throw new MojoInternalError(
@@ -40,35 +36,26 @@
 
   Future close({bool immediate: false}) => _close(immediate: immediate);
 
-  StreamSubscription<List<int>> listen(void onData(List event),
-      {Function onError, void onDone(), bool cancelOnError}) {
-    if (_isListening) {
-      throw new MojoApiError("Listen has already been called: $_handle.");
+  void subscribe(void handler(List<int> event)) {
+    if (_isSubscribed) {
+      throw new MojoApiError("subscribe() has already been called: $this.");
     }
-    _receivePort = new ReceivePort();
+    _receivePort = new RawReceivePort(handler);
     _sendPort = _receivePort.sendPort;
-    _controller = new StreamController(
-        sync: true,
-        onPause: _onPauseStateChange,
-        onResume: _onPauseStateChange);
-    _controller.addStream(_receivePort).whenComplete(_controller.close);
 
     if (_signals != MojoHandleSignals.NONE) {
-      var res = new MojoResult(
-          MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value));
-      if (!res.isOk) {
+      int res = MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value);
+      if (res != MojoResult.kOk) {
         throw new MojoInternalError("MojoHandleWatcher add failed: $res");
       }
     }
 
-    _isListening = true;
-    return _controller.stream.listen(onData,
-        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
+    _isSubscribed = true;
   }
 
   bool enableSignals(MojoHandleSignals signals) {
     _signals = signals;
-    if (_isListening) {
+    if (_isSubscribed) {
       return MojoHandleWatcher.add(_handle.h, _sendPort, signals.value) ==
           MojoResult.kOk;
     }
@@ -82,7 +69,7 @@
 
   Future _close({bool immediate: false, bool local: false}) {
     if (_handle != null) {
-      if (_isListening && !local) {
+      if (_isSubscribed && !local) {
         return _handleWatcherClose(immediate: immediate).then((result) {
           // If the handle watcher is gone, then close the handle ourselves.
           if (!result.isOk) {
@@ -117,59 +104,44 @@
     }
   }
 
-  void _onPauseStateChange() {
-    if (_controller.isPaused) {
-      var res = new MojoResult(MojoHandleWatcher.remove(_handle.h));
-      if (!res.isOk) {
-        throw new MojoInternalError("MojoHandleWatcher add failed: $res");
-      }
-    } else {
-      var res = new MojoResult(
-          MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value));
-      if (!res.isOk) {
-        throw new MojoInternalError("MojoHandleWatcher add failed: $res");
-      }
-    }
-  }
-
   bool get readyRead => _handle.readyRead;
   bool get readyWrite => _handle.readyWrite;
+  MojoHandleSignals get signals => _signals;
 
   String toString() => "$_handle";
 }
 
-typedef void ErrorHandler();
+typedef void ErrorHandler(Object e);
 
-class MojoEventStreamListener {
-  StreamSubscription subscription;
+class MojoEventHandler {
   ErrorHandler onError;
 
   MojoMessagePipeEndpoint _endpoint;
-  MojoEventStream _eventStream;
+  MojoEventSubscription _eventSubscription;
   bool _isOpen = false;
   bool _isInHandler = false;
   bool _isPeerClosed = false;
 
-  MojoEventStreamListener.fromEndpoint(MojoMessagePipeEndpoint endpoint)
+  MojoEventHandler.fromEndpoint(MojoMessagePipeEndpoint endpoint)
       : _endpoint = endpoint,
-        _eventStream = new MojoEventStream(endpoint.handle) {
-    listen();
+        _eventSubscription = new MojoEventSubscription(endpoint.handle) {
+    beginHandlingEvents();
   }
 
-  MojoEventStreamListener.fromHandle(MojoHandle handle) {
+  MojoEventHandler.fromHandle(MojoHandle handle) {
     _endpoint = new MojoMessagePipeEndpoint(handle);
-    _eventStream = new MojoEventStream(handle);
-    listen();
+    _eventSubscription = new MojoEventSubscription(handle);
+    beginHandlingEvents();
   }
 
-  MojoEventStreamListener.unbound();
+  MojoEventHandler.unbound();
 
   void bind(MojoMessagePipeEndpoint endpoint) {
     if (isBound) {
       throw new MojoApiError("MojoEventStreamListener is already bound.");
     }
     _endpoint = endpoint;
-    _eventStream = new MojoEventStream(endpoint.handle);
+    _eventSubscription = new MojoEventSubscription(endpoint.handle);
     _isOpen = false;
     _isInHandler = false;
     _isPeerClosed = false;
@@ -180,67 +152,74 @@
       throw new MojoApiError("MojoEventStreamListener is already bound.");
     }
     _endpoint = new MojoMessagePipeEndpoint(handle);
-    _eventStream = new MojoEventStream(handle);
+    _eventSubscription = new MojoEventSubscription(handle);
     _isOpen = false;
     _isInHandler = false;
     _isPeerClosed = false;
   }
 
-  StreamSubscription<List<int>> listen() {
+  void beginHandlingEvents() {
     if (!isBound) {
-      throw new MojoApiError("MojoEventStreamListener is unbound.");
-    }
-    if (subscription != null) {
-      throw new MojoApiError("Listen has already been called.");
+      throw new MojoApiError("MojoEventHandler is unbound.");
     }
     _isOpen = true;
-    subscription = _eventStream.listen((List<int> event) {
-      if (!_isOpen) {
-        // The actual close of the underlying stream happens asynchronously
-        // after the call to close. However, we start to ignore incoming events
-        // immediately.
-        return;
-      }
-      var signalsWatched = new MojoHandleSignals(event[0]);
-      var signalsReceived = new MojoHandleSignals(event[1]);
-      _isInHandler = true;
-      if (signalsReceived.isReadable) {
-        assert(_eventStream.readyRead);
-        handleRead();
-      }
-      if (signalsReceived.isWritable) {
-        assert(_eventStream.readyWrite);
-        handleWrite();
-      }
-      _isPeerClosed = signalsReceived.isPeerClosed ||
-          !_eventStream.enableSignals(signalsWatched);
-      _isInHandler = false;
-      if (_isPeerClosed) {
-        close().then((_) {
+    _eventSubscription.subscribe((List<int> event) {
+      try {
+        _handleEvent(event);
+      } catch (e) {
+        close(immediate: true).then((_) {
           if (onError != null) {
-            onError();
+            onError(e);
           }
         });
       }
-    }, onDone: close);
-    return subscription;
+    });
   }
 
   Future close({bool immediate: false}) {
     var result;
     _isOpen = false;
     _endpoint = null;
-    subscription = null;
-    if (_eventStream != null) {
-      result = _eventStream
+    if (_eventSubscription != null) {
+      result = _eventSubscription
           ._close(immediate: immediate, local: _isPeerClosed)
           .then((_) {
-        _eventStream = null;
+        _eventSubscription = null;
       });
     }
     return result != null ? result : new Future.value(null);
   }
 
+  void _handleEvent(List<int> event) {
+    if (!_isOpen) {
+      // The actual close of the underlying stream happens asynchronously
+      // after the call to close. However, we start to ignore incoming events
+      // immediately.
+      return;
+    }
+    var signalsWatched = new MojoHandleSignals(event[0]);
+    var signalsReceived = new MojoHandleSignals(event[1]);
+    _isInHandler = true;
+    if (signalsReceived.isReadable) {
+      assert(_eventSubscription.readyRead);
+      handleRead();
+    }
+    if (signalsReceived.isWritable) {
+      assert(_eventSubscription.readyWrite);
+      handleWrite();
+    }
+    _isPeerClosed = signalsReceived.isPeerClosed ||
+        !_eventSubscription.enableSignals(signalsWatched);
+    _isInHandler = false;
+    if (_isPeerClosed) {
+      close().then((_) {
+        if (onError != null) {
+          onError(null);
+        }
+      });
+    }
+  }
+
   void handleRead() {}
   void handleWrite() {}
 
@@ -250,6 +229,6 @@
   bool get isBound => _endpoint != null;
   bool get isPeerClosed => _isPeerClosed;
 
-  String toString() => "MojoEventStreamListener("
+  String toString() => "MojoEventHandler("
       "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)";
 }
diff --git a/mojo/public/dart/mojo/lib/src/fill_data.dart b/mojo/public/dart/mojo/lib/src/fill_data.dart
index 848efc2..c139a7c 100644
--- a/mojo/public/dart/mojo/lib/src/fill_data.dart
+++ b/mojo/public/dart/mojo/lib/src/fill_data.dart
@@ -7,20 +7,17 @@
 class DataPipeFiller {
   final MojoDataPipeProducer _producer;
   final ByteData _data;
-  MojoEventStream _eventStream;
+  MojoEventSubscription _eventSubscription;
   int _dataPosition;
 
   DataPipeFiller(this._producer, this._data) {
-    _eventStream = new MojoEventStream(_producer.handle);
+    _eventSubscription = new MojoEventSubscription(_producer.handle);
     _dataPosition = 0;
   }
 
   MojoResult _doWrite() {
     ByteData view = new ByteData.view(
-      _data.buffer,
-      _dataPosition,
-      _data.lengthInBytes - _dataPosition
-    );
+        _data.buffer, _dataPosition, _data.lengthInBytes - _dataPosition);
     int written = _producer.write(view);
     if (!_producer.status.isOk) {
       throw 'Data pipe beginWrite failed: ${_producer.status}';
@@ -30,18 +27,18 @@
   }
 
   void fill() {
-    _eventStream.enableWriteEvents();
-    _eventStream.listen((List<int> event) {
+    _eventSubscription.enableWriteEvents();
+    _eventSubscription.subscribe((List<int> event) {
       var mojoSignals = new MojoHandleSignals(event[1]);
       if (mojoSignals.isWritable) {
         MojoResult result = _doWrite();
         if (_dataPosition >= _data.lengthInBytes || !result.isOk) {
-          _eventStream.close();
-          _eventStream = null;
+          _eventSubscription.close();
+          _eventSubscription = null;
         }
       } else if (mojoSignals.isPeerClosed) {
-        _eventStream.close();
-        _eventStream = null;
+        _eventSubscription.close();
+        _eventSubscription = null;
       } else {
         throw 'Unexpected handle event: $mojoSignals';
       }
diff --git a/mojo/public/dart/mojo/lib/src/handle.dart b/mojo/public/dart/mojo/lib/src/handle.dart
index 583877f..24a9322 100644
--- a/mojo/public/dart/mojo/lib/src/handle.dart
+++ b/mojo/public/dart/mojo/lib/src/handle.dart
@@ -4,7 +4,6 @@
 
 part of core;
 
-
 class MojoHandle {
   static const int INVALID = 0;
   static const int DEADLINE_INDEFINITE = -1;
@@ -81,9 +80,9 @@
     return new MojoWaitManyResult(new MojoResult(result[0]), result[1], states);
   }
 
-  static MojoResult registerFinalizer(MojoEventStream eventStream) {
+  static MojoResult registerFinalizer(MojoEventSubscription eventSubscription) {
     return new MojoResult(MojoHandleNatives.registerFinalizer(
-        eventStream, eventStream._handle.h));
+        eventSubscription, eventSubscription._handle.h));
   }
 
   static bool reportLeakedHandles() => MojoHandleNatives.reportOpenHandles();
diff --git a/mojo/public/dart/mojo/lib/src/proxy.dart b/mojo/public/dart/mojo/lib/src/proxy.dart
index 2e5b93e..72ef13d 100644
--- a/mojo/public/dart/mojo/lib/src/proxy.dart
+++ b/mojo/public/dart/mojo/lib/src/proxy.dart
@@ -10,8 +10,8 @@
   String toString() => "ProxyError: $message";
 }
 
-abstract class Proxy extends core.MojoEventStreamListener {
-  final Map<int, Completer> _completerMap = {};
+abstract class Proxy extends core.MojoEventHandler {
+  Map<int, Completer> _completerMap = {};
   Completer _errorCompleter = new Completer();
   Set<Completer> _errorCompleters;
   int _nextId = 0;
@@ -80,10 +80,9 @@
       return;
     }
     if (!isOpen) {
-      listen();
+      beginHandlingEvents();
     }
     var header = new MessageHeader(name);
-
     var serviceMessage = message.serializeWithHeader(header);
     endpoint.write(serviceMessage.buffer, serviceMessage.buffer.lengthInBytes,
         serviceMessage.handles);
@@ -99,7 +98,7 @@
       return completer.future;
     }
     if (!isOpen) {
-      listen();
+      beginHandlingEvents();
     }
     if (id == -1) {
       id = _nextId++;
diff --git a/mojo/public/dart/mojo/lib/src/stub.dart b/mojo/public/dart/mojo/lib/src/stub.dart
index a8c9ee8..dce1038 100644
--- a/mojo/public/dart/mojo/lib/src/stub.dart
+++ b/mojo/public/dart/mojo/lib/src/stub.dart
@@ -4,7 +4,7 @@
 
 part of bindings;
 
-abstract class Stub extends core.MojoEventStreamListener {
+abstract class Stub extends core.MojoEventHandler {
   int _outstandingResponseFutures = 0;
   bool _isClosing = false;
   Completer _closeCompleter;
diff --git a/services/dart/test/echo/lib/main.dart b/services/dart/test/echo/lib/main.dart
index 4b5cce0..92f2771 100644
--- a/services/dart/test/echo/lib/main.dart
+++ b/services/dart/test/echo/lib/main.dart
@@ -49,7 +49,7 @@
   MojoHandle appHandle = new MojoHandle(args[0]);
   String url = args[1];
   new EchoApplication.fromHandle(appHandle)
-    ..onError = (() {
+    ..onError = ((_) {
       MojoHandle.reportLeakedHandles();
     });
 }
diff --git a/services/dart/test/pingpong/lib/main.dart b/services/dart/test/pingpong/lib/main.dart
index f14f9cf..baa8c46 100644
--- a/services/dart/test/pingpong/lib/main.dart
+++ b/services/dart/test/pingpong/lib/main.dart
@@ -120,7 +120,7 @@
   MojoHandle appHandle = new MojoHandle(args[0]);
   String url = args[1];
   new PingPongApplication.fromHandle(appHandle)
-    ..onError = (() {
+    ..onError = ((_) {
       MojoHandle.reportLeakedHandles();
     });
 }
diff --git a/services/dart/test/pingpong_target/lib/main.dart b/services/dart/test/pingpong_target/lib/main.dart
index 0d83133..54d3a53 100644
--- a/services/dart/test/pingpong_target/lib/main.dart
+++ b/services/dart/test/pingpong_target/lib/main.dart
@@ -28,10 +28,12 @@
 
   // These methods are unimplemented; they merely throw on invocation.
   Future<PingPongServicePingTargetUrlResponseParams> pingTargetUrl(
-          String url, int count, [Function responseFactory]) =>
+          String url, int count,
+          [Function responseFactory]) =>
       throw "Unimplemented";
   Future<PingPongServicePingTargetServiceResponseParams> pingTargetService(
-          Object service, int count, [Function responseFactory]) =>
+          Object service, int count,
+          [Function responseFactory]) =>
       throw "Unimplemented";
   void getPingPongService(Object service) => throw "Unimplemented";
 
@@ -53,7 +55,7 @@
     connection.provideService(PingPongServiceName,
         (endpoint) => new PingPongServiceImpl(this, endpoint));
     // Close the application when the first connection goes down.
-    connection.onError = closeApplication;
+    connection.onError = ((_) => closeApplication());
   }
 
   Future closeApplication() async {