Effect TS

A course for TypeScript developers

Streams

3 steps
21
Creating & Transforming Streams
Effect's answer to async iterables
45 min

Stream is Effect's answer to async iterables and observables. It models a potentially infinite sequence of values produced over time — with typed errors and dependency injection baked in, just like Effect.

Key Insight: Stream<A, E, R> emits multiple A values (vs Effect which produces one). It uses the same E and R channels you already know. Streams are lazy — nothing runs until consumed.
What to learn
Stream<A, E, R>
Like Effect<A, E, R> but produces zero or more A values instead of exactly one.
Stream.make / fromIterable
Create streams from literal values or arrays. Simplest constructors.
Stream.fromEffect / fromIterableEffect
Create a stream from an Effect. fromIterableEffect unwraps an Effect that returns an iterable.
Stream.unfold(seed, f)
Generate values from a seed — like Array.from but lazy and potentially infinite.
Stream.range(start, end)
Creates a stream of integers in a range. Inclusive on both ends.
Stream.map / filter / take / drop
Transform streams. Same names as Array methods — but lazy, nothing runs until consumed.
Stream.flatMap
Like Effect.flatMap but each element produces a sub-stream that gets flattened. Think Array.flatMap but async and lazy.
Stream.tap(f)
Run a side-effect for each element without changing the stream. Great for logging.
Stream.scan(seed, f)
Like Array.reduce but emits every intermediate accumulator value.
Stream.concat / merge / zip
Combine streams: sequentially (concat), interleaved async (merge), or paired (zip).
In TypeScript
// TypeScript: async generator
async function* doubles(arr: number[]) {
  for (const n of arr) {
    yield n * 2
  }
}
// Consuming:
for await (const n of doubles([1, 2, 3])) {
  if (n > 4) console.log(n)
}
// Problems:
// - No typed errors (you can throw anything; catch gives you unknown)
// - No dependency injection
// - Composing generators is awkward (yield* in generators is clunky)
// - No built-in concurrency, buffering, or grouping
// - Can't easily merge two generators or zip them
With Effect
import { Stream, Effect, Option } from "effect"

// ── Creating streams ──
const fromValues = Stream.make(1, 2, 3)
const fromArray  = Stream.fromIterable([1, 2, 3])
const fromEffect = Stream.fromEffect(Effect.succeed(42))
const naturals   = Stream.unfold(1, (n) => Option.some([n, n + 1]))
//                                         [emit, nextSeed]

// ── Transforming ──
const pipeline = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
  Stream.map((n) => n * 2),           // [2, 4, 6, 8, 10]
  Stream.filter((n) => n > 4),        // [6, 8, 10]
  Stream.tap((n) => Effect.log(`Processing: ${n}`))
)

// ── flatMap: each element → sub-stream ──
const expanded = Stream.make(1, 2, 3).pipe(
  Stream.flatMap((n) => Stream.make(n, n * 10))
)
// [1, 10, 2, 20, 3, 30]

// ── scan: running totals ──
const runningSum = Stream.fromIterable([1, 2, 3, 4]).pipe(
  Stream.scan(0, (acc, n) => acc + n)
)
// [0, 1, 3, 6, 10]

// ── Combining streams ──
const a = Stream.make(1, 2)
const b = Stream.make(3, 4)
const sequential  = Stream.concat(a, b)       // [1, 2, 3, 4]
const interleaved = Stream.merge(a, b)         // async interleaving
const paired      = Stream.zip(a, b)           // [[1,3], [2,4]]
How it works
  async function*              Stream<A, E, R>
  ──────────────              ───────────────

  yield value                 Stream.make(1, 2, 3)

  yield* otherGen()           Stream.flatMap(n =>
                                Stream.make(n, n * 10))

  for await (const x          Stream.runCollect
    of gen()) { ... }           → Chunk<A>

  no error types              E channel tracks errors

  no dependencies             R channel tracks services

  manual composition          Stream.concat / merge / zip


  Mental model:
  ─────────────
  Effect<A, E, R>   →  produces ONE value
  Stream<A, E, R>   →  produces MANY values

  All the same E/R patterns you learned still apply:
  - Stream.mapError, Stream.catchAll for errors
  - Stream.provideService for dependencies
Practice
Build a stream pipeline

Create a stream of numbers 1–10, double each, keep only values > 10, and collect into a Chunk.

import { Stream, Effect, Chunk } from "effect"

// TODO:
// 1. Create a stream of 1..10 using Stream.range
// 2. Double each value with Stream.map
// 3. Keep only values > 10 with Stream.filter
// 4. Collect into a Chunk with Stream.runCollect
// Expected result: Chunk(12, 14, 16, 18, 20)
Reveal solution
import { Stream, Effect, Chunk } from "effect"

const program = Stream.range(1, 10).pipe(
  Stream.map((n) => n * 2),
  Stream.filter((n) => n > 10),
  Stream.runCollect
)
// Effect<Chunk<number>, never, never>
// Result: Chunk(12, 14, 16, 18, 20)
Convert an async generator to a Stream

You have an async generator that fetches pages of data. Convert it to a Stream using Stream.unfold.

import { Stream, Effect, Option } from "effect"

// Simulated paginated API
const fetchPage = (page: number) =>
  Effect.succeed({
    items: [page * 10 + 1, page * 10 + 2],
    hasMore: page < 3
  })

// TODO: Use Stream.unfold to create a stream that:
// 1. Starts at page 1
// 2. Fetches each page
// 3. Emits the items array
// 4. Stops when hasMore is false
// Hint: unfoldEffect lets you use Effects in the unfold function
Reveal solution
import { Stream, Effect, Option } from "effect"

const fetchPage = (page: number) =>
  Effect.succeed({
    items: [page * 10 + 1, page * 10 + 2],
    hasMore: page < 3
  })

const pages = Stream.unfoldEffect(Option.some(1), (state) =>
  Option.match(state, {
    // No next page left: end the stream.
    onNone: () => Effect.succeed(Option.none()),
    // Fetch the page, emit its items, and decide the next state.
    onSome: (page) =>
      fetchPage(page).pipe(
        Effect.map(({ items, hasMore }) =>
          Option.some([
            items,
            hasMore ? Option.some(page + 1) : Option.none()
          ] as const)
        )
      )
  })
).pipe(
  Stream.mapConcat((items) => items) // flatten arrays into individual elements
)
// Emits: 11, 12, 21, 22, 31, 32, then terminates after page 3
Common Trap: Streams are lazy and pull-based. Don't confuse them with EventEmitters (push-based). A Stream doesn't produce values until a consumer (Sink/runner) requests them.
Read docs →
22
Sinks & Consuming Streams
Collecting, folding, and aggregating
45 min

A Stream produces values but does nothing until consumed. Sinks are the consumers — they collect, fold, aggregate, or process stream elements. Think of a Sink as the 'reduce' at the end of your pipeline.

Key Insight: Sink<A, In, L, E, R> consumes In values and produces an A. The L type represents 'leftovers' — elements the Sink didn't consume. Stream.run(stream, sink) connects them.
What to learn
Stream.runCollect
Collects all elements into a Chunk. Simplest consumer — but loads everything into memory.
Stream.runForEach(f)
Runs an Effect for each element. Like forEach but effectful — good for writing to DB or logging.
Stream.runFold(seed, f)
Accumulates all elements into a single value. Like Array.reduce.
Stream.run(stream, sink)
Runs a stream with a custom Sink. Most flexible consumption method.
Sink.collectAll()
Sink equivalent of runCollect. Collects into a Chunk.
Sink.fold(seed, cont, f)
Accumulates values until cont returns false. Returns accumulated result — leftovers go to L.
Sink.head()
Takes just the first element, wrapped in Option. Remainder becomes leftovers.
Sink.forEach(f)
Runs an Effect for each element. Sink version of runForEach.
Stream.grouped(n) / groupedWithin(n, duration)
Batch elements: fixed-size chunks or by size/time (whichever comes first). Essential for batch processing.
Stream.groupByKey(f)
Groups elements by a key function. Returns a GroupBy you evaluate with a function per group.
In TypeScript
// TypeScript: consuming an async iterable
async function collectAll<T>(gen: AsyncIterable<T>): Promise<T[]> {
  const result: T[] = []
  for await (const item of gen) {
    result.push(item) // all in memory
  }
  return result
}

async function reduce<T, U>(
  gen: AsyncIterable<T>,
  seed: U,
  f: (acc: U, item: T) => U
): Promise<U> {
  let acc = seed
  for await (const item of gen) {
    acc = f(acc, item)
  }
  return acc
}
// Problems:
// - Must write these helpers yourself
// - No built-in batching/grouping
// - No typed errors in the pipeline
// - No "take first N" without manual break logic
With Effect
import { Stream, Sink, Effect } from "effect"

const numbers = Stream.range(1, 100)

// ── Simple runners ──
const all   = Stream.runCollect(numbers)        // Chunk of 1..100
const sum   = Stream.runFold(numbers, 0, (a, b) => a + b)  // 5050
const log   = Stream.runForEach(numbers, (n) => Effect.log(`Got: ${n}`))

// ── Custom Sinks ──
const first = Stream.run(numbers, Sink.head())
// Effect<Option<number>, never, never>

// Collect first 5 elements:
const firstFive = Stream.run(numbers, Sink.take(5))
// Effect<Chunk<number>, never, never>

// Fold until sum exceeds 50:
const partial = Stream.run(
  numbers,
  Sink.fold(0, (sum) => sum <= 50, (sum, n) => sum + n)
)
// Stops consuming when sum > 50, leftovers stay unconsumed

// ── Batching with grouped ──
const batched = Stream.range(1, 20).pipe(
  Stream.grouped(5),                 // Chunk of 5 elements each
  Stream.runForEach((batch) =>
    Effect.log(`Processing batch of ${batch.length}`)
  )
)

// ── groupedWithin: size OR time ──
// (someEventStream and writeToDB are app-specific placeholders)
const adaptive = someEventStream.pipe(
  Stream.groupedWithin(100, "5 seconds"),
  // batch up to 100 items, OR flush every 5 seconds
  Stream.runForEach((batch) => writeToDB(batch))
)
How it works
  Stream              Sink               Result
  ──────              ────               ──────
  1, 2, 3, ...   →   collectAll()    →  Chunk(1, 2, 3, ...)
  1, 2, 3, ...   →   head()          →  Option.some(1)
  1, 2, 3, ...   →   take(3)         →  Chunk(1, 2, 3)
  1, 2, 3, ...   →   fold(0, +)      →  6  (1+2+3)
  1, 2, 3, ...   →   forEach(log)    →  void (side effects)


  TS mental model:
  ────────────────
  Array          →  .reduce()       →  single value
  Stream         →  Sink.fold()     →  single value (but lazy + effectful)

  Array          →  .slice(0, 5)    →  smaller array
  Stream         →  Sink.take(5)    →  Chunk (and stops pulling!)

  Array          →  .forEach()      →  side effects
  Stream         →  Sink.forEach()  →  side effects (but effectful)


  Batching:
  ─────────
  Stream.grouped(3):     [1,2,3] → [4,5,6] → [7,8,9] → ...
  Stream.groupedWithin(100, "5s"):
    → flush at 100 items OR 5 seconds, whichever first
    → essential for "batch writes to DB every N items or every M seconds"
Practice
Sum a stream without collecting it

Given a stream of numbers 1–1000, compute the sum without loading all elements into memory. Use runFold.

import { Stream, Effect } from "effect"

const numbers = Stream.range(1, 1000)

// TODO: Compute the sum using Stream.runFold
// Don't use runCollect — that loads everything into memory
// Expected result: 500500
Reveal solution
import { Stream, Effect } from "effect"

const numbers = Stream.range(1, 1000)

const sum = Stream.runFold(numbers, 0, (acc, n) => acc + n)
// Effect<number, never, never>
// Result: 500500
Take until a condition is met

Use Sink.fold to consume numbers from a stream until the running total exceeds 100. Return the total.

import { Stream, Sink, Effect } from "effect"

const numbers = Stream.range(1, 50)

// TODO: Use Stream.run with Sink.fold to:
// 1. Start with seed 0
// 2. Keep folding while total <= 100
// 3. Return the accumulated total
// Hint: Sink.fold(seed, continuePredicate, accumulator)
Reveal solution
import { Stream, Sink, Effect } from "effect"

const numbers = Stream.range(1, 50)

const result = Stream.run(
  numbers,
  Sink.fold(
    0,                        // seed
    (total) => total <= 100,  // continue while true
    (total, n) => total + n   // accumulator
  )
)
// Stops pulling from stream once total > 100
// Result: 105 (1+2+...+14 = 105)
Batch process a stream

Given a stream of user events, batch them into groups of 10 and log each batch. Use Stream.grouped and Stream.runForEach.

import { Stream, Effect, Chunk } from "effect"

const events = Stream.fromIterable(
  Array.from({ length: 35 }, (_, i) => ({ id: i, type: "click" }))
)

// TODO:
// 1. Group events into batches of 10 using Stream.grouped
// 2. Use Stream.runForEach to log each batch size
// Expected: 4 batches (10, 10, 10, 5)
Reveal solution
import { Stream, Effect, Chunk } from "effect"

const events = Stream.fromIterable(
  Array.from({ length: 35 }, (_, i) => ({ id: i, type: "click" }))
)

const program = events.pipe(
  Stream.grouped(10),
  Stream.runForEach((batch) =>
    Effect.log(`Batch of ${Chunk.size(batch)} events`)
  )
)
// Logs: Batch of 10 events (x3), Batch of 5 events (x1)
Common Trap: runCollect loads ALL elements into memory. For large or infinite streams, use runForEach, runFold, or a Sink that processes elements incrementally. Treat runCollect like Array.from() on a generator — fine for small data, dangerous for large.
Read docs →
23
Stream Concurrency & Error Handling
Merging, racing, and recovering
45 min

Streams support concurrency (parallel flatMap, merging), error recovery (catchAll, retry), and resource management (finalizers, scoped streams). This is where Streams pull ahead of plain async iterables.

Key Insight: Stream.flatMap with { concurrency: N } processes N sub-streams in parallel. Stream.catchAll recovers from errors. Stream.finalizer guarantees cleanup. All the Effect patterns you know work here too.
What to learn
Stream.flatMap with { concurrency }
Process N sub-streams in parallel. Without concurrency, sub-streams run sequentially.
Stream.flatMap with { switch: true }
Cancel previous sub-stream when a new element arrives. Like RxJS switchMap.
Stream.merge(a, b)
Interleave two streams asynchronously. Elements arrive in whatever order they're produced.
Stream.catchAll(f)
Recover from stream errors by switching to a fallback stream.
Stream.catchSome(f)
Selectively recover from specific errors. Return Option.some(fallbackStream) or Option.none() to re-raise.
Stream.retry(schedule)
Retry a failing stream according to a Schedule. The stream restarts from the beginning.
Stream.finalizer(effect)
Guarantees an Effect runs when the stream ends — success, failure, or interruption.
Stream.scoped(effect)
Creates a stream from a scoped Effect. Resources are managed within the stream's lifetime.
Stream.schedule(schedule)
Controls emission rate. Stream.schedule(Schedule.spaced('1 second')) adds 1s delay between elements.
In TypeScript
// TypeScript: concurrent processing with async iterables
async function* processAll(urls: string[]) {
  // Sequential — no built-in concurrency for async generators
  for (const url of urls) {
    try {
      const res = await fetch(url)
      yield await res.json()
    } catch (e) {
      // Error kills the whole generator? Swallow it? Re-throw?
      console.error(e) // usually swallow and continue... fragile
    }
  }
}
// Problems:
// - No parallel processing without Promise.all (loses streaming)
// - Error handling is ad-hoc — no typed recovery
// - No cancellation/switchMap pattern
// - No guaranteed cleanup (finalizers)
// - No built-in retry with backoff
With Effect
import { Stream, Effect, Schedule } from "effect"

// ── Concurrent flatMap ──
const fetchUser = (id: number) =>
  Stream.fromEffect(Effect.tryPromise(() =>
    fetch(`/api/users/${id}`).then((r) => r.json())
  ))

const users = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
  Stream.flatMap(fetchUser, { concurrency: 3 })
  // 3 fetches in parallel at a time
)

// ── switchMap: cancel previous on new ──
const searchResults = searchTermStream.pipe(
  Stream.flatMap(
    (term) => Stream.fromEffect(searchAPI(term)),
    { switch: true }
    // new search term cancels the in-flight request
  )
)

// ── Error recovery ──
const resilient = Stream.fromEffect(
  Effect.tryPromise(() => fetch("/api/data"))
).pipe(
  Stream.catchAll((error) =>
    Stream.make({ fallback: true, error: String(error) })
  )
)

// ── Retry with backoff ──
const withRetry = Stream.fromEffect(
  Effect.tryPromise(() => fetch("/api/flaky"))
).pipe(
  Stream.retry(Schedule.exponential("1 second").pipe(
    Schedule.compose(Schedule.recurs(3))
  ))
)

// ── Resource management ──
const fileLines = Stream.scoped(
  Effect.acquireRelease(
    Effect.sync(() => openFile("data.csv")),
    (handle) => Effect.sync(() => handle.close())
  )
).pipe(
  Stream.flatMap((handle) => Stream.fromIterable(handle.readLines()))
)

// ── Finalizer: guaranteed cleanup ──
const withCleanup = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.finalizer(
    Effect.log("Stream finished — cleaning up")
  ))
)
How it works
  Concurrency modes:
  ──────────────────

  flatMap (default):  ─── sub-stream 1 ──→ sub-stream 2 ──→ sub-stream 3 ──→
                      sequential: one at a time

  flatMap(f, {        ─── sub-stream 1 ──→
   concurrency: 3     ─── sub-stream 2 ──→  (up to 3 in parallel)
  })                   ─── sub-stream 3 ──→

  flatMap(f, {        ─── sub-stream 1 ──✗ (cancelled!)
   switch: true        ─── sub-stream 2 ──✗ (cancelled!)
  })                   ─── sub-stream 3 ──→ (only latest survives)
                       like RxJS switchMap


  Error recovery:
  ───────────────

  Stream ──→ error! ──→ catchAll ──→ fallback stream ──→ continues
                                                         (original stream is done)

  Stream ──→ error! ──→ retry(3x) ──→ restart stream ──→ ...
                                      (whole stream restarts, not just the element)


  TS ↔ RxJS mental model:
  ───────────────────────
  Stream.flatMap            = RxJS concatMap (default)
  flatMap + concurrency     = RxJS mergeMap
  flatMap + switch          = RxJS switchMap
  Stream.merge              = RxJS merge
  Stream.zip                = RxJS zip
Practice
Parallel fetch with concurrency limit

Given a stream of 10 user IDs, fetch each user in parallel with a concurrency limit of 3. Collect all results.

import { Stream, Effect, Chunk } from "effect"

const userIds = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

const fetchUser = (id: number) =>
  Effect.succeed({ id, name: `User ${id}` }).pipe(
    Effect.delay("100 millis") // simulate network delay
  )

// TODO:
// 1. Use Stream.flatMap with { concurrency: 3 } to fetch users
// 2. Collect all results with Stream.runCollect
// Hint: wrap fetchUser in Stream.fromEffect
Reveal solution
import { Stream, Effect, Chunk } from "effect"

const userIds = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

const fetchUser = (id: number) =>
  Effect.succeed({ id, name: `User ${id}` }).pipe(
    Effect.delay("100 millis")
  )

const program = userIds.pipe(
  Stream.flatMap(
    (id) => Stream.fromEffect(fetchUser(id)),
    { concurrency: 3 }
  ),
  Stream.runCollect
)
// Fetches 3 users at a time, collects all 10 results
Recover from stream errors

Create a stream that fails on even numbers. Use catchAll to recover with a stream that emits -1 as a sentinel value.

import { Stream, Effect } from "effect"

const numbers = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
  Stream.mapEffect((n) =>
    n % 2 === 0
      ? Effect.fail(`${n} is even!` as const)
      : Effect.succeed(n)
  )
)

// TODO: Use Stream.catchAll to recover from the error
// When the stream fails, switch to Stream.make(-1)
// Expected: collects [1] then fails on 2, switches to [-1]
// Final result: Chunk(1, -1)
Reveal solution
import { Stream, Effect } from "effect"

const numbers = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
  Stream.mapEffect((n) =>
    n % 2 === 0
      ? Effect.fail(`${n} is even!` as const)
      : Effect.succeed(n)
  )
)

const recovered = numbers.pipe(
  Stream.catchAll((_error) => Stream.make(-1))
)

const result = Stream.runCollect(recovered)
// Chunk(1, -1)
// Stream emitted 1, then hit 2 (error), switched to fallback
Common Trap: Stream.retry restarts the ENTIRE stream, not just the failed element. If you need per-element retry, use Stream.mapEffect with an Effect that has its own retry logic.
Read docs →