Current State Management Analysis in Pipeline_ex
Overview
Pipeline_ex currently uses a flexible but unstructured approach to state management, relying primarily on Elixir maps and custom modules. This document analyzes the current implementation and compares it with LangGraph’s Pydantic-based approach.
Current State Architecture
1. Core State Container: The Context Map
The execution context is a plain Elixir map containing:
%{
# Core execution state
workflow_name: String.t(),
step_index: integer(),
results: %{String.t() => any()},
# Variable management
variable_state: %VariableEngine{},
global_vars: %{String.t() => any()},
# Infrastructure
workspace_dir: String.t(),
output_dir: String.t(),
checkpoint_dir: String.t(),
# Execution tracking
execution_log: [map()],
start_time: DateTime.t(),
# Configuration
config: map(),
debug_enabled: boolean()
}
2. Variable State Management
The VariableEngine
provides a three-tier scoping system:
%Pipeline.State.VariableEngine{
global: %{}, # Pipeline-wide
session: %{}, # Session-scoped
loop: %{}, # Loop-iteration scoped
current_step: String.t() | nil,
step_index: integer()
}
3. Result Storage
Results are stored in a ResultManager
:
%Pipeline.ResultManager{
results: %{String.t() => any()},
metadata: %{
created_at: DateTime.t(),
last_updated: DateTime.t()
}
}
Comparison with LangGraph’s Approach
LangGraph State Management
LangGraph uses Pydantic models for state definition:
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
next_agent: str
# Type-safe access throughout the graph
def agent_node(state: AgentState) -> AgentState:
messages = state["messages"]
# Type checking and validation built-in
Key Differences
Feature | Pipeline_ex | LangGraph |
---|---|---|
Type Safety | Runtime maps, no compile-time guarantees | Pydantic models with full type validation |
Schema Definition | Implicit, scattered across modules | Explicit, centralized in TypedDict/BaseModel |
Validation | Manual, ad-hoc | Automatic via Pydantic |
State Updates | Direct map manipulation | Type-safe dictionary updates |
Serialization | Custom JSON encoding | Pydantic’s built-in serialization |
IDE Support | Limited | Full autocomplete and type hints |
State Migration | Not supported | Pydantic’s schema evolution |
Strengths of Current Approach
- Flexibility: Easy to add new fields without schema changes
- Simplicity: No complex type definitions required
- Performance: Direct map access is fast
- Elixir-native: Uses idiomatic Elixir patterns
Weaknesses of Current Approach
- No Type Safety: Errors only caught at runtime
- No Schema Documentation: State structure isn’t self-documenting
- Inconsistent Validation: Each module handles validation differently
- Limited IDE Support: No autocomplete for state fields
- Error-Prone: Typos in field names cause runtime errors
- No State Evolution: Can’t handle schema migrations
Requirements for LangGraph-Style State
To achieve parity with LangGraph’s state management, we need:
- Formal State Schemas: Define state structure explicitly
- Type Validation: Validate state at boundaries
- Reducer Functions: Define how state updates are merged
- Serialization Support: Built-in JSON/YAML conversion
- Schema Evolution: Handle state migrations
- Developer Experience: IDE support and documentation
Proposed Solution: Exdantic-Based State
Using Exdantic (our chosen validation library), we can achieve similar capabilities:
defmodule Pipeline.State.Schema do
use Exdantic, define_struct: true
schema "Pipeline execution state" do
field :messages, {:array, MessageSchema} do
required()
default([])
metadata(reducer: :append)
end
field :current_step, :string do
optional()
end
field :variables, {:map, :string, :any} do
default(%{})
metadata(reducer: :merge)
end
field :results, {:map, :string, :any} do
default(%{})
metadata(reducer: :merge)
end
end
# Custom reducer definitions
def reduce_field(:messages, old, new), do: old ++ new
def reduce_field(:variables, old, new), do: Map.merge(old, new)
def reduce_field(:results, old, new), do: Map.merge(old, new)
end
This would provide:
- Type safety and validation
- Self-documenting schemas
- Built-in serialization
- Custom reducer logic
- Schema evolution support
Migration Path
- Phase 1: Define Exdantic schemas for current state
- Phase 2: Add validation at pipeline boundaries
- Phase 3: Migrate internal code to use schemas
- Phase 4: Add reducer-based state updates
- Phase 5: Implement state migration system
Conclusion
While Pipeline_ex’s current state management is functional and flexible, adopting a schema-based approach similar to LangGraph would provide significant benefits in terms of type safety, developer experience, and maintainability. The combination of Exdantic for schemas and a reducer-based update pattern would achieve feature parity with LangGraph while maintaining Elixir’s strengths.