
Understanding Dart Stream Operators: A Comprehensive Guide with Code Examples

This article is an in-depth tutorial on Dart's Stream API. It explains the purpose and usage of operators such as map, distinct, where, take, expand, asyncMap, and transform, as well as terminal methods such as reduce, fold, and firstWhere, with code examples along the way.

Rare Earth Juejin Tech Community

This article is a detailed tutorial on Dart's Stream API, covering both basic and advanced operators used in reactive programming. It begins with an introduction to streams as continuous event sequences and the RxDart implementation.

0. Preface – Test Description

A Stream is a sequence of events that can be listened to continuously. It is based on the publish-subscribe pattern, and this style is often referred to as reactive programming. Many languages provide Rx<language> implementations, including RxDart, which extends Stream.

The test stream provides seven int elements, emitting one every 100 ms via StreamProvider#createStream.

class StreamProvider{
  Stream<int> createStream() async*{
    List<int> res = [1,9,9,4,3,2,8];
    for(int i = 0 ; i < res.length ; i++){
      yield res[i];
      await Future.delayed(const Duration(milliseconds: 100));
    }
  }
}

Running the following code prints a number every 100 ms:

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  intStream.listen((e) {
    print(e);
  });
}

1. Simple Stream Transformations

The article introduces eight basic operators. Each returns a new Stream; map and cast can change the element type, while the others keep it:

1. map – Mapping Transformation

Stream<S> map<S>(S convert(T event))
map converts each element of type T to type S. Example converting int to String using a lookup map:

Map<int,String> numMap = {0:"零",1:"壹",2:"贰",3:"叁",4:"肆",5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾"};

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<String> newStream = intStream.map<String>((int e) => numMap[e]!);
  newStream.listen((e) {
    print(e);
  });
}

2. distinct – Filtering Duplicates

Stream<T> distinct([bool equals(T previous, T next)?])

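Emits only elements that differ from the previously emitted one, so only consecutive duplicates are removed. The optional equals callback overrides the default == comparison; the example passes a callback that behaves the same as the default: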

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<int> newStream = intStream.distinct((int a , int b) => a == b);
  newStream.listen((e) {
    print(e);
  });
}
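
Because distinct only compares neighbours, a value that reappears later in the stream is emitted again. The sketch below illustrates this; dedupeConsecutive is a hypothetical helper, and Stream.fromIterable stands in for the article's StreamProvider so the snippet runs on its own.

```dart
// Hypothetical helper: collects the output of distinct() into a list.
Future<List<int>> dedupeConsecutive(List<int> input) {
  return Stream.fromIterable(input).distinct().toList();
}

Future<void> main() async {
  // The second 9 directly follows the first 9, so it is dropped.
  print(await dedupeConsecutive([1, 9, 9, 4, 3, 2, 8])); // [1, 9, 4, 3, 2, 8]

  // The later 1 and 9 do not directly follow an equal value, so they are kept.
  print(await dedupeConsecutive([1, 9, 1, 9, 9])); // [1, 9, 1, 9]
}
```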

3. where – Conditional Filtering

Stream<T> where(bool test(T event))

Keeps elements that satisfy the predicate. Example keeping values greater than 5:

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<int> newStream = intStream.where((int e) => e > 5);
  newStream.listen((e) {
    print(e);
  });
}

4. take – Taking a Fixed Number of Elements

Stream<T> take(int count)

Emits only the first count elements.

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<int> newStream = intStream.take(3);
  newStream.listen((e) {
    print(e);
  });
}

5. takeWhile – Conditional Take

Stream<T> takeWhile(bool test(T element))

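Emits elements while the predicate returns true and ends the stream at the first element that fails it. With the test data, the example below emits everything up to the final 8: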

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<int> newStream = intStream.takeWhile((int e) => e <= 4 || e == 9);
  newStream.listen((e) {
    print(e);
  });
}

6. skip – Skipping Elements

Stream<T> skip(int count)

Ignores the first count elements.

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<int> newStream = intStream.skip(3);
  newStream.listen((e) {
    print(e);
  });
}

7. skipWhile – Conditional Skip

Stream<T> skipWhile(bool test(T element))

Skips elements while the predicate is true, then emits the rest.

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<int> newStream = intStream.skipWhile((int e) => e < 4);
  newStream.listen((e) {
    print(e);
  });
}

8. cast / castFrom – Type Casting

Stream<R> cast<R>() => Stream.castFrom<T, R>(this);

Creates a stream with a different generic type, performing a runtime cast.

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  // cast<num>() wraps the stream; each element is checked against num at runtime.
  Stream<num> newStream = intStream.cast<num>();
  newStream.listen((e) {
    print(e);
  });
}

2. More Complex Stream Transformations

1. expand – Expanding Elements

Stream<S> expand<S>(Iterable<S> convert(T element))

Each input element can produce multiple output elements.

Map<int,String> numMap = {0:"零",1:"壹",2:"贰",3:"叁",4:"肆",5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾"};
Map<int,String> numMap2 = {0:"0",1:"Ⅰ",2:"Ⅱ",3:"Ⅲ",4:"Ⅳ",5:"Ⅴ",6:"Ⅵ",7:"Ⅶ",8:"Ⅷ",9:"Ⅸ",10:"Ⅹ"};

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<String> newStream = intStream.expand<String>((e) => [numMap[e]!,numMap2[e]!]);
  newStream.listen((e) {
    print(e);
  });
}

2. asyncMap – Asynchronous Mapping

Stream<E> asyncMap<E>(FutureOr<E> convert(T event))

Applies an asynchronous conversion to each element.

Map<int,String> numMap = {0:"零",1:"壹",2:"贰",3:"叁",4:"肆",5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾"};

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<String> newStream = intStream.asyncMap(delayMap);
  newStream.listen((e) {
    print(e);
  });
}

Future<String> delayMap(int e) async{
  await Future.delayed(const Duration(milliseconds: 100));
  return numMap[e]!;
}

3. asyncExpand – Asynchronous Expansion

Stream<E> asyncExpand<E>(Stream<E>? convert(T event))

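Each element is turned into a new stream, and the events of those sub-streams are emitted in order: the next source element is not converted until the previous sub-stream has finished.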

Map<int,String> numMap = {0:"零",1:"壹",2:"贰",3:"叁",4:"肆",5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾"};
Map<int,String> numMap2 = {0:"0",1:"Ⅰ",2:"Ⅱ",3:"Ⅲ",4:"Ⅳ",5:"Ⅴ",6:"Ⅵ",7:"Ⅶ",8:"Ⅷ",9:"Ⅸ",10:"Ⅹ"};

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<String> newStream = intStream.asyncExpand(_streamExpand);
  newStream.listen((e) {
    print(e);
  });
}

Stream<String> _streamExpand(int e) async*{
  await Future.delayed(const Duration(milliseconds: 50));
  yield numMap[e]!;
  await Future.delayed(const Duration(milliseconds: 50));
  yield numMap2[e]!;
}

4. transform – StreamTransformer

Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer)

Applies a StreamTransformer to the whole stream. Example using the built-in AsciiEncoder from dart:convert to convert each integer's string form into a list of ASCII bytes:

import 'dart:convert';

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Stream<List<int>> newStream = intStream
      .map((int e) => e.toString())
      .transform(const AsciiEncoder());

  int start = DateTime.now().millisecondsSinceEpoch;
  newStream.listen((e) {
    print("$e === ${DateTime.now().millisecondsSinceEpoch - start} ms");
  });
}
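
The example above relies on a ready-made transformer. As a minimal sketch of a custom one, StreamTransformer.fromHandlers from dart:async lets a callback decide what to forward for each event. The doubleEvens transformer is a hypothetical name, and Stream.fromIterable stands in for the article's StreamProvider.

```dart
import 'dart:async';

// Hypothetical transformer: forwards even inputs doubled and drops odd inputs.
final StreamTransformer<int, int> doubleEvens =
    StreamTransformer<int, int>.fromHandlers(
  handleData: (int value, EventSink<int> sink) {
    if (value.isEven) {
      sink.add(value * 2);
    }
  },
);

Future<void> main() async {
  final result = await Stream.fromIterable([1, 9, 9, 4, 3, 2, 8])
      .transform(doubleEvens)
      .toList();
  print(result); // [8, 4, 16]
}
```

A handler may add zero, one, or several events to the sink per input, which is why a single transformer can filter and map at the same time.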

3. Single‑Element Operators (Future‑Returning)

1. reduce – Iterative Combination

Future<T> reduce(T combine(T previous, T element))

Combines elements sequentially, returning a Future of the final result.
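
A minimal sketch of reduce, summing the test values. The helper name sumWithReduce is hypothetical, and Stream.fromIterable stands in for the article's StreamProvider so the snippet is self-contained.

```dart
// Hypothetical helper: adds up all elements of the stream.
Future<int> sumWithReduce(Stream<int> source) {
  return source.reduce((int previous, int element) => previous + element);
}

Future<void> main() async {
  final sum = await sumWithReduce(Stream.fromIterable([1, 9, 9, 4, 3, 2, 8]));
  print(sum); // 36
}
```

reduce has no starting value, so on an empty stream the returned Future completes with an error; fold avoids this by taking an initial value.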

2. fold – Combination with Initial Value

Future<S> fold<S>(S initialValue, S combine(S previous, T element))

Similar to reduce but starts with an explicit initial value, allowing type conversion.
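
The type conversion can be sketched as follows: an int stream is folded into a String. The helper name digitsWithFold is hypothetical, and Stream.fromIterable stands in for the article's StreamProvider.

```dart
// Hypothetical helper: concatenates the elements into one string.
Future<String> digitsWithFold(Stream<int> source) {
  return source.fold<String>(
    '', // initial value, also returned for an empty stream
    (String previous, int element) => '$previous$element',
  );
}

Future<void> main() async {
  final digits = await digitsWithFold(Stream.fromIterable([1, 9, 9, 4, 3, 2, 8]));
  print(digits); // 1994328
}
```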

3. drain – Discard All Elements

Future<E> drain<E>([E? futureValue])

Ignores all stream elements and completes when the stream ends.
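
A small sketch of drain, assuming the caller only cares about completion. The optional argument becomes the value of the returned Future. The helper name waitUntilDone is hypothetical, and Stream.fromIterable stands in for the article's StreamProvider.

```dart
// Hypothetical helper: discards every element and reports completion.
Future<String> waitUntilDone(Stream<int> source) {
  return source.drain<String>('done');
}

Future<void> main() async {
  final status = await waitUntilDone(Stream.fromIterable([1, 9, 9, 4, 3, 2, 8]));
  print(status); // done
}
```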

4. every – All‑Match Test

Future<bool> every(bool test(T element))

Returns true if every element satisfies the predicate.
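
A minimal sketch of every on the test values. The helper name allBelow is hypothetical, and Stream.fromIterable stands in for the article's StreamProvider.

```dart
// Hypothetical helper: checks that every element is below the limit.
Future<bool> allBelow(Stream<int> source, int limit) {
  return source.every((int e) => e < limit);
}

Future<void> main() async {
  final values = [1, 9, 9, 4, 3, 2, 8];
  print(await allBelow(Stream.fromIterable(values), 10)); // true
  // The first 9 fails the test, so the result is false.
  print(await allBelow(Stream.fromIterable(values), 9)); // false
}
```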

5. any – Any‑Match Test

Future<bool> any(bool test(T element))

Returns true as soon as an element satisfies the predicate.
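
A minimal sketch of any on the test values. The helper name anyAbove is hypothetical, and Stream.fromIterable stands in for the article's StreamProvider.

```dart
// Hypothetical helper: checks whether some element exceeds the limit.
Future<bool> anyAbove(Stream<int> source, int limit) {
  return source.any((int e) => e > limit);
}

Future<void> main() async {
  final values = [1, 9, 9, 4, 3, 2, 8];
  // The first 9 already satisfies the test.
  print(await anyAbove(Stream.fromIterable(values), 8)); // true
  // No element is greater than 9, so the stream is read to the end.
  print(await anyAbove(Stream.fromIterable(values), 9)); // false
}
```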

6. singleWhere – Exactly One Match

Future<T> singleWhere(bool test(T element), {T orElse()?})

Completes with the sole matching element or throws if none or multiple match.
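
The three outcomes can be sketched as follows. The helper name findSingle and the fallback value -1 are assumptions made for illustration, and Stream.fromIterable stands in for the article's StreamProvider. Note that orElse only covers the no-match case; multiple matches still produce an error.

```dart
// Hypothetical helper: finds the single element equal to target, or -1 if none.
Future<int> findSingle(List<int> values, int target) {
  return Stream.fromIterable(values)
      .singleWhere((int e) => e == target, orElse: () => -1);
}

Future<void> main() async {
  final values = [1, 9, 9, 4, 3, 2, 8];

  print(await findSingle(values, 4)); // 4
  print(await findSingle(values, 7)); // -1

  // 9 occurs twice, so the future completes with an error.
  try {
    await findSingle(values, 9);
  } on StateError {
    print('more than one match');
  }
}
```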

7. firstWhere – First Match

Future<T> firstWhere(bool test(T element), {T orElse()?})

Completes with the first element that satisfies the predicate.
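
A minimal sketch of firstWhere, including the orElse fallback for the no-match case. The helper name firstAbove and the fallback value -1 are assumptions, and Stream.fromIterable stands in for the article's StreamProvider.

```dart
// Hypothetical helper: first element greater than limit, or -1 if there is none.
Future<int> firstAbove(List<int> values, int limit) {
  return Stream.fromIterable(values)
      .firstWhere((int e) => e > limit, orElse: () => -1);
}

Future<void> main() async {
  final values = [1, 9, 9, 4, 3, 2, 8];
  print(await firstAbove(values, 3)); // 9
  print(await firstAbove(values, 10)); // -1
}
```

Without orElse, a stream that ends with no match completes the Future with an error instead.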

8. lastWhere – Last Match

Future<T> lastWhere(bool test(T element), {T orElse()?})

Waits for the stream to finish and returns the last matching element.
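
A minimal sketch of lastWhere on the test values. The helper name lastOdd and the fallback value -1 are assumptions, and Stream.fromIterable stands in for the article's StreamProvider.

```dart
// Hypothetical helper: last odd element, or -1 if there is none.
Future<int> lastOdd(List<int> values) {
  return Stream.fromIterable(values)
      .lastWhere((int e) => e.isOdd, orElse: () => -1);
}

Future<void> main() async {
  // The odd elements are 1, 9, 9, 3; the last of them is 3.
  print(await lastOdd([1, 9, 9, 4, 3, 2, 8])); // 3
  print(await lastOdd([2, 4, 8])); // -1
}
```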

9. elementAt – Indexed Access

Future<T> elementAt(int index)

Completes with the element at the given index.
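
A minimal sketch of elementAt; the index is zero-based. The helper name valueAt is hypothetical, and Stream.fromIterable stands in for the article's StreamProvider.

```dart
// Hypothetical helper: element at a zero-based position in the stream.
Future<int> valueAt(List<int> values, int index) {
  return Stream.fromIterable(values).elementAt(index);
}

Future<void> main() async {
  final values = [1, 9, 9, 4, 3, 2, 8];
  print(await valueAt(values, 0)); // 1
  print(await valueAt(values, 3)); // 4
}
```

If the stream ends before reaching the requested index, the Future completes with an error rather than a value.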

4. Additional Operators

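The article also covers forEach (asynchronous iteration), contains (membership test), and join (concatenating elements into a string). Each returns a Future: forEach and join complete when the stream ends, while contains completes as soon as a matching element is found, or with false if the stream ends without one.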

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Future<dynamic> result = intStream.forEach(process);
  int start = DateTime.now().millisecondsSinceEpoch;
  result.then((value) {
    print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
  });
}

void process(int e){
  print(e);
}

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Future<bool> result = intStream.contains(4);
  int start = DateTime.now().millisecondsSinceEpoch;
  result.then((value) {
    print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
  });
}

void main(){
  Stream<int> intStream = StreamProvider().createStream();
  Future<String> result = intStream.join(",");
  int start = DateTime.now().millisecondsSinceEpoch;
  result.then((value) {
    print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
  });
}

Overall, the tutorial demonstrates how Dart's built‑in stream operators and their asynchronous variants can be combined to perform complex data transformations, filtering, and aggregation, serving as a solid foundation for reactive programming in Flutter and other Dart‑based applications.
