Appendix: DAG Orchestration#

Why fork-join isn't enough#

Chapter 06 builds a fork-join orchestrator: the planner emits N independent work units, they all run in one asyncio.gather, and a verifier reduces the results. That is the right shape for parallel code reviews, parallel research queries, and parallel refactors: anything where the branches are peers.
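That fork-join shape, reduced to its essentials (the work and reduce functions here are stand-ins, not Chapter 06's actual code):

```python
import asyncio

async def work(unit: str) -> str:
    # stand-in for one independent branch: a review, a query, a refactor
    return f"result:{unit}"

def reduce_results(results: list[str]) -> str:
    # stand-in verifier: in Chapter 06 this checks and merges branch output
    return "; ".join(results)

async def fork_join(units: list[str]) -> str:
    # every branch is a peer: one gather, no inter-branch dependencies
    results = await asyncio.gather(*(work(u) for u in units))
    return reduce_results(results)

summary = asyncio.run(fork_join(["a", "b", "c"]))
```

The whole structure is one gather call, which is exactly why it cannot express anything richer.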

Real workflows are not that clean. Most have at least one of three complications fork-join does not express:

  1. Sequential dependencies inside a branch: plan produces a design, implementation depends on the design, review depends on the implementation. A single gather cannot encode that chain.
  2. Mixed serial and parallel stages: once implementation lands, review and test can happen in parallel, but both must wait for implementation, and the fix stage must wait for both.
  3. Partial failure isolation: one branch failing should not cancel unrelated branches. asyncio.gather(return_exceptions=True) handles exceptions but does not track which downstream tasks should be skipped.

The general shape is a directed acyclic graph. Nodes are async functions; edges are dependencies. swarm/batch/dag_executor.py is the minimal implementation.

The DAG class#

Three pieces: Node (the unit of work), DAG (the graph), and DAG.run(), which executes it.

from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

@dataclass
class Node:
    name: str
    func: Callable[..., Awaitable[Any]]
    depends_on: list[str] = field(default_factory=list)
    kwargs: dict[str, Any] = field(default_factory=dict)

class DAG:
    def add(self, node: Node) -> "DAG": ...
    def validate(self) -> None: ...                     # raises CycleError, MissingDep
    async def run(self, bus: HookBus | None = None) -> dict[str, Any]: ...

Internally run() does Kahn's topological sort and executes one "level" at a time. A level is the set of nodes whose dependencies are all satisfied. Every level runs through a single asyncio.gather, so independent nodes on the same level are concurrent and a level waits for its predecessors.
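The level loop at the heart of run() might look like this; a minimal sketch of the technique, not the actual swarm implementation (no hooks, no failure handling):

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

@dataclass
class Node:  # mirrors the Node dataclass above
    name: str
    func: Callable[..., Awaitable[Any]]
    depends_on: list[str] = field(default_factory=list)
    kwargs: dict[str, Any] = field(default_factory=dict)

async def run_levels(nodes: dict[str, Node]) -> dict[str, Any]:
    results: dict[str, Any] = {}
    remaining = dict(nodes)
    while remaining:
        # a level = every not-yet-run node whose deps are all satisfied
        level = [n for n in remaining.values()
                 if all(d in results for d in n.depends_on)]
        if not level:
            raise RuntimeError("cycle or missing dependency")
        # one gather per level: nodes within a level run concurrently,
        # and the next level cannot start until this gather returns
        outs = await asyncio.gather(*(n.func(**n.kwargs) for n in level))
        for n, out in zip(level, outs):
            results[n.name] = out
            del remaining[n.name]
    return results
```

The "no runnable node but work remains" branch is where Kahn's algorithm detects a cycle, which is why validate() can reuse the same walk.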

When bus is supplied, the executor emits dag_node_start, dag_node_complete, dag_node_failed, and dag_complete events. This gives you the same audit trail you get from Chapter 11's production daemon.

Worked example: multi-stage code review#

Imagine the flow:

            plan
              |
         implement
          /      \
      review    test
          \      /
            fix
              |
           approve

plan produces a design document. implement writes the code. review and test both consume the implementation; they are independent, so they run in parallel. fix depends on both reports. approve finalizes.

from swarm.batch.dag_executor import DAG, Node

async def plan(goal: str) -> str: ...
async def implement(design: str) -> str: ...
async def review(code: str) -> str: ...
async def test(code: str) -> str: ...
async def fix(review: str, test: str) -> str: ...
async def approve(patch: str) -> str: ...

dag = (
    DAG()
    .add(Node("plan", plan, kwargs={"goal": "add undo button"}))
    .add(Node("impl", implement, depends_on=["plan"]))
    .add(Node("review", review, depends_on=["impl"]))
    .add(Node("test", test, depends_on=["impl"]))
    .add(Node("fix", fix, depends_on=["review", "test"]))
    .add(Node("approve", approve, depends_on=["fix"]))
)

results = await dag.run(bus=bus)

The DAG runs through five levels: plan alone, then impl alone, then review and test in parallel (level three is the width of the diamond), then fix, then approve. Results come back keyed by node name.

The kwargs field is the static input. For dynamic wiring (feeding one node's output to another's input), keep that coordination outside the DAG, either via shared state or by composing higher-level nodes. Keeping the executor dumb is deliberate: the moment it knows about data flow, it becomes a workflow engine, and then you are writing Airflow.
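One hypothetical way to do that shared-state wiring: node functions close over a plain dict and publish their outputs there. The state dict and key names below are illustrative, not part of the executor:

```python
state: dict[str, str] = {}  # shared blackboard the nodes agree on

async def plan(goal: str) -> str:
    design = f"design for {goal}"
    state["design"] = design          # publish for downstream nodes
    return design

async def implement() -> str:
    # read the upstream output; the DAG only guarantees plan ran first
    code = f"code implementing {state['design']}"
    state["code"] = code
    return code
```

The executor still sees only ordering constraints; the data flow lives entirely in the closures, which keeps the DAG class itself unchanged.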

Failure isolation#

When a node raises, the executor catches the exception, stores {"error": str(e)} under that node's name, and marks its dependents blocked with {"error": "blocked", "blocked_by": <ancestor>}. Sibling branches that do not depend on the failed node keep running.

results = await dag.run()
if "review" in results and "error" in results["review"]:
    # review failed; `fix` and `approve` came back blocked; `test` ran fine
    handle_partial(results)

This is the behavior you want for long-running agent pipelines. One tool timeout on a side branch should not kill the work already committed on the other branches.
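The skip logic is a small graph walk. A sketch of how dependents might be marked (mark_blocked is an illustrative name, not the swarm function; it assumes the results and depends_on shapes described above):

```python
from typing import Any

def mark_blocked(failed: str,
                 depends_on: dict[str, list[str]],
                 results: dict[str, Any]) -> None:
    # after `failed` raises, every transitive dependent is skipped and
    # tagged with the ancestor that actually raised
    blocked = {failed}
    changed = True
    while changed:  # iterate to a fixpoint so blocking cascades downstream
        changed = False
        for node, deps in depends_on.items():
            if node in results or node in blocked:
                continue
            if any(d in blocked for d in deps):
                results[node] = {"error": "blocked", "blocked_by": failed}
                blocked.add(node)
                changed = True
```

Nodes with no path from the failed ancestor never enter the blocked set, which is the isolation guarantee.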

When to build your own vs use a framework#

The DAG executor, at roughly 120 lines, covers the 80% case: fan-out/fan-in, diamond topologies, partial failure, hook integration. You do not need Airflow, Prefect, or Temporal for most agent workflows.

Reach for a real workflow engine when:

  • You need durability across processes. If a node takes hours and the host crashes, you want checkpointing, a scheduler, and a database-backed state machine. That is what Temporal and Prefect solve.
  • You need backfill and versioning. Airflow was built for data pipelines where you re-run historical windows; agent pipelines rarely need that.
  • You need cross-language DAGs. If Python nodes must hand off to Go or Rust workers, use a protocol-level system (Temporal, gRPC workflows). Do not invent one.
  • You need a UI your team will actually use. Staring at a Gantt chart matters for a data team. For an agent team, logs plus hook events are usually enough.

If none of those apply, the 120-line DAG is cheaper to own, debug, and extend.

Making it durable#

The DAG above is in-memory. A crash loses state. To make it durable, persist two things on every dag_node_complete event:

  1. Results map: the dict[str, Any] of completed nodes, written atomically to disk (the same pattern Chapter 08's daemon uses for checkpointing).
  2. Remaining set: the names of nodes not yet started.

On restart, load both, subtract the completed set from the full node list, and call run() on the reduced DAG. The hook event gives you the exact place to wire this in.

import json

# `state` is loaded from disk at startup; `atomic_write` is Chapter 08's
# write-to-temp-then-rename helper. `completed` is a list, not a set,
# because sets are not JSON-serializable.
async def persist_checkpoint(payload: dict) -> None:
    state["completed"].append(payload["name"])
    state["results"][payload["name"]] = payload.get("result")
    atomic_write(checkpoint_path, json.dumps(state))

bus.on("dag_node_complete", persist_checkpoint)

The same pattern from Chapter 08 (write-before-commit, atomic renames, recovery on startup) applies unchanged. The DAG does not need to know about durability; the hook does.
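The restart half of the pattern, sketched with hypothetical helper names (load_checkpoint and remaining_nodes are not swarm APIs; checkpoint_path is whatever your pipeline already uses):

```python
import json
from pathlib import Path
from typing import Any

def load_checkpoint(path: Path) -> tuple[set[str], dict[str, Any]]:
    # a missing file means a cold start: nothing completed yet
    if not path.exists():
        return set(), {}
    state = json.loads(path.read_text())
    return set(state["completed"]), state["results"]

def remaining_nodes(all_nodes: list[str], completed: set[str]) -> list[str]:
    # the reduced DAG to re-run: everything not yet finished
    return [n for n in all_nodes if n not in completed]
```

Rebuild the DAG from remaining_nodes(), seed the results map with the loaded values, and call run(); completed work is never repeated.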

This is the production shape: a small, composable executor plus a durability hook. When you outgrow it, you will already be modelling your work as a graph, and the migration to Temporal or Prefect is a port, not a rewrite.