John McCutchan | 9f60559 | 2015-09-17 14:09:47 -0700 | [diff] [blame] | 1 | // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 | // for details. All rights reserved. Use of this source code is governed by a |
| 3 | // BSD-style license that can be found in the LICENSE file. |
| 4 | |
| 5 | library barback.test.stream_pool_test; |
| 6 | |
| 7 | import 'dart:async'; |
| 8 | |
| 9 | import 'package:barback/src/utils.dart'; |
| 10 | import 'package:barback/src/utils/stream_pool.dart'; |
| 11 | import 'package:scheduled_test/scheduled_test.dart'; |
| 12 | |
| 13 | import 'utils.dart'; |
| 14 | |
| 15 | main() { |
| 16 | initConfig(); |
| 17 | |
| 18 | group("buffered", () { |
| 19 | test("buffers events from multiple inputs", () { |
| 20 | var pool = new StreamPool<String>(); |
| 21 | |
| 22 | var controller1 = new StreamController<String>(); |
| 23 | pool.add(controller1.stream); |
| 24 | controller1.add("first"); |
| 25 | |
| 26 | var controller2 = new StreamController<String>(); |
| 27 | pool.add(controller2.stream); |
| 28 | controller2.add("second"); |
| 29 | |
| 30 | // Call [toList] asynchronously to be sure that the events have been |
| 31 | // buffered beforehand and aren't just being received unbuffered. |
| 32 | expect(newFuture(() => pool.stream.toList()), |
| 33 | completion(equals(["first", "second"]))); |
| 34 | |
| 35 | pumpEventQueue().then((_) => pool.close()); |
| 36 | }); |
| 37 | |
| 38 | test("buffers errors from multiple inputs", () { |
| 39 | var pool = new StreamPool<String>(); |
| 40 | |
| 41 | var controller1 = new StreamController<String>(); |
| 42 | pool.add(controller1.stream); |
| 43 | controller1.add("first"); |
| 44 | |
| 45 | var controller2 = new StreamController<String>(); |
| 46 | pool.add(controller2.stream); |
| 47 | controller2.add("second"); |
| 48 | controller1.addError("third"); |
| 49 | controller2.addError("fourth"); |
| 50 | controller1.add("fifth"); |
| 51 | |
| 52 | expect(newFuture(() { |
| 53 | return pool.stream.transform(new StreamTransformer.fromHandlers( |
| 54 | handleData: (data, sink) => sink.add(["data", data]), |
| 55 | handleError: (error, stackTrace, sink) { |
| 56 | sink.add(["error", error]); |
| 57 | })).toList(); |
| 58 | }), completion(equals([ |
| 59 | ["data", "first"], |
| 60 | ["data", "second"], |
| 61 | ["error", "third"], |
| 62 | ["error", "fourth"], |
| 63 | ["data", "fifth"] |
| 64 | ]))); |
| 65 | |
| 66 | pumpEventQueue().then((_) => pool.close()); |
| 67 | }); |
| 68 | |
| 69 | test("buffers inputs from a broadcast stream", () { |
| 70 | var pool = new StreamPool<String>(); |
| 71 | var controller = new StreamController<String>.broadcast(); |
| 72 | pool.add(controller.stream); |
| 73 | controller.add("first"); |
| 74 | controller.add("second"); |
| 75 | |
| 76 | // Call [toList] asynchronously to be sure that the events have been |
| 77 | // buffered beforehand and aren't just being received unbuffered. |
| 78 | expect(newFuture(() => pool.stream.toList()), |
| 79 | completion(equals(["first", "second"]))); |
| 80 | |
| 81 | pumpEventQueue().then((_) => pool.close()); |
| 82 | }); |
| 83 | }); |
| 84 | |
| 85 | group("broadcast", () { |
| 86 | test("doesn't buffer inputs", () { |
| 87 | var pool = new StreamPool<String>.broadcast(); |
| 88 | |
| 89 | var controller1 = new StreamController<String>.broadcast(); |
| 90 | pool.add(controller1.stream); |
| 91 | controller1.add("first"); |
| 92 | |
| 93 | var controller2 = new StreamController<String>.broadcast(); |
| 94 | pool.add(controller2.stream); |
| 95 | controller2.add("second"); |
| 96 | |
| 97 | // Call [toList] asynchronously to be sure that the events have been |
| 98 | // buffered beforehand and aren't just being received unbuffered. |
| 99 | expect(newFuture(() => pool.stream.toList()), completion(isEmpty)); |
| 100 | |
| 101 | pumpEventQueue().then((_) => pool.close()); |
| 102 | }); |
| 103 | |
| 104 | test("doesn't buffer errors", () { |
| 105 | var pool = new StreamPool<String>.broadcast(); |
| 106 | |
| 107 | var controller1 = new StreamController<String>.broadcast(); |
| 108 | pool.add(controller1.stream); |
| 109 | controller1.addError("first"); |
| 110 | |
| 111 | var controller2 = new StreamController<String>.broadcast(); |
| 112 | pool.add(controller2.stream); |
| 113 | controller2.addError("second"); |
| 114 | |
| 115 | expect(newFuture(() { |
| 116 | return pool.stream.transform(new StreamTransformer.fromHandlers( |
| 117 | handleData: (data, sink) => sink.add(data), |
| 118 | handleError: (error, stackTrace, sink) { sink.add(error); })) |
| 119 | .toList(); |
| 120 | }), completion(isEmpty)); |
| 121 | |
| 122 | pumpEventQueue().then((_) => pool.close()); |
| 123 | }); |
| 124 | |
| 125 | test("doesn't buffer inputs from a buffered stream", () { |
| 126 | var pool = new StreamPool<String>.broadcast(); |
| 127 | var controller = new StreamController<String>(); |
| 128 | pool.add(controller.stream); |
| 129 | controller.add("first"); |
| 130 | controller.add("second"); |
| 131 | |
| 132 | expect(pumpEventQueue().then((_) => pool.stream.toList()), |
| 133 | completion(isEmpty)); |
| 134 | |
| 135 | pumpEventQueue().then((_) => pool.close()); |
| 136 | }); |
| 137 | }); |
| 138 | |
| 139 | for (var type in ["buffered", "broadcast"]) { |
| 140 | group(type, () { |
| 141 | var pool; |
| 142 | var bufferedController; |
| 143 | var bufferedStream; |
| 144 | var bufferedSyncController; |
| 145 | var broadcastController; |
| 146 | var broadcastStream; |
| 147 | var broadcastSyncController; |
| 148 | |
| 149 | setUp(() { |
| 150 | if (type == "buffered") { |
| 151 | pool = new StreamPool<String>(); |
| 152 | } else { |
| 153 | pool = new StreamPool<String>.broadcast(); |
| 154 | } |
| 155 | |
| 156 | bufferedController = new StreamController<String>(); |
| 157 | pool.add(bufferedController.stream); |
| 158 | |
| 159 | bufferedSyncController = new StreamController<String>(sync: true); |
| 160 | pool.add(bufferedSyncController.stream); |
| 161 | |
| 162 | broadcastController = new StreamController<String>.broadcast(); |
| 163 | pool.add(broadcastController.stream); |
| 164 | |
| 165 | broadcastSyncController = |
| 166 | new StreamController<String>.broadcast(sync: true); |
| 167 | pool.add(broadcastSyncController.stream); |
| 168 | }); |
| 169 | |
| 170 | test("emits events to a listener", () { |
| 171 | expect(pool.stream.toList(), completion(equals(["first", "second"]))); |
| 172 | |
| 173 | bufferedController.add("first"); |
| 174 | broadcastController.add("second"); |
| 175 | pumpEventQueue().then((_) => pool.close()); |
| 176 | }); |
| 177 | |
| 178 | test("emits sync events synchronously", () { |
| 179 | var events = []; |
| 180 | pool.stream.listen(events.add); |
| 181 | |
| 182 | bufferedSyncController.add("first"); |
| 183 | expect(events, equals(["first"])); |
| 184 | |
| 185 | broadcastSyncController.add("second"); |
| 186 | expect(events, equals(["first", "second"])); |
| 187 | }); |
| 188 | |
| 189 | test("emits async events asynchronously", () { |
| 190 | var events = []; |
| 191 | pool.stream.listen(events.add); |
| 192 | |
| 193 | bufferedController.add("first"); |
| 194 | broadcastController.add("second"); |
| 195 | expect(events, isEmpty); |
| 196 | |
| 197 | expect(pumpEventQueue().then((_) => events), |
| 198 | completion(equals(["first", "second"]))); |
| 199 | }); |
| 200 | |
| 201 | test("doesn't emit events from removed streams", () { |
| 202 | expect(pool.stream.toList(), completion(equals(["first", "third"]))); |
| 203 | |
| 204 | bufferedController.add("first"); |
| 205 | expect(pumpEventQueue().then((_) { |
| 206 | pool.remove(bufferedController.stream); |
| 207 | bufferedController.add("second"); |
| 208 | }).then((_) { |
| 209 | broadcastController.add("third"); |
| 210 | return pumpEventQueue(); |
| 211 | }).then((_) { |
| 212 | pool.remove(broadcastController.stream); |
| 213 | broadcastController.add("fourth"); |
| 214 | pool.close(); |
| 215 | }), completes); |
| 216 | }); |
| 217 | }); |
| 218 | } |
| 219 | } |