Effect TS

A course for TypeScript developers

Concurrency

2 steps
19
Fibers & Basic Concurrency
Lightweight threads for TypeScript
60 min

Fibers are Effect's concurrency primitive — lightweight virtual threads that can be forked, joined, and interrupted.

Key Insight: Effect.fork runs an Effect on a new Fiber. Fiber.join waits for it. Fiber.interrupt cancels it. Effect.all with concurrency options is the easy path for most cases.
What to learn
Effect.fork(effect)
Starts a Fiber. Returns immediately with a Fiber handle. Child fiber is auto-supervised — killed when parent ends.
Fiber.join(fiber)
Waits for a Fiber to complete. Returns the result as an Effect. Re-raises the fiber's error if it failed.
Fiber.interrupt(fiber)
Cancels a running Fiber. Resources are cleaned up automatically via finalizers.
Effect.all(effects, { concurrency: 'unbounded' })
Runs effects concurrently. The simple way to do parallelism. Also supports concurrency: number for limits.
Effect.forEach(items, fn, { concurrency: N })
Map over items concurrently with a limit. Like Promise.all(items.map(fn)) but with backpressure.
Effect.race(left, right)
Races two effects and returns the first to complete. The loser is interrupted automatically. For more than two effects, use Effect.raceAll.
Effect.forkDaemon(effect)
Fork a fiber that outlives its parent. Runs until the global scope closes or it completes.
Effect.forkScoped(effect)
Fork a fiber tied to the current Scope. Useful for resource-managed background work.
In TypeScript
// TypeScript: manual concurrency with Promise.all
async function fetchAllUsers(ids: string[]) {
  // No concurrency limit — fires everything at once
  const users = await Promise.all(ids.map(id => fetchUser(id)))
  return users
}

// Want a concurrency limit? DIY with chunking or p-limit
import pLimit from "p-limit"
const limit = pLimit(5)
const users = await Promise.all(
  ids.map(id => limit(() => fetchUser(id)))
)

// Cancellation? AbortController per request, wire it yourself
const controller = new AbortController()
const result = await fetch(url, { signal: controller.signal })
// To cancel: controller.abort() — but who cleans up?
With Effect
import { Effect, Fiber } from "effect"

const program = Effect.gen(function* () {
  // Fork a background task — returns immediately
  const fiber = yield* Effect.fork(longRunningJob)

  // Do other work concurrently...
  yield* processItems()

  // Wait for the background task
  const result = yield* Fiber.join(fiber)

  // ── High-level concurrency (preferred) ──

  // Run 10 tasks with concurrency limit (like p-limit but built-in)
  yield* Effect.forEach(urls, fetchUrl, { concurrency: 10 })

  // Run all effects concurrently, collect results
  const [a, b, c] = yield* Effect.all(
    [taskA, taskB, taskC],
    { concurrency: "unbounded" }
  )

  // Race — first to finish wins, others are interrupted
  const fastest = yield* Effect.race(fetchFromCDN, fetchFromOrigin)
})
How it works
  TypeScript / Promises                 Effect Fibers
  ─────────────────────────────         ──────────────────────────────────
  Promise.all([...])                    Effect.all([...], { concurrency })
  ├─ no concurrency limit               ├─ "unbounded" | number | "inherit"
  ├─ one rejects → all lost             ├─ structured error handling
  └─ no cancellation                    └─ losers auto-interrupted

  p-limit / manual chunking             Effect.forEach(items, fn, { concurrency: N })
  ├─ extra dependency                   ├─ built-in, zero deps
  └─ no cancellation                    └─ cancellation + backpressure

  AbortController                       Fiber interruption
  ├─ manual wiring per request          ├─ automatic (parent kills children)
  ├─ easy to forget cleanup             ├─ finalizers run automatically
  └─ no nested cancellation             └─ entire fiber tree interrupted

  new Worker() / worker_threads         Effect.fork / forkDaemon
  ├─ OS-level threads (heavy)           ├─ virtual threads (lightweight)
  ├─ serialization overhead             ├─ shared memory, zero-copy
  └─ manual lifecycle                   └─ auto-supervised (or daemon)
Practice
Parallel fetch with concurrency limit

Fetch a list of URLs concurrently, but limit to 3 at a time. Collect all results.

import { Effect } from "effect"

const fetchUrl = (url: string) =>
  Effect.tryPromise(() => fetch(url).then(r => r.json()))

const urls = ["/api/1", "/api/2", "/api/3", "/api/4", "/api/5"]

// TODO: fetch all URLs with concurrency limited to 3
const program = Effect.???
Reveal solution
import { Effect } from "effect"

const fetchUrl = (url: string) =>
  Effect.tryPromise(() => fetch(url).then(r => r.json()))

const urls = ["/api/1", "/api/2", "/api/3", "/api/4", "/api/5"]

// Effect.forEach maps over items with a concurrency limit
// This is the Effect equivalent of p-limit + Promise.all
const program = Effect.forEach(urls, fetchUrl, { concurrency: 3 })
// Type: Effect<unknown[], UnknownException, never>
Fork, do work, then join

Fork a slow background task, do some immediate work, then wait for the background result.

import { Effect, Fiber } from "effect"

const slowTask = Effect.gen(function* () {
  yield* Effect.sleep("2 seconds")
  return 42
})

const quickTask = Effect.log("doing quick work")

// TODO: fork slowTask, run quickTask, then join the fiber
const program = Effect.gen(function* () {
  // 1. Fork slowTask into a fiber

  // 2. Run quickTask while slowTask runs in background

  // 3. Wait for the fiber and get the result

})
Reveal solution
import { Effect, Fiber } from "effect"

const slowTask = Effect.gen(function* () {
  yield* Effect.sleep("2 seconds")
  return 42
})

const quickTask = Effect.log("doing quick work")

const program = Effect.gen(function* () {
  // 1. Fork slowTask — returns immediately with a Fiber handle
  const fiber = yield* Effect.fork(slowTask)

  // 2. Run quickTask concurrently (slowTask is running in background)
  yield* quickTask

  // 3. Join the fiber — waits for slowTask to finish
  const result = yield* Fiber.join(fiber)
  // result = 42
  yield* Effect.log(`Background task returned: ${result}`)
})
Race two effects

Race two API calls — return whichever responds first, automatically cancel the other.

import { Effect } from "effect"

const fetchFromCDN = Effect.gen(function* () {
  yield* Effect.sleep("100 millis")
  return { source: "cdn", data: "fast" }
})

const fetchFromOrigin = Effect.gen(function* () {
  yield* Effect.sleep("500 millis")
  return { source: "origin", data: "slow" }
})

// TODO: race both — first to finish wins
const program = Effect.???
Reveal solution
import { Effect } from "effect"

const fetchFromCDN = Effect.gen(function* () {
  yield* Effect.sleep("100 millis")
  return { source: "cdn", data: "fast" }
})

const fetchFromOrigin = Effect.gen(function* () {
  yield* Effect.sleep("500 millis")
  return { source: "origin", data: "slow" }
})

// Effect.race returns the first to complete
// The loser is automatically interrupted — no cleanup needed
const program = Effect.race(fetchFromCDN, fetchFromOrigin)
// result: { source: "cdn", data: "fast" }
Common Trap: Fibers are low-level. For most concurrent work, use Effect.all or Effect.forEach with concurrency options. Only reach for raw Fibers when you need fine-grained control. Also: Effect.fork auto-supervises — the child dies when the parent does. If you need a background task that outlives the parent, use Effect.forkDaemon.
20
Queue, Deferred, Semaphore
Coordination primitives
45 min

When Fibers need to communicate or coordinate, Effect provides Queue (buffered channel), Deferred (one-shot promise), and Semaphore (concurrency limiter). Think of them as the building blocks Go developers get with channels, WaitGroups, and mutexes — but type-safe and composable.

Key Insight: Queue = async channel between fibers. Deferred = a promise-like value set once. Semaphore = limit concurrent access to N. These compose with the rest of Effect seamlessly.
What to learn
Queue.bounded<A>(capacity)
Creates a bounded queue. Producers block (suspend the fiber) when full, consumers block when empty. Like a Go buffered channel.
Queue.unbounded / Queue.sliding / Queue.dropping
Other queue strategies. Unbounded = no limit. Sliding = drop oldest when full. Dropping = drop newest when full.
Queue.offer / Queue.take
Add to / take from the queue. Both return Effects. take suspends until an item is available.
Queue.takeUpTo(queue, n) / Queue.takeAll
Batch consume: take up to N items, or drain everything currently available.
Queue.shutdown(queue)
Terminates the queue. All waiting offers/takes are interrupted. Prevents further use.
Deferred.make<A, E>()
Creates a deferred. Like a Promise you can complete from outside, but as an Effect.
Deferred.succeed / Deferred.fail / Deferred.await
Complete a Deferred (once) with success or failure. await suspends until it's completed.
Effect.makeSemaphore(n)
Limits concurrent access. semaphore.withPermits(1)(effect) limits to 1 at a time.
PubSub.bounded<A>(capacity)
Publish/subscribe hub. Multiple subscribers each get every message. Like an EventEmitter but backpressured.
In TypeScript
// TypeScript: manual coordination patterns

// Channel-like behavior? No built-in. Use arrays + polling
const queue: string[] = []
const enqueue = (item: string) => queue.push(item)
const dequeue = () => queue.shift() // undefined if empty — no waiting

// One-shot signal? Wrap a Promise with external resolve
let resolve: (value: string) => void
const deferred = new Promise<string>(r => { resolve = r })
resolve!("done") // easy to misuse, no type safety on completion

// Concurrency limit? p-limit or hand-rolled semaphore
import pLimit from "p-limit"
const limit = pLimit(3)
await Promise.all(tasks.map(t => limit(() => t())))

// Pub/sub? EventEmitter — no backpressure, no type safety
import { EventEmitter } from "events"
const emitter = new EventEmitter()
emitter.on("data", (msg) => { /* hope msg is the right type */ })
emitter.emit("data", { anything: "goes" })
With Effect
import { Effect, Queue, Deferred, Fiber, PubSub } from "effect"

// ── Queue: producer/consumer channel ──
const producerConsumer = Effect.gen(function* () {
  const queue = yield* Queue.bounded<string>(10)

  // Producer fiber — pushes items
  const producer = yield* Effect.fork(
    Effect.forEach(
      ["a", "b", "c"],
      (item) => Queue.offer(queue, item),
      { concurrency: 1 }
    )
  )

  // Consumer — takes items one by one
  const first = yield* Queue.take(queue)  // suspends until available
  const batch = yield* Queue.takeUpTo(queue, 10)  // grab remaining

  yield* Fiber.join(producer)
  yield* Queue.shutdown(queue)
})

// ── Deferred: one-shot signal between fibers ──
const signaling = Effect.gen(function* () {
  const deferred = yield* Deferred.make<string, never>()

  // Worker waits for the signal
  const worker = yield* Effect.fork(
    Effect.gen(function* () {
      const value = yield* Deferred.await(deferred)
      yield* Effect.log(`Got: ${value}`)
    })
  )

  // Main fiber completes the deferred after some work
  yield* Effect.sleep("1 second")
  yield* Deferred.succeed(deferred, "go!")
  yield* Fiber.join(worker)
})

// ── Semaphore: limit concurrent access ──
const rateLimited = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(3)

  // Only 3 API calls run at a time
  yield* Effect.forEach(
    urls,
    (url) => semaphore.withPermits(1)(fetchUrl(url)),
    { concurrency: "unbounded" }
  )
})
How it works
  TypeScript                          Effect
  ─────────────────────────────       ─────────────────────────────────
  Array + polling                     Queue.bounded / unbounded
  ├─ no backpressure                  ├─ producer suspends when full
  ├─ no blocking consume              ├─ consumer suspends when empty
  └─ manual synchronization           └─ fiber-safe, composable

  Promise + external resolve          Deferred.make
  ├─ can be resolved multiple times   ├─ complete once (succeed/fail)
  ├─ no error channel                 ├─ typed success AND error
  └─ no cancellation                  └─ await is interruptible

  p-limit / hand-rolled               Effect.makeSemaphore(n)
  ├─ extra dependency                 ├─ built-in
  ├─ Promise-based only               ├─ works with all Effects
  └─ no cancellation                  └─ withPermits composes

  EventEmitter                        PubSub.bounded
  ├─ no backpressure                  ├─ backpressure built-in
  ├─ any type (unsafe)                ├─ fully typed messages
  ├─ sync callbacks                   ├─ async fiber-based
  └─ memory leak footgun              └─ shutdown cleans up
Practice
Producer-consumer with Queue

Create a bounded queue. Fork a producer that offers numbers 1-5. In the main fiber, take all 5 items and return them.

import { Effect, Queue, Fiber } from "effect"

const program = Effect.gen(function* () {
  // 1. Create a bounded queue with capacity 10

  // 2. Fork a producer that offers numbers 1 through 5

  // 3. Take 5 items from the queue (hint: use Queue.take in a loop or Effect.forEach)

  // 4. Join the producer fiber

  // 5. Return the collected items
})
Reveal solution
import { Effect, Queue, Fiber } from "effect"

const program = Effect.gen(function* () {
  // 1. Create a bounded queue
  const queue = yield* Queue.bounded<number>(10)

  // 2. Fork producer — offers 1 through 5
  const producer = yield* Effect.fork(
    Effect.forEach(
      [1, 2, 3, 4, 5],
      (n) => Queue.offer(queue, n)
    )
  )

  // 3. Take 5 items — each Queue.take suspends until an item is available
  const items = yield* Effect.forEach(
    [1, 2, 3, 4, 5],
    () => Queue.take(queue)
  )

  // 4. Join the producer
  yield* Fiber.join(producer)

  // 5. Return collected items
  return items // [1, 2, 3, 4, 5]
})
Signal between fibers with Deferred

Use a Deferred to coordinate two fibers. Fiber A waits for a signal, then logs the received value. Fiber B sends the signal after a delay.

import { Effect, Deferred, Fiber } from "effect"

const program = Effect.gen(function* () {
  // 1. Create a Deferred<string, never>

  // 2. Fork Fiber A: await the deferred, then log the value

  // 3. Fork Fiber B: sleep 500ms, then succeed the deferred with "hello"

  // 4. Join both fibers
})
Reveal solution
import { Effect, Deferred, Fiber } from "effect"

const program = Effect.gen(function* () {
  const signal = yield* Deferred.make<string, never>()

  // Fiber A: waits for the signal
  const fiberA = yield* Effect.fork(
    Effect.gen(function* () {
      const value = yield* Deferred.await(signal)
      yield* Effect.log(`Received: ${value}`)
      return value
    })
  )

  // Fiber B: sends the signal after a delay
  const fiberB = yield* Effect.fork(
    Effect.gen(function* () {
      yield* Effect.sleep("500 millis")
      yield* Deferred.succeed(signal, "hello")
    })
  )

  // Wait for both
  const result = yield* Fiber.join(fiberA)
  yield* Fiber.join(fiberB)
  return result // "hello"
})
Rate-limit with Semaphore

Use a Semaphore to limit concurrent API calls to 2 at a time, even though all 5 are forked with unbounded concurrency.

import { Effect } from "effect"

const callApi = (id: number) =>
  Effect.gen(function* () {
    yield* Effect.log(`Start ${id}`)
    yield* Effect.sleep("1 second")
    yield* Effect.log(`Done ${id}`)
    return id
  })

const ids = [1, 2, 3, 4, 5]

// TODO: use a semaphore to limit to 2 concurrent calls
const program = Effect.gen(function* () {
  // 1. Create a semaphore with 2 permits

  // 2. Run all calls with unbounded concurrency, but wrap each in withPermits(1)

})
Reveal solution
import { Effect } from "effect"

const callApi = (id: number) =>
  Effect.gen(function* () {
    yield* Effect.log(`Start ${id}`)
    yield* Effect.sleep("1 second")
    yield* Effect.log(`Done ${id}`)
    return id
  })

const ids = [1, 2, 3, 4, 5]

const program = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(2)

  // All 5 are "launched" at once, but the semaphore
  // ensures only 2 run at any given time
  const results = yield* Effect.forEach(
    ids,
    (id) => semaphore.withPermits(1)(callApi(id)),
    { concurrency: "unbounded" }
  )
  return results // [1, 2, 3, 4, 5] — order preserved
})
Common Trap: Queue.take blocks the Fiber (not the thread). This is fine — Fibers are cheap. But don't create a queue and forget to consume it, or you'll leak memory. Also: Deferred can only be completed once — calling succeed/fail again is a no-op (returns false).