How I Built a Complete Agent Tool Framework in Pure Python to Understand LangGraph Internals
A deep dive into an AI agent framework built in pure Python using __init_subclass__, data descriptors, ParamSpec decorators, Protocol structural typing, context manager sessions, generator streaming, and TypedDict contracts, all in one real project

Most people learning "agentic AI" learn it from the outside. They use LangGraph, CrewAI, and PydanticAI. They call the API. They get results. They never ask what's happening inside.
I'm going through a 16-week Agentic AI roadmap right now. Week 1 was meant to be the easy warm-up — Python fundamentals. Instead I built a complete mini-framework that mirrors how real agent systems actually organize their internals: a self-registering tool catalog, descriptor-validated config, mixin-composed base classes, ParamSpec decorators, a Protocol-based type contract, TypedDict runtime data shapes, a context-managed execution session, generator streaming, and a unit test suite.
No external dependencies. No LLM calls. Just the machinery.
Here's the full deep dive.
Why model internals instead of using the real thing?
Because using LangGraph tells you what it does. Building a simplified version tells you why it's built that way.
After writing this framework, I now understand why:
- __init_subclass__ is better than a manual tool registry dict
- Descriptors beat validation in __init__ for config objects
- Protocol keeps a framework extensible without forcing inheritance
- TypedDict over plain dicts is not pedantry; it's early error detection
- Context managers are the right shape for session lifecycle, not try/finally spread across call sites
These aren't abstract lessons. I hit the wrong version of each one first, then refactored.
The full architecture

    agent_cli/
    ├── core/
    │   ├── base.py          ← BaseTool with __init_subclass__ and run()
    │   ├── registry.py      ← ToolRegistry: global dict + safety guards
    │   ├── config.py        ← ToolConfig: descriptor-backed slotted class
    │   ├── mixins.py        ← LoggingMixin, RetryMixin, MetricsMixin
    │   ├── session.py       ← ExecutionSession context manager
    │   ├── metrics.py       ← ToolMetrics slotted counter object
    │   ├── protocols.py     ← ToolProtocol structural interface
    │   ├── types.py         ← ToolContext, ToolOutput, ToolMetadata TypedDicts
    │   └── exceptions.py    ← Typed exception hierarchy
    ├── tools/
    │   ├── search.py        ← SearchTool (keyword scoring, streaming)
    │   ├── summarize.py     ← SummarizeTool (extractive, streaming)
    │   └── translate.py     ← TranslateTool (lexicon lookup, streaming)
    ├── decorators/
    │   ├── tooling.py       ← @tool: discovery metadata attachment
    │   └── execution.py     ← @log_execution, @measure_time (ParamSpec)
    ├── descriptors/
    │   └── fields.py        ← ValidatedField hierarchy
    ├── cli/
    │   └── app.py           ← argparse interface: list-tools, describe, run
    └── tests/
        └── test_framework.py

Let's go layer by layer, with the real code.
Layer 1: Self-registering tools
The central design question for any tool system: how does the runtime know what tools exist?
The wrong answer is a manually maintained dict. Any time humans maintain a list in parallel with a class hierarchy, the list will drift.
The right answer is __init_subclass__ — a Python hook that fires at class definition time:

    class BaseTool(LoggingMixin, RetryMixin, MetricsMixin, ABC):
        __slots__ = ("_metrics", "config")

        _tool_name: ClassVar[str]
        _tool_tags: ClassVar[tuple[str, ...]] = ()
        _tool_examples: ClassVar[tuple[str, ...]] = ()
        _streamable: ClassVar[bool] = False
        description: ClassVar[str] = ""

        def __init_subclass__(
            cls,
            *,
            tool_name: str | None = None,
            description: str = "",
            streamable: bool = False,
            abstract: bool = False,
            **kwargs: Any,
        ) -> None:
            super().__init_subclass__(**kwargs)
            if abstract:
                return
            if tool_name is None:
                raise TypeError(f"{cls.__name__} must define tool_name='...'")
            normalized_name = tool_name.strip().lower()
            if not normalized_name:
                raise ToolValidationError(f"{cls.__name__} received an empty tool name")
            cls._tool_name = normalized_name
            cls.description = description.strip()
            cls._streamable = streamable
            ToolRegistry.register(normalized_name, cls)

A few important design choices here:
The abstract=True escape hatch. Without it, any intermediate abstract base class in your hierarchy would try to register as a real tool and fail (no tool_name). This is what lets you build multi-level hierarchies cleanly.
Normalization at registration time. tool_name.strip().lower() means you can't accidentally register "Search" and "search" as different tools. Case-insensitive by design.
TypeError vs ToolValidationError. Missing tool_name= is a programming error (wrong class definition), so it gets TypeError. Empty string after stripping is a validation error (bad value), so it gets the domain exception. That distinction matters when you're debugging.
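To see the hook in isolation, here is a minimal sketch that keeps only the registration path. MiniTool, REGISTRY, StreamingTool, and Search are hypothetical stand-ins, not the framework's real classes, and the sketch skips the empty-name check and the duplicate guard.

```python
from __future__ import annotations

from typing import Any

# Hypothetical stand-in for ToolRegistry: a plain module-level dict.
REGISTRY: dict[str, type] = {}


class MiniTool:
    def __init_subclass__(
        cls, *, tool_name: str | None = None, abstract: bool = False, **kwargs: Any
    ) -> None:
        super().__init_subclass__(**kwargs)
        if abstract:
            return  # intermediate base classes opt out of registration
        if tool_name is None:
            raise TypeError(f"{cls.__name__} must define tool_name='...'")
        REGISTRY[tool_name.strip().lower()] = cls


class StreamingTool(MiniTool, abstract=True):
    """Intermediate base class: never registered."""


class Search(StreamingTool, tool_name="  Search "):
    """Registered under the normalized name 'search'."""


print(sorted(REGISTRY))  # ['search']

try:
    class Broken(MiniTool):  # neither tool_name nor abstract=True
        pass
except TypeError as error:
    print(error)  # Broken must define tool_name='...'
```

The class statement itself is what triggers registration, so importing the module that defines a tool is enough to make it discoverable.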
The ToolRegistry is intentionally a global class-level singleton:

    class ToolRegistry:
        __slots__ = ()
        _tools: dict[str, type[Any]] = {}

        @classmethod
        def register(cls, name: str, tool_cls: type[Any]) -> None:
            existing = cls._tools.get(name)
            if existing is not None and existing is not tool_cls:
                raise DuplicateToolError(
                    f"tool name {name!r} is already registered by {existing.__name__}"
                )
            cls._tools[name] = tool_cls

        @classmethod
        def create(cls, name: str, **kwargs: Any) -> ToolProtocol:
            tool_cls = cls.get(name)
            return cast(ToolProtocol, tool_cls(**kwargs))

        @classmethod
        def names(cls) -> tuple[str, ...]:
            return tuple(sorted(cls._tools))

Sorted output from names() and items() is deliberate — deterministic ordering matters for tests and for CLI output that shouldn't flicker between runs.
The clear() method exists for tests that need to isolate registry state. It mirrors how plugin systems expose reset mechanisms for test isolation.
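The excerpt above leaves out items() and clear(), so the following is only a guess at their shape. MiniRegistry and its method bodies are assumptions made for illustration; the duplicate guard is the one piece copied from the real register().

```python
from __future__ import annotations


class DuplicateToolError(Exception):
    """Stand-in for the framework exception of the same name."""


class MiniRegistry:
    _tools: dict[str, type] = {}

    @classmethod
    def register(cls, name: str, tool_cls: type) -> None:
        existing = cls._tools.get(name)
        if existing is not None and existing is not tool_cls:
            raise DuplicateToolError(f"tool name {name!r} is already registered")
        cls._tools[name] = tool_cls

    @classmethod
    def items(cls) -> tuple[tuple[str, type], ...]:
        # Assumed shape: (name, class) pairs sorted by name.
        return tuple(sorted(cls._tools.items(), key=lambda pair: pair[0]))

    @classmethod
    def clear(cls) -> None:
        # Assumed behavior: empty the shared dict in place.
        cls._tools.clear()


class Zeta:
    pass


class Alpha:
    pass


MiniRegistry.register("zeta", Zeta)
MiniRegistry.register("alpha", Alpha)
MiniRegistry.register("alpha", Alpha)  # same class again: allowed, no error

names = [name for name, _ in MiniRegistry.items()]
print(names)  # ['alpha', 'zeta']

MiniRegistry.clear()  # what a test's setUp or tearDown might call
print(MiniRegistry.items())  # ()
```

Because the dict lives on the class, every test shares it unless something resets it, which is why a clear() hook earns its place.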
Layer 2: Descriptor-backed configuration
Most config validation I see lives in __init__. The problem: it runs once, at instantiation time, inside whatever code is trying to build the config (usually three layers deep), and a later assignment such as config.retries = 9 skips it entirely.
Python data descriptors fire at assignment time, the moment you write config.retries = 9, whether that assignment happens inside __init__ or long after construction:

    class ValidatedField(Generic[T]):
        __slots__ = ("default", "private_name", "public_name")

        def __set_name__(self, owner: type, name: str) -> None:
            self.public_name = name
            self.private_name = f"_{name}"

        def __set__(self, instance: object, value: Any) -> None:
            setattr(instance, self.private_name, self.validate(value))

        def __get__(self, instance, owner):
            if instance is None:
                return self
            if hasattr(instance, self.private_name):
                return getattr(instance, self.private_name)
            if self.default is not _MISSING:
                return self.default
            raise AttributeError(f"{self.public_name} has not been configured")

The private_name storage (e.g. _retries) works cleanly with slotted classes, provided each private name is declared in __slots__, as ToolConfig does below. This is important: ToolConfig uses __slots__ to prevent accidental attribute creation on config objects, which is exactly the kind of silent bug that slips through in dynamic languages.
The concrete field types build a small hierarchy:

    ValidatedField
    ├── NonEmptyString        (strips, rejects empty)
    │   └── IdentifierField   (regex: ^[a-z][a-z0-9_-]*$)
    ├── IntegerRange          (bounds-checked int, rejects bool)
    ├── FloatRange            (bounds-checked float, coerces int)
    └── BooleanField          (strict: rejects int 0/1 as bool)

IdentifierField uses re.compile(r"^[a-z][a-z0-9_-]*$").fullmatch() — the same pattern you'd use for a CLI tool name or an API route segment. IntegerRange.validate() checks isinstance(value, bool) first because in Python, bool is a subclass of int, so True would pass an integer check without the explicit guard.
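The bool guard is easiest to appreciate in a runnable form. This is a simplified sketch, not the real IntegerRange: IntegerRangeSketch and Config are hypothetical names, default handling is omitted, and plain ValueError stands in for whatever exception the framework raises.

```python
from __future__ import annotations

from typing import Any


class IntegerRangeSketch:
    """Simplified bounds-checked integer descriptor (no default support)."""

    def __init__(self, minimum: int, maximum: int) -> None:
        self.minimum = minimum
        self.maximum = maximum

    def __set_name__(self, owner: type, name: str) -> None:
        self.public_name = name
        self.private_name = f"_{name}"

    def __get__(self, instance: object, owner: type | None = None) -> Any:
        if instance is None:
            return self
        return getattr(instance, self.private_name)

    def __set__(self, instance: object, value: Any) -> None:
        setattr(instance, self.private_name, self.validate(value))

    def validate(self, value: Any) -> int:
        # bool is a subclass of int, so it has to be rejected before the int check.
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValueError(f"{self.public_name} must be an integer")
        if not self.minimum <= value <= self.maximum:
            raise ValueError(
                f"{self.public_name} must be between {self.minimum} and {self.maximum}"
            )
        return value


class Config:
    __slots__ = ("_retries",)  # storage slot for the descriptor's private name
    retries = IntegerRangeSketch(0, 5)

    def __init__(self, retries: int) -> None:
        self.retries = retries  # goes through __set__, so it is validated too


config = Config(retries=2)
config.retries = 3  # valid reassignment

rejected = []
for bad in (True, 9, "3"):
    try:
        config.retries = bad
    except ValueError:
        rejected.append(bad)

print(isinstance(True, int))  # True, which is why the guard exists
print(rejected)               # [True, 9, '3']
print(config.retries)         # 3, the last valid value survives
```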
ToolConfig composes them:

    class ToolConfig:
        __slots__ = ("_retries", "_streaming_enabled", "_timeout", "_tool_name")

        tool_name: str = IdentifierField()
        retries: int = IntegerRange(0, 5, default=1)
        timeout: float = FloatRange(0.1, 120.0, default=10.0)
        streaming_enabled: bool = BooleanField(default=True)

        @property
        def max_attempts(self) -> int:
            return self.retries + 1

        @property
        def reliability_profile(self) -> str:
            if self.retries >= 3 and self.timeout >= 15:
                return "resilient"
            if self.retries == 0:
                return "fast-fail"
            return "balanced"

reliability_profile is a computed property exposed for observability — logging "search config (resilient)" is more useful than logging the raw numbers. The as_dict() method serializes everything including computed properties for inspection or future persistence.
Layer 3: Mixins composed through MRO
The base class signature:
class BaseTool(LoggingMixin, RetryMixin, MetricsMixin, ABC):
Python resolves this left to right via C3 linearization. The MRO you get:
BaseTool → LoggingMixin → RetryMixin → MetricsMixin → ABC → object
All three mixins declare __slots__ = (): no instance variables, just methods. This is critical. A mixin that omits __slots__ quietly gives every tool instance a __dict__ again, and two parents that each declare non-empty __slots__ cannot be combined at all: Python raises TypeError (multiple bases have instance lay-out conflict) at class creation. Pure-behavior mixins avoid both problems.
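A small experiment shows how slots interact with multiple inheritance. All class names here are hypothetical and exist only for the demonstration.

```python
class PureA:
    __slots__ = ()  # behavior only


class PureB:
    __slots__ = ()  # behavior only


class Tool(PureA, PureB):
    __slots__ = ("config",)  # the single place where instance state lives


print([c.__name__ for c in Tool.__mro__])  # ['Tool', 'PureA', 'PureB', 'object']
print(hasattr(Tool(), "__dict__"))         # False: the slots stay effective


class NoSlotsMixin:
    pass  # forgetting __slots__ here brings back a per-instance __dict__


class Leaky(NoSlotsMixin):
    __slots__ = ("config",)


print(hasattr(Leaky(), "__dict__"))  # True


class StatefulA:
    __slots__ = ("a",)


class StatefulB:
    __slots__ = ("b",)


conflict = None
try:
    class Broken(StatefulA, StatefulB):  # two bases with non-empty slots
        __slots__ = ()
except TypeError as error:
    conflict = str(error)

print(conflict)  # on CPython: multiple bases have instance lay-out conflict
```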
LoggingMixin exposes self.log(message) and a self.name property that falls back to the class name if _tool_name hasn't been set yet (useful in intermediate abstract classes):

    class LoggingMixin:
        __slots__ = ()

        @property
        def name(self) -> str:
            return getattr(self, "_tool_name", self.__class__.__name__.lower())

        def log(self, message: str) -> None:
            FrameworkLogger.info(f"{self.name}: {message}")

RetryMixin takes any Callable[[], R] and wraps it with retry semantics from self.config.max_attempts. No coupling to specific exception types — it catches Exception broadly and lets the caller decide what to pass:

    def with_retries(self, operation: Callable[[], R]) -> R:
        last_error: Exception | None = None
        for attempt in range(1, self.config.max_attempts + 1):
            try:
                return operation()
            except Exception as error:
                last_error = error
                FrameworkLogger.warning(
                    f"{self.config.tool_name}: attempt {attempt}/{self.config.max_attempts} failed: {error}"
                )
        # max_attempts is retries + 1, so at least one attempt ran and failed.
        assert last_error is not None
        raise last_error

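Because the operation is a zero-argument callable, the caller binds arguments before handing it over. Here is a standalone sketch of the same loop, assuming max_attempts is passed in directly rather than read from self.config; fetch is a made-up flaky operation and logging is left out.

```python
from functools import partial
from typing import Callable, Optional, TypeVar

R = TypeVar("R")


def with_retries(operation: Callable[[], R], max_attempts: int) -> R:
    """Standalone variant of the mixin method, without logging."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[Exception] = None
    for _attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as error:
            last_error = error
    assert last_error is not None
    raise last_error  # re-raise the final failure once attempts run out


attempts = []


def fetch(query: str) -> str:
    """Hypothetical operation that fails twice, then succeeds."""
    attempts.append(query)
    if len(attempts) < 3:
        raise ConnectionError("temporary failure")
    return f"results for {query}"


# partial() turns fetch(query) into the zero-argument callable the loop expects.
result = with_retries(partial(fetch, "descriptors"), max_attempts=3)
print(result)         # results for descriptors
print(len(attempts))  # 3
```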
MetricsMixin exposes self.record_metric(duration_ms=..., failed=...) backed by ToolMetrics:

    class ToolMetrics:
        __slots__ = ("failures", "runs", "total_duration_ms")

        @property
        def average_duration_ms(self) -> float:
            # len(self) relies on a __len__ that is not shown in this excerpt.
            return self.total_duration_ms / len(self) if len(self) > 0 else 0.0

        def record(self, *, duration_ms: float, failed: bool = False) -> None:
            self.runs += 1
            self.total_duration_ms += duration_ms
            if failed:
                self.failures += 1

Every execution records its outcome in finally inside BaseTool.run(), so metrics are always captured — including on failure.
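BaseTool.run() itself isn't shown here, so the following is only a sketch of the try/finally shape that sentence describes. MetricsSketch and run_with_metrics are hypothetical names, and the real method does more than time a callable.

```python
from time import perf_counter
from typing import Callable, TypeVar

R = TypeVar("R")


class MetricsSketch:
    """Unslotted stand-in for ToolMetrics."""

    def __init__(self) -> None:
        self.runs = 0
        self.failures = 0
        self.total_duration_ms = 0.0

    def record(self, *, duration_ms: float, failed: bool = False) -> None:
        self.runs += 1
        self.total_duration_ms += duration_ms
        if failed:
            self.failures += 1


def run_with_metrics(metrics: MetricsSketch, operation: Callable[[], R]) -> R:
    started = perf_counter()
    failed = False
    try:
        return operation()
    except Exception:
        failed = True
        raise
    finally:
        # Runs on both the return path and the raise path.
        elapsed_ms = (perf_counter() - started) * 1000
        metrics.record(duration_ms=elapsed_ms, failed=failed)


metrics = MetricsSketch()
run_with_metrics(metrics, lambda: "ok")
try:
    run_with_metrics(metrics, lambda: 1 / 0)
except ZeroDivisionError:
    pass  # the error still propagates; the metric was recorded first

print(metrics.runs, metrics.failures)  # 2 1
```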
Layer 4: ParamSpec decorators
Decorators that naively use *args, **kwargs erase the wrapped function's type signature. mypy sees Callable[..., Any]. Calling tool.run("input", stream="yes") passes type checking incorrectly.
ParamSpec captures the parameter specification:

    from functools import wraps
    from typing import Callable, ParamSpec, TypeVar

    P = ParamSpec("P")
    R = TypeVar("R")

    def log_execution(func: Callable[P, R]) -> Callable[P, R]:
        @wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            owner = args[0].__class__.__name__ if args else func.__module__
            FrameworkLogger.info(f"starting {owner}.{func.__name__}")
            try:
                result = func(*args, **kwargs)
            except Exception as error:
                FrameworkLogger.error(f"failed {owner}.{func.__name__}: {error}")
                raise
            FrameworkLogger.info(f"finished {owner}.{func.__name__}")
            return result
        return wrapper

P.args and P.kwargs tell mypy: this wrapper accepts exactly the same parameters as func. Stack both decorators on run():

    @log_execution
    @measure_time
    def run(self, raw_input: str, *, stream: bool = False, session_id: str = "standalone") -> ToolOutput:
        ...

mypy sees the full run() signature through both wrappers. Passing stream="yes" is caught statically. This is how production frameworks layer instrumentation without breaking their public type interfaces.
The @tool class decorator (from tooling.py) is simpler — it attaches discovery metadata after __init_subclass__ has already handled registration:

    def tool(*, tags=(), examples=()):
        def decorate(cls):
            cls._tool_tags = tuple(tag.strip().lower() for tag in tags if tag.strip())
            cls._tool_examples = tuple(e.strip() for e in examples if e.strip())
            return cls
        return decorate

Separation of concerns: __init_subclass__ owns registration, @tool owns discovery metadata.
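The ordering is easy to verify with a toy version. Base, EVENTS, and this trimmed-down tool decorator are illustrative stand-ins that only record when each step runs.

```python
EVENTS = []


class Base:
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        EVENTS.append(f"registered {cls.__name__}")


def tool(*, tags=()):
    def decorate(cls):
        EVENTS.append(f"decorated {cls.__name__}")
        cls._tool_tags = tuple(tag.strip().lower() for tag in tags if tag.strip())
        return cls
    return decorate


@tool(tags=("Search ", " ", "Retrieval"))
class SearchTool(Base):
    pass


print(EVENTS)                # ['registered SearchTool', 'decorated SearchTool']
print(SearchTool._tool_tags)  # ('search', 'retrieval')
```

One consequence of this order: at the moment of registration the tags are still the class-level defaults, so anything that reads discovery metadata has to do it lazily, after the class statement has finished.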
Layer 5: ExecutionSession
The context manager handles session lifecycle — timing, resource tracking, and cleanup:

    class ExecutionSession:
        __slots__ = ("_closed", "_resources", "_started_at", "session_id")

        def __init__(self, session_id=None):
            self.session_id = session_id or f"session-{uuid4().hex[:10]}"
            self._resources: list[str] = []
            self._started_at = 0.0
            self._closed = False

        def __enter__(self) -> ExecutionSession:
            self._started_at = perf_counter()
            FrameworkLogger.info(f"session {self.session_id} started")
            return self

        def __exit__(self, exc_type, exc_value, traceback) -> bool:
            self.cleanup()
            duration_ms = (perf_counter() - self._started_at) * 1000
            if exc_value is None:
                FrameworkLogger.info(f"session {self.session_id} ended in {duration_ms:.2f} ms")
            else:
                FrameworkLogger.error(f"session {self.session_id} failed after {duration_ms:.2f} ms: {exc_value}")
            return False  # don't suppress exceptions

cleanup() releases resources in reversed order — reversed(self._resources) — matching LIFO semantics for things like database connections and locks. The _closed guard prevents double-cleanup if cleanup() is called both explicitly and through __exit__.
return False in __exit__ means exceptions propagate normally. The session logs the failure but doesn't swallow it — the right behavior for a framework that production code will build on.
The session_id flows into ToolContext and propagates all the way out through ToolOutput, making every result traceable.
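cleanup() is described above but not shown, so this sketch fills it in under assumptions: track() is a hypothetical way to register a resource name, and "releasing" a resource just appends it to a list so the order is visible.

```python
class SessionSketch:
    """Stripped-down session: no timing, no logging, no session_id."""

    def __init__(self) -> None:
        self._resources = []
        self._closed = False
        self.released = []  # records release order, for demonstration only

    def track(self, resource: str) -> None:
        # Hypothetical registration method; the article does not show one.
        self._resources.append(resource)

    def cleanup(self) -> None:
        if self._closed:
            return  # guard against double cleanup
        for resource in reversed(self._resources):  # last acquired, first released
            self.released.append(resource)
        self._resources.clear()
        self._closed = True

    def __enter__(self) -> "SessionSketch":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        self.cleanup()
        return False  # let the exception propagate


session = SessionSketch()
propagated = False
try:
    with session:
        session.track("db-connection")
        session.track("file-lock")
        raise RuntimeError("tool failed")
except RuntimeError:
    propagated = True

session.cleanup()  # explicit second call is a no-op

print(session.released)  # ['file-lock', 'db-connection']
print(propagated)        # True
```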
Layer 6: TypedDicts and Protocol
Three TypedDicts carry all data through the framework:

    class ToolContext(TypedDict):
        raw_input: str
        session_id: str
        metadata: dict[str, str]

    class ToolOutput(TypedDict):
        tool: str
        content: str
        tokens: list[str]
        duration_ms: float
        session_id: str

    class ToolMetadata(TypedDict):
        name: str
        description: str
        tags: tuple[str, ...]
        examples: tuple[str, ...]
        streamable: bool

These are not documentation. mypy enforces them. Misuse a key or type anywhere in the system and it's caught statically.
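It helps to be precise about where that enforcement happens. In the sketch below, build_output is a hypothetical helper; the comments mark what a type checker is expected to flag, while the runtime behavior is that of an ordinary dict.

```python
from typing import TypedDict


class ToolOutput(TypedDict):
    tool: str
    content: str
    tokens: list[str]
    duration_ms: float
    session_id: str


def build_output(
    tool: str, tokens: list[str], duration_ms: float, session_id: str
) -> ToolOutput:
    # A type checker verifies that every key is present and correctly typed.
    return {
        "tool": tool,
        "content": " ".join(tokens),
        "tokens": tokens,
        "duration_ms": duration_ms,
        "session_id": session_id,
    }


output = build_output("search", ["a", "b"], 1.5, "session-1")
print(type(output) is dict)  # True: no wrapper object at runtime
print(output["content"])     # a b

# output["contnet"]  -> a type checker reports an unknown key here

# At runtime nothing is validated: this incomplete dict is created without error,
# so the contract only holds if the type checker actually runs.
incomplete = ToolOutput(tool="search")  # type: ignore[call-arg]
print(incomplete)  # {'tool': 'search'}
```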
ToolProtocol defines the structural interface the registry exposes:

    @runtime_checkable
    class ToolProtocol(Protocol):
        @property
        def name(self) -> str: ...

        @property
        def metadata(self) -> ToolMetadata: ...

        def execute(self, context: ToolContext) -> str: ...

        def stream(self, context: ToolContext) -> Iterator[str]: ...

        # Defaults mirror BaseTool.run(), so callers can omit stream and session_id.
        def run(
            self, raw_input: str, *, stream: bool = False, session_id: str = "standalone"
        ) -> ToolOutput: ...

@runtime_checkable means you can use isinstance(tool, ToolProtocol) at runtime too. The CLI and tests use ToolRegistry.create() which returns ToolProtocol — they never import BaseTool directly. This is the boundary that keeps the framework extensible.
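One caveat is worth seeing in code: a runtime isinstance() check against a protocol only confirms that the members exist, not that their signatures match. StreamsText and the three classes below are hypothetical examples, much smaller than ToolProtocol.

```python
from typing import Iterator, Protocol, runtime_checkable


@runtime_checkable
class StreamsText(Protocol):
    def stream(self, text: str) -> Iterator[str]: ...


class WordStreamer:  # no inheritance from StreamsText
    def stream(self, text: str) -> Iterator[str]:
        yield from text.split()


class WrongSignature:
    def stream(self) -> int:  # same method name, incompatible signature
        return 0


class Unrelated:
    pass


print(isinstance(WordStreamer(), StreamsText))    # True: structural match
print(isinstance(WrongSignature(), StreamsText))  # True: only the name is checked
print(isinstance(Unrelated(), StreamsText))       # False: no stream attribute
```

So the runtime check works as a cheap sanity guard at the registry boundary, while the signature-level guarantee still comes from the static type checker.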
What the test suite checks

    # Registration happens automatically from import
    def test_builtin_tools_register_automatically(self) -> None:
        self.assertEqual(("search", "summarize", "translate"), ToolRegistry.names())

    # Descriptor validation fires at construction time
    def test_invalid_config_fails_fast(self) -> None:
        with self.assertRaises(ValueError):
            ToolConfig(tool_name="Bad Name", retries=1, timeout=10.0)

    # Per-instance config override works cleanly
    def test_streaming_can_be_disabled_per_instance(self) -> None:
        tool = ToolRegistry.create("search", streaming_enabled=False)
        with self.assertRaises(StreamNotSupportedError):
            tool.run("descriptors", stream=True)

    # Streaming path produces correct output
    def test_streaming_uses_generator_tokens(self) -> None:
        tool = ToolRegistry.create("translate")
        result = tool.run("es::hello agent framework", stream=True)
        self.assertEqual("es: hola agente marco", result["content"])

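The streaming test implies a generator that yields one token at a time and a run() that assembles them. TranslateTool's code isn't shown, so this is a guess at the shape: LEXICON, stream_translation, and collect are hypothetical, and the lexicon holds only the three words the test needs.

```python
from __future__ import annotations

from typing import Iterator

# Hypothetical lexicon; the real tool's data is not shown in the article.
LEXICON = {"es": {"hello": "hola", "agent": "agente", "framework": "marco"}}


def stream_translation(raw_input: str) -> Iterator[str]:
    """Yield translated tokens lazily from 'lang::text' input."""
    language, _, text = raw_input.partition("::")
    words = LEXICON.get(language, {})
    for word in text.split():
        yield words.get(word.lower(), word)  # unknown words pass through unchanged


def collect(raw_input: str) -> dict[str, object]:
    """Drain the generator and assemble a result in the spirit of ToolOutput."""
    language = raw_input.partition("::")[0]
    tokens = list(stream_translation(raw_input))
    return {"content": f"{language}: {' '.join(tokens)}", "tokens": tokens}


stream = stream_translation("es::hello agent framework")
first = next(stream)  # nothing past the first token has been computed yet
print(first)          # hola

result = collect("es::hello agent framework")
print(result["content"])  # es: hola agente marco
```

The generator gives callers a choice: consume tokens as they arrive, or drain everything into a single result the way the test does.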
What this maps to in the real world
| This framework | Production equivalent |
|---|---|
| ToolRegistry + __init_subclass__ | LangGraph's tool catalog, CrewAI's task registry |
| ToolProtocol | PydanticAI's Tool protocol |
| ToolContext / ToolOutput | LangGraph's State TypedDicts |
| @tool decorator | LangChain's @tool / StructuredTool |
| ToolConfig + descriptors | Pydantic model validation in PydanticAI |
| ExecutionSession | LangGraph's graph execution context |
| Generator streaming | LLM streaming token iteration |
| ToolMetrics | LangSmith / tracing layers |
Each week I'll extend this same codebase. By Week 16, it should be a coherent system that grew with the syllabus, not a collection of throwaway experiments.