Chapter 06 — Fork-Join Orchestration¶
Companion to book/ch06_*.md. Runs top-to-bottom in Google Colab in mock mode with no API key required.
import os
if not os.path.exists("crafting-agentic-swarms"):
    !git clone https://github.com/TheAiSingularity/crafting-agentic-swarms.git
%cd crafting-agentic-swarms
!pip install -e ".[dev]" --quiet
!pip install matplotlib plotly ipywidgets --quiet
import os
try:
    from google.colab import userdata
    os.environ["ANTHROPIC_API_KEY"] = userdata.get("ANTHROPIC_API_KEY")
    print("Using real API (key from Colab secrets).")
except Exception:
    # Not on Colab, or no secret configured: fall back to mock mode.
    os.environ.setdefault("SWARM_MOCK", "true")
    print("Running in mock mode (no API key needed).")
What you'll build here¶
- Plan a goal into independent work units with the orchestrator.
- Run 5 workers in parallel via asyncio.gather and run_fork_join.
- Visualise per-worker timing as a broken_barh gantt chart.
- Compare parallel vs serial cost, including cache-inheritance savings.
1. Why fork-join¶
Serial agents accumulate latency like tar on a highway. Fan out independent work, join the results, and the wall-clock shrinks to the slowest single worker. The asyncio.gather primitive is a one-liner; the interesting engineering is in failure isolation and cache-aware cost math.
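The failure-isolation point can be demonstrated with plain asyncio, independent of the course code. This is a minimal standalone sketch, not the course's run_fork_join: with return_exceptions=True, one failing coroutine is returned as an exception object while its siblings complete normally.

```python
import asyncio

async def worker(name: str, delay: float, fail: bool = False) -> str:
    # Simulated work unit; one is set to fail to show isolation.
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} done"

async def main() -> list:
    # return_exceptions=True keeps one failure from cancelling siblings.
    return await asyncio.gather(
        worker("a", 0.01),
        worker("b", 0.01, fail=True),
        worker("c", 0.01),
        return_exceptions=True,
    )

results = asyncio.run(main())
for r in results:
    print(r)
```

Without return_exceptions=True, the RuntimeError would propagate out of gather and the results of the healthy workers would be lost.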
2. Import the fork-join primitives¶
modules/08_orchestrator_workers/code/swarm.py holds the student-facing reference implementation. It uses the same call_llm abstraction as the rest of the course.
import sys
from pathlib import Path
module_path = Path("modules/08_orchestrator_workers/code")
sys.path.insert(0, str(module_path))
from swarm import WorkUnit, run_fork_join, research_and_plan, run_moa, run_worker
from client import call_llm # M02 provider client
print("fork-join primitives loaded")
3. Plan a goal into work units¶
The orchestrator (in live mode, a large model like Opus) decomposes a goal into small independent units. In mock mode it returns a fixed set of three units. Each unit carries a title, description, and acceptance criteria — enough for a worker to act on alone.
goal = "Build a Python CLI tool that counts words in a file and reports the top 10 most frequent words."
units = await research_and_plan(goal, model="claude-haiku-4-5-20251001", call_llm_fn=call_llm)
for u in units:
    print(f"[{u.id}] {u.title}")
    print(f"  {u.description[:70]}")
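For reference, the unit structure described above can be sketched as a dataclass. WorkUnitSketch below is a hypothetical reconstruction inferred from how the notebook uses units (id/title/description/acceptance at creation; status, output, error, latency_ms filled in by workers); the real class lives in modules/08_orchestrator_workers/code/swarm.py.

```python
from dataclasses import dataclass

# Hypothetical sketch of the WorkUnit shape, not the real class.
@dataclass
class WorkUnitSketch:
    id: str
    title: str
    description: str
    acceptance: str
    status: str = "pending"   # workers flip this to completed/failed
    output: str = ""
    error: str = ""
    latency_ms: int = 0

u = WorkUnitSketch(id="unit_0", title="Parser", description="Tokenise input",
                   acceptance="Splits on whitespace")
print(u.status)
```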
4. Pad to 5 workers for the chart¶
We add two extra units so the gantt chart has enough rows to be visually interesting. In a real swarm the orchestrator decides the count; 5 is a common sweet spot for CLI tools.
extra = [
    WorkUnit(id="unit_4", title="README", description="Write README",
             acceptance="Has usage example"),
    WorkUnit(id="unit_5", title="Tests", description="Add pytest cases",
             acceptance="Covers edge cases"),
]
all_units = units + extra
print(f"{len(all_units)} workers queued")
5. Run the workers in parallel¶
run_fork_join wraps asyncio.gather with return_exceptions=True so a single worker failure does not cancel the others. Time the whole block to compare against a serial baseline.
import time
t0 = time.monotonic()
completed = await run_fork_join(
    all_units, goal=goal, conventions="Python 3.11",
    model="claude-haiku-4-5-20251001", call_llm_fn=call_llm,
)
parallel_ms = int((time.monotonic() - t0) * 1000)
for u in completed:
    print(f"[{u.status}] {u.title:<20} latency={u.latency_ms}ms")
print(f"Total wall-clock: {parallel_ms}ms")
6. Gantt chart of worker timing¶
With parallel execution, all workers start at roughly t=0 and finish independently. A broken_barh chart makes the overlap obvious: the overall duration is set by the single slowest worker, not by the sum of all of them.
import matplotlib.pyplot as plt
import random
random.seed(1)
fig, ax = plt.subplots(figsize=(9, 4))
yticks = []
ylabels = []
for i, u in enumerate(completed):
    # Mock mode doesn't track real start times; synthesise a stagger for the chart.
    start = random.uniform(0, 20)
    ax.broken_barh([(start, u.latency_ms)], (i * 10, 8), facecolors="#3d7eff")
    yticks.append(i * 10 + 4)
    ylabels.append(u.title[:30])
ax.set_yticks(yticks)
ax.set_yticklabels(ylabels)
ax.set_xlabel("Time (ms)")
ax.set_title("Fork-join worker timing (parallel)")
ax.grid(axis="x", alpha=0.3)
plt.tight_layout()
plt.show()
7. Serial comparison¶
Run the same workers one at a time to establish the baseline. Parallel wall-clock should be close to the single slowest worker; serial wall-clock is the sum of all workers.
from dataclasses import replace
serial_units = [replace(u, status="pending", output="", error="", latency_ms=0) for u in all_units]
t0 = time.monotonic()
for u in serial_units:
    await run_worker(
        u, goal=goal, conventions="Python 3.11",
        model="claude-haiku-4-5-20251001", call_llm_fn=call_llm,
    )
serial_ms = int((time.monotonic() - t0) * 1000)
print(f"Serial: {serial_ms}ms | Parallel: {parallel_ms}ms | speedup: {serial_ms / max(parallel_ms, 1):.2f}x")
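The relationship between the two timings can be stated as a toy model: serial wall-clock is the sum of per-worker latencies, parallel wall-clock is their maximum (ignoring coordination overhead). The latencies below are made-up illustrative numbers, not measured values.

```python
# Toy model: why fork-join speedup tracks the slowest worker.
latencies_ms = [900, 1100, 1000, 800, 1200]  # hypothetical per-worker times

serial = sum(latencies_ms)    # serial run: pay every latency in turn
parallel = max(latencies_ms)  # parallel run: bounded by the slowest worker
speedup = serial / parallel

print(f"serial={serial}ms parallel={parallel}ms speedup={speedup:.2f}x")
```

Note the speedup is below the worker count (5) because one worker is slower than the rest; perfectly balanced workers would give the full 5x.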
8. Speedup bar chart¶
Visual comparison. The bar ratio should roughly match the worker count; in practice it is a bit lower because of coordination overhead and unbalanced workers.
fig, ax = plt.subplots(figsize=(6, 3.5))
ax.bar(["Serial", "Parallel"], [serial_ms, parallel_ms], color=["#888", "#3d7eff"])
ax.set_ylabel("Wall-clock (ms)")
ax.set_title("Serial vs parallel wall-clock")
for i, v in enumerate([serial_ms, parallel_ms]):
    ax.text(i, v + 5, str(v), ha="center")
plt.tight_layout()
plt.show()
9. Cache-inheritance cost model¶
When every worker shares the same system-prompt prefix, Claude's prompt cache charges the first worker full price for that prefix; each remaining worker pays the cache-read rate, roughly 10 percent of the input price, on those tokens. We model the savings below using current Sonnet pricing.
# April-2026 Sonnet rates, per million tokens
SYSTEM_TOKENS = 2000 # shared prefix across all workers
UNIT_TOKENS = 300 # unique per-unit tail
OUTPUT_TOKENS = 500
PRICE_IN = 3.00
PRICE_CACHE_READ = 0.30
PRICE_OUT = 15.00
def cost_no_cache(n_workers: int) -> float:
    return (n_workers * (SYSTEM_TOKENS + UNIT_TOKENS) * PRICE_IN
            + n_workers * OUTPUT_TOKENS * PRICE_OUT) / 1_000_000

def cost_with_cache(n_workers: int) -> float:
    first = ((SYSTEM_TOKENS + UNIT_TOKENS) * PRICE_IN
             + OUTPUT_TOKENS * PRICE_OUT) / 1_000_000
    rest_in = (n_workers - 1) * (SYSTEM_TOKENS * PRICE_CACHE_READ + UNIT_TOKENS * PRICE_IN) / 1_000_000
    rest_out = (n_workers - 1) * OUTPUT_TOKENS * PRICE_OUT / 1_000_000
    return first + rest_in + rest_out

for n in [1, 2, 4, 5, 8]:
    a = cost_no_cache(n)
    b = cost_with_cache(n)
    print(f"n={n} no_cache=${a:.4f} with_cache=${b:.4f} savings={(a-b)/a*100:5.1f}%")
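The savings percentage has a ceiling. As n grows, the first worker's full-price prefix amortises away and the savings fraction converges to the per-worker saving on the cached prefix divided by the per-worker uncached cost. This is a quick check using the same rates as above.

```python
# Same rates as the cost model above, per million tokens.
SYSTEM_TOKENS, UNIT_TOKENS, OUTPUT_TOKENS = 2000, 300, 500
PRICE_IN, PRICE_CACHE_READ, PRICE_OUT = 3.00, 0.30, 15.00

# Full per-worker cost without caching, in price-units per million tokens.
per_worker_no_cache = (SYSTEM_TOKENS + UNIT_TOKENS) * PRICE_IN + OUTPUT_TOKENS * PRICE_OUT
# What caching saves on each non-first worker: the prefix drops from
# PRICE_IN to PRICE_CACHE_READ.
per_worker_saving = SYSTEM_TOKENS * (PRICE_IN - PRICE_CACHE_READ)

asymptotic = per_worker_saving / per_worker_no_cache
print(f"savings ceiling: {asymptotic * 100:.1f}%")
```

With these numbers the printed loop above should approach but never exceed this ceiling; output tokens are never cached, which is what caps the savings.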
10. Parallelism vs cost curve¶
More workers shorten wall-clock but trade off against coordination overhead. The cache-aware cost curve is sub-linear thanks to the shared prefix, which is why production orchestrators lean so heavily on shared system prompts.
ns = list(range(1, 9))
no_cache = [cost_no_cache(n) for n in ns]
with_cache = [cost_with_cache(n) for n in ns]
fig, ax = plt.subplots(figsize=(8, 4.5))
ax.plot(ns, no_cache, "-o", label="Without prompt cache")
ax.plot(ns, with_cache, "-o", label="With prompt cache (shared prefix)")
ax.set_xlabel("Number of parallel workers")
ax.set_ylabel("Total cost (USD)")
ax.set_title("Fan-out cost: cache inheritance pays off")
ax.legend()
ax.grid(alpha=0.3)
plt.show()
11. Mixture of agents¶
Fan out the same question to N workers, then aggregate. In mock mode the answer is deterministic; in live mode different models bring different inductive biases and the aggregator picks the strongest signal. Useful when one model alone misses edge cases.
answer = await run_moa(
    "What is the best sorting algorithm for nearly-sorted data?",
    models=["claude-haiku-4-5-20251001", "claude-sonnet-4-6"],
    aggregator_model="claude-haiku-4-5-20251001",
    call_llm_fn=call_llm,
)
print(answer)
print(answer)
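The overall shape of mixture-of-agents can be sketched without any API. This is a hypothetical standalone sketch, not the course's run_moa: fake_llm and moa_sketch are stand-ins, but the control flow (fan the same prompt to every model, then hand all drafts to one aggregator) is the pattern described above.

```python
import asyncio

async def fake_llm(model: str, prompt: str) -> str:
    # Stand-in for a real LLM call; just tags the prompt with the model name.
    await asyncio.sleep(0)
    return f"[{model}] answer to: {prompt}"

async def moa_sketch(question: str, models: list[str], aggregator: str) -> str:
    # Phase 1: fan the same question out to every model in parallel.
    drafts = await asyncio.gather(*(fake_llm(m, question) for m in models))
    # Phase 2: one aggregator synthesises across all drafts.
    combined = "\n".join(drafts)
    return await fake_llm(aggregator, f"Synthesise the best answer from:\n{combined}")

answer = asyncio.run(moa_sketch("best sort for nearly-sorted data?",
                                ["model-a", "model-b"], "model-agg"))
print(answer)
```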
12. Status distribution across a swarm¶
Aggregate the final status across completed units. In mock mode every worker completes; with a real API failures show up and you want to surface them immediately.
from collections import Counter
counts = Counter(u.status for u in completed)
print(counts)
fig, ax = plt.subplots(figsize=(5, 3))
ax.bar(counts.keys(), counts.values(), color="#3d7eff")
ax.set_title("Worker status distribution")
plt.show()
13. Interactive cost explorer¶
A plotly line chart lets the learner hover to read exact savings at any N.
import plotly.graph_objects as go
fig = go.Figure()
fig.add_trace(go.Scatter(x=ns, y=no_cache, name="No cache", mode="lines+markers"))
fig.add_trace(go.Scatter(x=ns, y=with_cache, name="With cache", mode="lines+markers"))
fig.update_layout(title="Fan-out cost comparison",
xaxis_title="workers", yaxis_title="cost USD")
fig.show()
14. When fork-join hurts¶
Fork-join assumes independent units. If a unit depends on another's output, the coordination overhead usually swamps the gain. Common failure mode: the planner emits units that share state, so workers produce inconsistent files. Watch for: shared filenames, shared fixtures, shared DB tables. When you see this, collapse the two units into one and rerun.
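One cheap guard against the shared-state failure mode is to scan unit descriptions for overlapping filenames before forking. shared_files below is a hypothetical helper, not part of the course code, and filename regexes are only a rough proxy; it illustrates the kind of pre-flight check worth running on planner output.

```python
import re
from collections import defaultdict

def shared_files(units: list[tuple[str, str]]) -> dict[str, list[str]]:
    # Map each filename-looking token to the unit ids that mention it,
    # keeping only names claimed by more than one unit.
    seen = defaultdict(list)
    for unit_id, description in units:
        for fname in re.findall(r"\b[\w/]+\.\w{1,4}\b", description):
            seen[fname].append(unit_id)
    return {f: ids for f, ids in seen.items() if len(ids) > 1}

conflicts = shared_files([
    ("unit_1", "Write cli.py entry point"),
    ("unit_2", "Add argument parsing to cli.py"),
    ("unit_3", "Write README.md"),
])
print(conflicts)
```

Any hit is a signal to merge the conflicting units (or serialise them) before fanning out.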
15. What to try next¶
- Add a real API key and rerun cell 5. Observe actual worker latencies.
- Increase all_units to 10. Does wall-clock stay flat? Does cost with cache hold up?
- Swap in the run_swarm entry point from swarm.batch.executor for the full 4-phase pipeline.
- Trigger a failure by raising in one worker; confirm the others still complete.