Deep Dive into Dart Stream Callback Mechanism and Implementation
This article thoroughly explores the internal workings of Dart's Stream API, detailing how callbacks are triggered through StreamController, the role of microtasks, the implementation of synchronous and broadcast stream controllers, and provides concrete code examples to illustrate each step of the process.
The article begins by revisiting the basic concepts of Stream and its listen method, emphasizing that the callback is supplied through the onData parameter and that a stack trace taken inside it bottoms out in the Dart VM's internal _RawReceivePortImpl#_handleMessage, which drains the microtask queue.
It then presents a minimal example that uses StreamController to add elements and listen to the stream, showing how a breakpoint in the callback reveals its origin and how _handleMessage ends up running the callback from the microtask queue.

import 'dart:async';

void main() {
  StreamController<int> controller = StreamController();
  // Listen to events
  controller.stream.listen((int event) {
    print(event); // <--- breakpoint
  });
  // Add elements
  controller.add(1);
  controller.add(2);
}

The article maps the flow of the onData callback through several internal classes: _StreamImpl → _ControllerStream → _createSubscription → _controller#_subscribe → _ControllerSubscription → _BufferingStreamSubscription, where the callback is finally registered in the current zone.
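The zone registration at the end of that chain can be observed from user code. The following is a minimal sketch, not a view into the SDK internals: the function name zoneDemo and the zone value key #name are hypothetical and exist only for illustration. It listens inside a custom zone, adds the event from the root zone, and records which zone the callback runs in.

```dart
import 'dart:async';

/// Hypothetical helper: listens inside a custom zone, then adds an event
/// from outside that zone and records where the callback executed.
Future<List<String>> zoneDemo() async {
  final log = <String>[];
  final controller = StreamController<int>();

  // Register the listener inside a zone that carries an illustrative value.
  runZoned(() {
    controller.stream.listen((event) {
      log.add('event $event handled in ${Zone.current[#name]}');
    });
  }, zoneValues: {#name: 'listener-zone'});

  // The event is added from the root zone, outside runZoned.
  controller.add(1);

  // close() completes once the done event has reached the listener.
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await zoneDemo()); // [event 1 handled in listener-zone]
}
```

The callback sees the zone in which listen was called, not the zone in which add was called, which is consistent with the callback being bound to the zone at subscription time.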
It explains the implementation of StreamController and its concrete subclass _AsyncStreamController, showing how the methods declared by the abstract _EventDispatch interface (_sendData, _sendError, and _sendDone) are supplied by a mixin; in the asynchronous controller they enqueue events via _addPending, and delivery is scheduled with scheduleMicrotask.

abstract class _EventDispatch<T> {
  void _sendData(T data);
  void _sendError(Object error, StackTrace stackTrace);
  void _sendDone();
}

For synchronous streams, created with StreamController(sync: true), the article describes the _SyncStreamController path, where events are dispatched immediately without microtask scheduling, so the listener reacts before the code that follows add() continues.
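The difference between the two dispatch paths shows up in the order of side effects. The sketch below uses only the public StreamController API; the helper name deliveryOrder is hypothetical. It records whether the listener runs before or after the statement that follows add().

```dart
import 'dart:async';

/// Hypothetical helper: records the order of the listener call relative
/// to the code that follows add(), for a sync or async controller.
Future<List<String>> deliveryOrder({required bool sync}) async {
  final log = <String>[];
  final controller = StreamController<int>(sync: sync);

  controller.stream.listen((event) => log.add('listener: $event'));

  controller.add(1);
  log.add('after add');

  // Wait until the done event has been delivered, so every pending
  // event has reached the listener before the log is returned.
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await deliveryOrder(sync: false)); // [after add, listener: 1]
  print(await deliveryOrder(sync: true)); // [listener: 1, after add]
}
```

With the default controller the event waits in the pending queue until the current synchronous code has finished; with sync: true the listener runs inside the add() call itself.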
Broadcast streams are covered next, explaining why a regular StreamController cannot have multiple listeners and how StreamController.broadcast() creates an _AsyncBroadcastStreamController that maintains a doubly‑linked list of _BroadcastSubscription objects, delivering each event to all listeners.

import 'dart:async';

void main() {
  StreamController<int> controller = StreamController.broadcast();
  controller.stream.listen((int event) {
    print('listener 1: $event');
  });
  controller.stream.listen((int event) {
    print('listener 2: $event');
  });
  controller.add(1);
  // Printed before any listener output: events are delivered asynchronously.
  print('main continues');
  controller.add(2);
}

Finally, the article touches on related constructs such as Future.sync, Stream.periodic, and Stream.fromFutures, summarizing how they fit into the same underlying event-dispatch architecture and hinting at future topics like isolates for heavy computation.
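As a rough illustration of how these constructs behave from the caller's side, the sketch below uses only their public constructors; the helper names fromFuturesOrder and firstTicks are hypothetical. It shows that Stream.fromFutures emits results in completion order rather than list order, and that Stream.periodic passes an increasing counter to its computation.

```dart
import 'dart:async';

/// Hypothetical helper: Stream.fromFutures emits each result as its
/// future completes, so the delayed future is delivered last.
Future<List<int>> fromFuturesOrder() {
  final stream = Stream<int>.fromFutures([
    Future<int>.delayed(const Duration(milliseconds: 20), () => 1),
    // Future.sync runs the callback immediately; the result is still
    // delivered to listeners asynchronously.
    Future<int>.sync(() => 2),
  ]);
  return stream.toList();
}

/// Hypothetical helper: collects the first three ticks of a periodic
/// stream, whose computation receives a counter starting at 0.
Future<List<int>> firstTicks() {
  return Stream<int>.periodic(
    const Duration(milliseconds: 10),
    (count) => count,
  ).take(3).toList();
}

Future<void> main() async {
  print(await fromFuturesOrder()); // [2, 1]
  print(await firstTicks()); // [0, 1, 2]
}
```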
Rare Earth Juejin Tech Community