Understanding Dart Stream Operators: A Comprehensive Guide with Code Examples
This article provides an in‑depth tutorial on Dart's Stream API, explaining the purpose and usage of various operators such as map, distinct, where, take, expand, asyncMap, transform, and terminal methods like reduce, fold, firstWhere, and includes complete code examples for each.
This article is a detailed tutorial on Dart's Stream API, covering both basic and advanced operators used in reactive programming. It begins with an introduction to streams as continuous event sequences and the RxDart implementation.
0. Preface – Test Description
Streamis a continuously listening event sequence based on the publish‑subscribe pattern, often referred to as reactive programming. Many languages provide Rx<language> implementations, including RxDart which extends Stream.
The test stream provides seven int elements, emitting one every 100 ms via StreamProvider#createStream.
class StreamProvider{
Stream<int> createStream() async*{
List<int> res = [1,9,9,4,3,2,8];
for(int i = 0 ; i < res.length ; i++){
yield res[i];
await Future.delayed(const Duration(milliseconds: 100));
}
}
}Running the following code prints a number every 100 ms:
void main(){
Stream<int> intStream = StreamProvider().createStream();
intStream.listen((e) {
print(e);
});
}1. Simple Stream Transformations
The article introduces eight basic operators that return a new Stream of the same generic type:
1. map – Mapping Transformation
Stream<S> map<S>(S convert(T event)) mapconverts each element of type T to type S. Example converting int to String using a lookup map:
Map<int,String> numMap = {0:"零",1:"壹",2:"贰",3:"叁",4:"肆",5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾"};
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<String> newStream = intStream.map<String>((int e) => numMap[e]!);
newStream.listen((e) {
print(e);
});
}2. distinct – Filtering Duplicates
Stream<T> distinct([bool equals(T previous, T next)?])Emits only elements that differ from the previous one. Example:
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.distinct((int a , int b) => a == b);
newStream.listen((e) {
print(e);
});
}3. where – Conditional Filtering
Stream<T> where(bool test(T event))Keeps elements that satisfy the predicate. Example keeping values greater than 5:
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.where((int e) => e > 5);
newStream.listen((e) {
print(e);
});
}4. take – Taking a Fixed Number of Elements
Stream<T> take(int count)Emits only the first count elements.
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.take(3);
newStream.listen((e) {
print(e);
});
}5. takeWhile – Conditional Take
Stream<T> takeWhile(bool test(T element))Continues emitting while the predicate returns true.
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.takeWhile((int e) => e <= 4 || e == 9);
newStream.listen((e) {
print(e);
});
}6. skip – Skipping Elements
Stream<T> skip(int count)Ignores the first count elements.
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.skip(3);
newStream.listen((e) {
print(e);
});
}7. skipWhile – Conditional Skip
Stream<T> skipWhile(bool test(T element))Skips elements while the predicate is true, then emits the rest.
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.skipWhile((int e) => e < 4);
newStream.listen((e) {
print(e);
});
}8. cast / castFrom – Type Casting
Stream<R> cast<R>() => Stream.castFrom<T, R>(this);Creates a stream with a different generic type, performing a runtime cast.
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<num> newStream = intStream.cast<num>();;
int start = DateTime.now().millisecondsSinceEpoch;
newStream.listen((e) {
print(e);
});
}2. More Complex Stream Transformations
1. expand – Expanding Elements
Stream<S> expand<S>(Iterable<S> convert(T element))Each input element can produce multiple output elements.
Map<int,String> numMap = {0:"零",1:"壹",2:"贰",3:"叁",4:"肆",5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾"};
Map<int,String> numMap2 = {0:"0",1:"Ⅰ",2:"Ⅱ",3:"Ⅲ",4:"Ⅳ",5:"Ⅴ",6:"Ⅵ",7:"Ⅶ",8:"Ⅷ",9:"Ⅸ",10:"Ⅹ"};
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<String> newStream = intStream.expand<String>((e) => [numMap[e]!,numMap2[e]!]);
int start = DateTime.now().millisecondsSinceEpoch;
newStream.listen((e) {
print(e);
});
}2. asyncMap – Asynchronous Mapping
Stream<E> asyncMap<E>(FutureOr<E> convert(T event))Applies an asynchronous conversion to each element.
Map<int,String> numMap = {0:"零",1:"壹",2:"贰",3:"叁",4:"肆",5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾"};
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<String> newStream = intStream.asyncMap(delayMap);
int start = DateTime.now().millisecondsSinceEpoch;
newStream.listen((e) {
print(e);
});
}
Future<String> delayMap(int e) async{
await Future.delayed(const Duration(milliseconds: 100));
return numMap[e]!;
}3. asyncExpand – Asynchronous Expansion
Stream<E> asyncExpand<E>(Stream<E>? convert(T event))Each element can be turned into a new asynchronous stream.
Map<int,String> numMap = {0:"零",1:"壹",2:"贰",3:"叁",4:"肆",5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾"};
Map<int,String> numMap2 = {0:"0",1:"Ⅰ",2:"Ⅱ",3:"Ⅲ",4:"Ⅳ",5:"Ⅴ",6:"Ⅵ",7:"Ⅶ",8:"Ⅷ",9:"Ⅸ",10:"Ⅹ"};
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<String> newStream = intStream.asyncExpand(_streamExpand);
newStream.listen((e) {
print(e);
});
}
Stream<String>? _streamExpand(int e) async*{
await Future.delayed(const Duration(milliseconds: 50));
yield numMap[e]!;
await Future.delayed(const Duration(milliseconds: 50));
yield numMap2[e]!;
}4. transform – StreamTransformer
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer)Applies a custom StreamTransformer. Example using the built‑in AsciiEncoder to convert integers to ASCII byte lists:
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<List<int>> newStream = intStream
.map((int e) => e.toString())
.transform(const AsciiEncoder());
int start = DateTime.now().millisecondsSinceEpoch;
newStream.listen((e) {
print("$e === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}3. Single‑Element Operators (Future‑Returning)
1. reduce – Iterative Combination
Future<T> reduce(T combine(T previous, T element))Combines elements sequentially, returning a Future of the final result.
2. fold – Combination with Initial Value
Future<S> fold<S>(S initialValue, S combine(S previous, T element))Similar to reduce but starts with an explicit initial value, allowing type conversion.
3. drain – Discard All Elements
Future<E> drain<E>([E? futureValue])Ignores all stream elements and completes when the stream ends.
4. every – All‑Match Test
Future<bool> every(bool test(T element))Returns true if every element satisfies the predicate.
5. any – Any‑Match Test
Future<bool> any(bool test(T element))Returns true as soon as an element satisfies the predicate.
6. singleWhere – Exactly One Match
Future<T> singleWhere(bool test(T element), {T orElse()?})Completes with the sole matching element or throws if none or multiple match.
7. firstWhere – First Match
Future<T> firstWhere(bool test(T element), {T orElse()?})Completes with the first element that satisfies the predicate.
8. lastWhere – Last Match
Future<T> lastWhere(bool test(T element), {T orElse()?})Waits for the stream to finish and returns the last matching element.
9. elementAt – Indexed Access
Future<T> elementAt(int index)Completes with the element at the given index.
4. Additional Operators
The article also covers forEach (asynchronous iteration), contains (membership test), and join (concatenating elements into a string), each returning a Future that resolves when the stream completes.
void main(){
Stream<int> intStream = StreamProvider().createStream();
Future<dynamic> result = intStream.forEach(process);
int start = DateTime.now().millisecondsSinceEpoch;
result.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
void process(int e){
print(e);
} void main(){
Stream<int> intStream = StreamProvider().createStream();
Future<bool> result = intStream.contains(4);
int start = DateTime.now().millisecondsSinceEpoch;
result.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
} void main(){
Stream<int> intStream = StreamProvider().createStream();
Future<String> result = intStream.join(",");
int start = DateTime.now().millisecondsSinceEpoch;
result.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}Overall, the tutorial demonstrates how Dart's built‑in stream operators and their asynchronous variants can be combined to perform complex data transformations, filtering, and aggregation, serving as a solid foundation for reactive programming in Flutter and other Dart‑based applications.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Rare Earth Juejin Tech Community
Juejin, a tech community that helps developers grow.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
