Chapter 4 - Memory Visualization¶
Companion to book/ch04_state_collaboration.md. Runs top-to-bottom in Google Colab in mock mode with no API key required.
# Clone the repo (skip if already present - Colab keeps files across runs in one session)
import os
if not os.path.exists("crafting-agentic-swarms"):
    !git clone https://github.com/TheAiSingularity/crafting-agentic-swarms.git
# Work from the repo root so the editable install below resolves.
%cd crafting-agentic-swarms
# Editable install of the package plus the notebook's plotting/widget deps.
!pip install -e ".[dev]" --quiet
!pip install matplotlib plotly ipywidgets --quiet
import os

# Decide between real-API mode and mock mode.
# Real mode requires both: (a) running inside Colab, and (b) an
# ANTHROPIC_API_KEY secret that is set and non-empty. Anything else
# (plain Jupyter, missing/denied secret) falls back to mock mode.
try:
    from google.colab import userdata

    api_key = userdata.get("ANTHROPIC_API_KEY")
    if not api_key:
        # userdata.get can return None/empty when the secret is absent;
        # treat that the same as any other failure below.
        raise RuntimeError("ANTHROPIC_API_KEY secret not set")
    os.environ["ANTHROPIC_API_KEY"] = api_key
    print("Using real API (key from Colab secrets).")
except Exception:
    # Covers ImportError (not in Colab) and secret lookup failures alike.
    # setdefault: respect a SWARM_MOCK value the user already exported.
    os.environ.setdefault("SWARM_MOCK", "true")
    print("Running in mock mode (no API key needed).")
# Required for ipywidgets to work in Colab
try:
    from google.colab import output
    output.enable_custom_widget_manager()
except ImportError:
    # Outside Colab (plain Jupyter) widgets render without this hook.
    pass
What you'll build here¶
- Spin up `swarm.memory.store.MemoryStore` pointed at a fresh temp directory.
- Log 20 fake transcript entries and watch the JSONL file grow.
- Run `consolidate()` and inspect the index/topic deltas.
- Plot topic count over simulated sessions and use an ipywidgets slider to explore the triple-gate.
1. Set up a throwaway memory store¶
import tempfile, shutil
from pathlib import Path
from swarm.memory.store import MemoryStore

# Throwaway directory so the demo never touches real memory files;
# `tmp_dir` and `store` are reused by the later cells in this notebook.
tmp_dir = Path(tempfile.mkdtemp(prefix="swarm_memory_ch04_"))
store = MemoryStore(memory_dir=tmp_dir)
print(f"Memory dir: {tmp_dir}")
print("Initial context:\n" + store.get_context())
2. Log 20 fake transcript entries¶
import asyncio
import random
AGENTS = [("worker", "I handled the retry case."),
("verifier", "Checked edge cases, looks good."),
("dream", "Consolidated 3 new topics."),
("worker", "Refactored the client module."),
("planner", "Outlined step-by-step plan.")]
for i in range(20):
agent_id, base = random.choice(AGENTS)
role = agent_id
await store.log_turn(agent_id=f"{agent_id}_{i % 3}", role=role, content=f"Entry {i}: {base}")
# List every JSONL transcript file and show size plus the first line.
transcripts_dir = tmp_dir / "transcripts"
jsonl_files = sorted(transcripts_dir.glob("*.jsonl"))
print(f"Transcript files: {[p.name for p in jsonl_files]}")
for tf in jsonl_files:
    entries = tf.read_text().splitlines()
    print(f" {tf.name}: {len(entries)} lines, {tf.stat().st_size} bytes")
    if entries:
        print(f" head: {entries[0][:100]}")
3. Before consolidation - index is empty¶
# The index file does not exist until the first consolidation runs.
index_path = tmp_dir / "MEMORY.md"
index_before = index_path.read_text() if index_path.exists() else "(file missing)"
print("MEMORY.md BEFORE:\n" + index_before)
print("\nTopic files:", [p.name for p in tmp_dir.glob("*.md") if p.name != "MEMORY.md"])
4. Force a consolidate() in mock mode¶
The triple-gate (5+ sessions, 24h elapsed, no lock) normally blocks consolidate(). In mock mode we push past the gate by bumping the session counter and clearing the time constraint so the dream role runs.
import json, datetime
# Bypass the time gate by writing a stale dream_state.
state_path = tmp_dir / "dream_state.json"
state_path.write_text(json.dumps({
"last_dream": "2020-01-01T00:00:00+00:00",
"session_count": 0,
"last_summary": "",
}))
# Bump the session counter past the 5-session gate.
for _ in range(6):
store.increment_session()
summary = await store.consolidate()
print(f"consolidate() returned: {summary!r}")
5. After consolidation - inspect the diffs¶
# Re-read the index and topic files to see what consolidation changed.
memory_path = tmp_dir / "MEMORY.md"
index_after = memory_path.read_text() if memory_path.exists() else "(file missing)"
print("MEMORY.md AFTER:")
print(index_after or "(empty)")
topic_files = [p for p in tmp_dir.glob("*.md") if p.name != "MEMORY.md"]
print(f"\nTopic files now: {len(topic_files)}")
for tf in topic_files:
    print(f" {tf.name}: {tf.stat().st_size} bytes")
    print(" preview:", tf.read_text()[:80].replace("\n", " "))
The mock dream fixture returns a canned summary rather than real JSON upserts, so topic files may not change in mock mode. Real API runs produce a list of upsert + delete operations applied to the index.
6. Topic count growth across simulated sessions¶
import matplotlib.pyplot as plt
import numpy as np

# Toy model: each session produces 2 candidate topics; autoDream fires
# every 5 sessions and merges/prunes, keeping the growth sub-linear.
sessions = np.arange(0, 30)
naive_growth = sessions * 2  # no consolidation at all

def _retention(s):
    # Fraction of candidate topics kept after consolidation at session s.
    if s < 5:
        return 1.0
    if s < 15:
        return 0.7
    return 0.5

dreamed_growth = np.array([int(s * 2 * _retention(s)) for s in sessions])

fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(sessions, naive_growth, marker="o", color="#ef4444", label="no autoDream")
ax.plot(sessions, dreamed_growth, marker="s", color="#10b981", label="with autoDream")
ax.set_xlabel("session #")
ax.set_ylabel("topic files")
ax.set_title("Memory footprint - with and without autoDream")
ax.axvspan(5, 15, alpha=0.1, color="#10b981", label="dream zone")
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
7. Triple-gate slider¶
Try the sliders. should_dream() returns True only when sessions >= 5 AND hours since last dream >= 24 AND no lock is held. If any one condition fails, autoDream skips. Move the sliders to see which condition trips.
import ipywidgets as widgets
from IPython.display import display

# Triple-gate thresholds mirrored from should_dream().
MIN_SESSIONS = 5
MIN_HOURS = 24

sess_slider = widgets.IntSlider(min=0, max=15, value=3, description="sessions")
hours_slider = widgets.IntSlider(min=0, max=72, value=12, description="hours")
lock_toggle = widgets.Checkbox(value=False, description="lock held?")
out = widgets.Output()

def evaluate(*_):
    """Re-render the pass/fail table for the current widget state."""
    with out:
        out.clear_output()
        gates = [
            ("sessions >= 5", sess_slider.value >= MIN_SESSIONS),
            ("hours >= 24", hours_slider.value >= MIN_HOURS),
            ("no lock", not lock_toggle.value),
        ]
        for label, passed in gates:
            print(f" [{'PASS' if passed else 'FAIL'}] {label}")
        # The dream fires only when every gate passes.
        verdict = "DREAM FIRES" if all(p for _, p in gates) else "DREAM SKIPPED"
        print(f"\n=> {verdict}")

for w in (sess_slider, hours_slider, lock_toggle):
    w.observe(evaluate, names="value")

display(widgets.VBox([sess_slider, hours_slider, lock_toggle, out]))
evaluate()
If the widget manager isn't enabled, the sliders still render but won't update live. The output.enable_custom_widget_manager() call in the setup cell handles this for Colab.
8. Clean up¶
# Remove the throwaway memory directory created in step 1.
shutil.rmtree(tmp_dir)
print(f"Removed {tmp_dir}")
9. Layer-by-layer read comparison¶
Every session starts by reading the index. Topic files are read lazily. Transcripts are grep-only (searched, never loaded whole). The three layers trade recency for cost: index loads are cheap and fast, transcript scans are slow and only when explicitly requested.
import time
# Re-create a store with some content for timing.
tmp2 = Path(tempfile.mkdtemp(prefix="swarm_mem_timing_"))
store2 = MemoryStore(memory_dir=tmp2)
for i in range(50):
await store2.log_turn(agent_id="a", role="worker", content=f"entry {i}: lorem ipsum sits amet")
# Layer 1: index (hot)
t0 = time.monotonic()
_ = store2.get_context()
index_ms = (time.monotonic() - t0) * 1000
# Layer 2: topic read (warm - no topics yet, so miss)
t0 = time.monotonic()
_ = store2.read_topic("nonexistent.md")
topic_ms = (time.monotonic() - t0) * 1000
# Layer 3: transcript grep (cold)
t0 = time.monotonic()
_ = await store2.search_transcripts("lorem", days_back=1)
grep_ms = (time.monotonic() - t0) * 1000
print(f"Layer 1 (index read): {index_ms:.2f} ms")
print(f"Layer 2 (topic miss): {topic_ms:.2f} ms")
print(f"Layer 3 (grep): {grep_ms:.2f} ms (50 entries)")
shutil.rmtree(tmp2)
# Bar chart of the three measured access times, one bar per layer.
fig, ax = plt.subplots(figsize=(6.5, 3.5))
layer_labels = ["L1 index", "L2 topic", "L3 grep"]
layer_times = [index_ms, topic_ms, grep_ms]
bars = ax.bar(layer_labels, layer_times, color=["#10b981", "#3b82f6", "#ef4444"])
# Annotate each bar with its measured time.
for rect, ms in zip(bars, layer_times):
    ax.text(rect.get_x() + rect.get_width() / 2, ms, f"{ms:.2f} ms", ha="center", va="bottom")
ax.set_ylabel("ms")
ax.set_title("Access time by memory layer")
plt.tight_layout()
plt.show()
Index reads are roughly free - you can afford to load them into every system prompt. Grep is orders of magnitude slower and only makes sense when the agent explicitly asks. Topic reads sit in between.
10. autoDream failure modes¶
Dream cycles can fail for three reasons: (a) the LLM returns malformed JSON, (b) the lock is held by another process, (c) the triple-gate fires but no meaningful consolidation is possible. The code handles each; here's the decision table.
# (condition, code outcome, operational meaning) rows for the
# dream-cycle failure-mode decision table.
table = [
    ("malformed JSON from LLM", "return \"parse_error\"", "Log, retry next cycle"),
    ("LLM call itself fails", "return \"llm_error\"", "Log, retry next cycle"),
    ("lock already held", "return \"skipped\"", "Another process is dreaming"),
    ("stale lock (>5 min)", "remove lock, proceed", "Previous process crashed"),
    ("gate fails (time/sess)", "return None", "Not yet time to dream"),
]

# Fixed-width columns keep the table readable in plain stdout.
print(f" {'condition':<28s} {'outcome':<28s} {'meaning':<30s}")
print(f" {'-' * 28} {'-' * 28} {'-' * 30}")
for condition, outcome, meaning in table:
    print(f" {condition:<28s} {outcome:<28s} {meaning:<30s}")
11. Memory budget¶
Index lines are capped at 150 chars, and the whole index at 200 lines (= 30,000 chars ceiling, approx 7-8K tokens). That keeps session-start cost under ~$0.025 even at Opus pricing.
from swarm.core.models import MEMORY_CAP_BYTES, MEMORY_INDEX_LINE_LIMIT, MODEL_PRICING as MP

# Worst-case session-start cost: the whole index at the byte cap,
# converted with the rough ~4-chars-per-token rule.
pricing = MP["claude-opus-4-6"]
worst_case_tokens = MEMORY_CAP_BYTES // 4
cost_per_load = worst_case_tokens * pricing["input"] / 1_000_000
cached_cost = worst_case_tokens * pricing["cache_read"] / 1_000_000

print(f"Memory cap: {MEMORY_CAP_BYTES} bytes = {MEMORY_CAP_BYTES / 1024:.1f} KB")
print(f"Per-line limit: {MEMORY_INDEX_LINE_LIMIT} chars")
print(f"Worst-case load: ~{worst_case_tokens} tokens -> ${cost_per_load:.5f} per session @ Opus")
print(f"With caching: ~${cached_cost:.5f} after first load")
Takeaways¶
- Three layers: Index (hot), Topic files (warm), Transcripts (cold / JSONL).
- autoDream consolidates episodic transcripts into semantic topics behind a triple-gate so it never fires mid-workflow.
- Without consolidation memory grows linearly; with it, growth plateaus.
- Layer access time spans orders of magnitude - keep the hot path small.
- Budgets keep session-start cost bounded even at Opus pricing, and caching pushes it toward zero.