Stream is Effect's answer to async iterables and observables. It models a potentially infinite sequence of values produced over time — with typed errors and dependency injection baked in, just like Effect.
Stream<A, E, R> · Stream.make / fromIterable · Stream.fromEffect / fromIterableEffect · Stream.unfold(seed, f) · Stream.range(start, end) · Stream.map / filter / take / drop · Stream.flatMap · Stream.tap(f) · Stream.scan(seed, f) · Stream.concat / merge / zip
// TypeScript: async generator
async function* doubles(arr: number[]) {
  // Double eagerly, then delegate: yield* walks the mapped array
  // one element at a time, exactly like the original loop did.
  yield* arr.map((value) => value * 2)
}
// Consuming:
for await (const n of doubles([1, 2, 3])) {
if (n > 4) console.log(n)
}
// Problems:
// - No typed errors (throw anything, catch... string?)
// - No dependency injection
// - Composing generators is awkward (yield* in generators is clunky)
// - No built-in concurrency, buffering, or grouping
// - Can't easily merge two generators or zip them
import { Stream, Effect, Option } from "effect"
// ── Creating streams ──
const fromValues = Stream.make(1, 2, 3)
const fromArray = Stream.fromIterable([1, 2, 3])
const fromEffect = Stream.fromEffect(Effect.succeed(42))
const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]))
// [emit, nextSeed]
// ── Transforming ──
const pipeline = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.map((n) => n * 2), // [2, 4, 6, 8, 10]
Stream.filter((n) => n > 4), // [6, 8, 10]
Stream.tap((n) => Effect.log(`Processing: ${n}`))
)
// ── flatMap: each element → sub-stream ──
const expanded = Stream.make(1, 2, 3).pipe(
Stream.flatMap((n) => Stream.make(n, n * 10))
)
// [1, 10, 2, 20, 3, 30]
// ── scan: running totals ──
const runningSum = Stream.fromIterable([1, 2, 3, 4]).pipe(
Stream.scan(0, (acc, n) => acc + n)
)
// [0, 1, 3, 6, 10]
// ── Combining streams ──
const a = Stream.make(1, 2)
const b = Stream.make(3, 4)
const sequential = Stream.concat(a, b) // [1, 2, 3, 4]
const interleaved = Stream.merge(a, b) // async interleaving
const paired = Stream.zip(a, b) // [[1,3], [2,4]]
async function*    Stream<A, E, R>
────────────── ───────────────
yield value Stream.make(1, 2, 3)
yield* otherGen() Stream.flatMap(n =>
Stream.make(n, n * 10))
for await (const x Stream.runCollect
of gen()) { ... } → Chunk<A>
no error types E channel tracks errors
no dependencies R channel tracks services
manual composition Stream.concat / merge / zip
Mental model:
─────────────
Effect<A, E, R> → produces ONE value
Stream<A, E, R> → produces MANY values
All the same E/R patterns you learned still apply:
- Stream.mapError, Stream.catchAll for errors
- Stream.provideService for dependencies
Create a stream of numbers 1–10, double each, keep only values > 10, and collect into a Chunk.
import { Stream, Effect, Chunk } from "effect"
// TODO:
// 1. Create a stream of 1..10 using Stream.range
// 2. Double each value with Stream.map
// 3. Keep only values > 10 with Stream.filter
// 4. Collect into a Chunk with Stream.runCollect
// Expected result: Chunk(12, 14, 16, 18, 20)
import { Stream, Effect, Chunk } from "effect"
// Build the whole pipeline lazily; nothing runs until the Effect is executed.
const program = Stream.range(1, 10).pipe( // emits 1..10 (both ends inclusive)
  Stream.map((n) => n * 2), // 2, 4, ..., 20
  Stream.filter((n) => n > 10), // keep 12..20
  Stream.runCollect // run the stream, collecting all elements into a Chunk
)
// Effect<Chunk<number>, never, never>
// Result: Chunk(12, 14, 16, 18, 20)
You have an async generator that fetches pages of data. Convert it to a Stream using Stream.unfoldEffect.
import { Stream, Effect, Option } from "effect"
// Simulated paginated API
const fetchPage = (page: number) =>
Effect.succeed({
items: [page * 10 + 1, page * 10 + 2],
hasMore: page < 3
})
// TODO: Use Stream.unfold to create a stream that:
// 1. Starts at page 1
// 2. Fetches each page
// 3. Emits the items array
// 4. Stops when hasMore is false
// Hint: unfoldEffect lets you use Effects in the unfold function
import { Stream, Effect, Option } from "effect"
const fetchPage = (page: number) =>
Effect.succeed({
items: [page * 10 + 1, page * 10 + 2],
hasMore: page < 3
})
const pages = Stream.unfoldEffect(1, (page) =>
fetchPage(page).pipe(
Effect.map(({ items, hasMore }) =>
hasMore
? Option.some([items, page + 1] as const)
: Option.some([items, page + 1] as const) // emit last page
)
)
).pipe(
Stream.mapConcat((items) => items) // flatten arrays into individual elements
)
// Emits: 11, 12, 21, 22, 31, 32
A Stream produces values but does nothing until consumed. Sinks are the consumers — they collect, fold, aggregate, or process stream elements. Think of a Sink as the 'reduce' at the end of your pipeline.
Stream.runCollect · Stream.runForEach(f) · Stream.runFold(seed, f) · Stream.run(stream, sink) · Sink.collectAll() · Sink.fold(seed, cont, f) · Sink.head() · Sink.forEach(f) · Stream.grouped(n) / groupedWithin(n, duration) · Stream.groupByKey(f)
// TypeScript: consuming an async iterable
async function collectAll<T>(gen: AsyncIterable<T>): Promise<T[]> {
  // Drain the entire async sequence into a single array —
  // every element ends up in memory at once.
  const collected: T[] = []
  for await (const value of gen) {
    collected.push(value)
  }
  return collected
}
async function reduce<T, U>(
  gen: AsyncIterable<T>,
  seed: U,
  f: (acc: U, item: T) => U
): Promise<U> {
  // Left-fold the async sequence into a single accumulated value,
  // consuming one element at a time (no buffering).
  let state = seed
  for await (const element of gen) {
    state = f(state, element)
  }
  return state
}
// Problems:
// - Must write these helpers yourself
// - No built-in batching/grouping
// - No typed errors in the pipeline
// - No "take first N" without manual break logic
import { Stream, Sink, Effect } from "effect"
const numbers = Stream.range(1, 100)
// ── Simple runners ──
const all = Stream.runCollect(numbers) // Chunk of 1..100
const sum = Stream.runFold(numbers, 0, (a, b) => a + b) // 5050
const log = Stream.runForEach(numbers, (n) => Effect.log(`Got: ${n}`))
// ── Custom Sinks ──
const first = Stream.run(numbers, Sink.head())
// Effect<Option<number>, never, never>
// Collect first 5 elements:
const firstFive = Stream.run(numbers, Sink.take(5))
// Effect<Chunk<number>, never, never>
// Fold until sum exceeds 50:
const partial = Stream.run(
numbers,
Sink.fold(0, (sum) => sum <= 50, (sum, n) => sum + n)
)
// Stops consuming when sum > 50, leftovers stay unconsumed
// ── Batching with grouped ──
const batched = Stream.range(1, 20).pipe(
Stream.grouped(5), // Chunk of 5 elements each
Stream.runForEach((batch) =>
Effect.log(`Processing batch of ${batch.length}`)
)
)
// ── groupedWithin: size OR time ──
const adaptive = someEventStream.pipe(
Stream.groupedWithin(100, "5 seconds"),
// batch up to 100 items, OR flush every 5 seconds
Stream.runForEach((batch) => writeToDB(batch))
)
Stream            Sink             Result
────── ──── ──────
1, 2, 3, ... → collectAll() → Chunk(1, 2, 3, ...)
1, 2, 3, ... → head() → Option.some(1)
1, 2, 3, ... → take(3) → Chunk(1, 2, 3)
1, 2, 3, ... → fold(0, +) → 6 (1+2+3)
1, 2, 3, ... → forEach(log) → void (side effects)
TS mental model:
────────────────
Array → .reduce() → single value
Stream → Sink.fold() → single value (but lazy + effectful)
Array → .slice(0, 5) → smaller array
Stream → Sink.take(5) → Chunk (and stops pulling!)
Array → .forEach() → side effects
Stream → Sink.forEach() → side effects (but effectful)
Batching:
─────────
Stream.grouped(3): [1,2,3] → [4,5,6] → [7,8,9] → ...
Stream.groupedWithin(100, "5s"):
→ flush at 100 items OR 5 seconds, whichever first
→ essential for "batch writes to DB every N items or every M seconds"
Given a stream of numbers 1–1000, compute the sum without loading all elements into memory. Use runFold.
import { Stream, Effect } from "effect"
const numbers = Stream.range(1, 1000)
// TODO: Compute the sum using Stream.runFold
// Don't use runCollect — that loads everything into memory
// Expected result: 500500
import { Stream, Effect } from "effect"
const numbers = Stream.range(1, 1000)
const sum = Stream.runFold(numbers, 0, (acc, n) => acc + n)
// Effect<number, never, never>
// Result: 500500
Use Sink.fold to consume numbers from a stream until the running total exceeds 100. Return the total.
import { Stream, Sink, Effect } from "effect"
const numbers = Stream.range(1, 50)
// TODO: Use Stream.run with Sink.fold to:
// 1. Start with seed 0
// 2. Keep folding while total <= 100
// 3. Return the accumulated total
// Hint: Sink.fold(seed, continuePredicate, accumulator)
import { Stream, Sink, Effect } from "effect"
const numbers = Stream.range(1, 50)
const result = Stream.run(
numbers,
Sink.fold(
0, // seed
(total) => total <= 100, // continue while true
(total, n) => total + n // accumulator
)
)
// Stops pulling from stream once total > 100
// Result: 105 (1+2+...+14 = 105)
Given a stream of user events, batch them into groups of 10 and log each batch. Use Stream.grouped and Stream.runForEach.
import { Stream, Effect, Chunk } from "effect"
const events = Stream.fromIterable(
Array.from({ length: 35 }, (_, i) => ({ id: i, type: "click" }))
)
// TODO:
// 1. Group events into batches of 10 using Stream.grouped
// 2. Use Stream.runForEach to log each batch size
// Expected: 4 batches (10, 10, 10, 5)
import { Stream, Effect, Chunk } from "effect"
const events = Stream.fromIterable(
Array.from({ length: 35 }, (_, i) => ({ id: i, type: "click" }))
)
const program = events.pipe(
Stream.grouped(10),
Stream.runForEach((batch) =>
Effect.log(`Batch of ${Chunk.size(batch)} events`)
)
)
// Logs: Batch of 10 events (x3), Batch of 5 events (x1)
Streams support concurrency (parallel flatMap, merging), error recovery (catchAll, retry), and resource management (finalizers, scoped streams). This is where Streams pull ahead of plain async iterables.
Stream.flatMap with { concurrency } · Stream.flatMap with { switch: true } · Stream.merge(a, b) · Stream.catchAll(f) · Stream.catchSome(f) · Stream.retry(schedule) · Stream.finalizer(effect) · Stream.scoped(effect) · Stream.schedule(schedule)
// TypeScript: concurrent processing with async iterables
async function* processAll(urls: string[]) {
// Sequential — no built-in concurrency for async generators
for (const url of urls) {
try {
const res = await fetch(url)
yield await res.json()
} catch (e) {
// Error kills the whole generator? Swallow it? Re-throw?
console.error(e) // usually swallow and continue... fragile
}
}
}
// Problems:
// - No parallel processing without Promise.all (loses streaming)
// - Error handling is ad-hoc — no typed recovery
// - No cancellation/switchMap pattern
// - No guaranteed cleanup (finalizers)
// - No built-in retry with backoff
import { Stream, Effect, Schedule } from "effect"
// ── Concurrent flatMap ──
const fetchUser = (id: number) =>
Stream.fromEffect(Effect.tryPromise(() =>
fetch(`/api/users/${id}`).then((r) => r.json())
))
const users = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.flatMap(fetchUser, { concurrency: 3 })
// 3 fetches in parallel at a time
)
// ── switchMap: cancel previous on new ──
const searchResults = searchTermStream.pipe(
Stream.flatMap(
(term) => Stream.fromEffect(searchAPI(term)),
{ switch: true }
// new search term cancels the in-flight request
)
)
// ── Error recovery ──
const resilient = Stream.fromEffect(
Effect.tryPromise(() => fetch("/api/data"))
).pipe(
Stream.catchAll((error) =>
Stream.make({ fallback: true, error: String(error) })
)
)
// ── Retry with backoff ──
const withRetry = Stream.fromEffect(
Effect.tryPromise(() => fetch("/api/flaky"))
).pipe(
Stream.retry(Schedule.exponential("1 second").pipe(
Schedule.compose(Schedule.recurs(3))
))
)
// ── Resource management ──
const fileLines = Stream.scoped(
Effect.acquireRelease(
Effect.sync(() => openFile("data.csv")),
(handle) => Effect.sync(() => handle.close())
)
).pipe(
Stream.flatMap((handle) => Stream.fromIterable(handle.readLines()))
)
// ── Finalizer: guaranteed cleanup ──
const withCleanup = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.finalizer(
Effect.log("Stream finished — cleaning up")
))
)
Concurrency modes:
──────────────────
flatMap (default): ─── sub-stream 1 ──→ sub-stream 2 ──→ sub-stream 3 ──→
sequential: one at a time
flatMap(f, { ─── sub-stream 1 ──→
concurrency: 3 ─── sub-stream 2 ──→ (up to 3 in parallel)
}) ─── sub-stream 3 ──→
flatMap(f, { ─── sub-stream 1 ──✗ (cancelled!)
switch: true ─── sub-stream 2 ──✗ (cancelled!)
}) ─── sub-stream 3 ──→ (only latest survives)
like RxJS switchMap
Error recovery:
───────────────
Stream ──→ error! ──→ catchAll ──→ fallback stream ──→ continues
(original stream is done)
Stream ──→ error! ──→ retry(3x) ──→ restart stream ──→ ...
(whole stream restarts, not just the element)
TS ↔ RxJS mental model:
───────────────────────
Stream.flatMap = RxJS concatMap (default)
flatMap + concurrency = RxJS mergeMap
flatMap + switch = RxJS switchMap
Stream.merge = RxJS merge
Stream.zip           = RxJS zip
Given a stream of 10 user IDs, fetch each user in parallel with a concurrency limit of 3. Collect all results.
import { Stream, Effect, Chunk } from "effect"
const userIds = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
const fetchUser = (id: number) =>
Effect.succeed({ id, name: `User ${id}` }).pipe(
Effect.delay("100 millis") // simulate network delay
)
// TODO:
// 1. Use Stream.flatMap with { concurrency: 3 } to fetch users
// 2. Collect all results with Stream.runCollect
// Hint: wrap fetchUser in Stream.fromEffect
import { Stream, Effect, Chunk } from "effect"
const userIds = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
const fetchUser = (id: number) =>
Effect.succeed({ id, name: `User ${id}` }).pipe(
Effect.delay("100 millis")
)
const program = userIds.pipe(
Stream.flatMap(
(id) => Stream.fromEffect(fetchUser(id)),
{ concurrency: 3 }
),
Stream.runCollect
)
// Fetches 3 users at a time, collects all 10 results
Create a stream that fails on even numbers. Use catchAll to recover with a stream that emits -1 as a sentinel value.
import { Stream, Effect } from "effect"
const numbers = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.mapEffect((n) =>
n % 2 === 0
? Effect.fail(`${n} is even!` as const)
: Effect.succeed(n)
)
)
// TODO: Use Stream.catchAll to recover from the error
// When the stream fails, switch to Stream.make(-1)
// Expected: collects [1] then fails on 2, switches to [-1]
// Final result: Chunk(1, -1)
import { Stream, Effect } from "effect"
const numbers = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.mapEffect((n) =>
n % 2 === 0
? Effect.fail(`${n} is even!` as const)
: Effect.succeed(n)
)
)
const recovered = numbers.pipe(
Stream.catchAll((_error) => Stream.make(-1))
)
const result = Stream.runCollect(recovered)
// Chunk(1, -1)
// Stream emitted 1, then hit 2 (error), switched to fallback