Of course. Here is a detailed technical document for the sixth proposed feature enhancement: “A Richer Events
System with Querying and Structuring.”
Technical Specification: Enhanced Foundation.Events
System
Document Version: 1.0 Author: AI Assistant Status: PROPOSED
1. Overview
This document specifies a significant enhancement to the Foundation.Events
system. The current system provides a solid foundation for storing and retrieving events based on IDs and correlation IDs. However, for complex systems like DSPEx
, where execution involves deeply nested and structured traces, more advanced capabilities are needed for debugging, analysis, and observability.
This enhancement will evolve the Events
system in three key areas:
- Structured Event Data: Introducing a
behaviour
to allow developers to define strongly-typed event data structs, moving beyond the opaqueterm()
in the:data
field. - Advanced Querying: Upgrading the
EventStore
’s query engine to support filtering and searching within the structureddata
field of events. - Causal Trace Reconstruction: Adding a new API to reconstruct a full, hierarchical execution tree from a flat list of events using
event_id
andparent_id
relationships.
2. Problem Statement & Use Case
A developer is debugging a failed DSPEx.ReAct
execution. The ReAct
agent failed because a search
tool returned an unexpected 404 Not Found
error.
Current
Events
System: The developer can useEvents.get_by_correlation("req-123")
to retrieve a flat list of all events associated with the request. They would then have to manually sift through this list, inspecting the:data
field of each event to piece together the sequence ofThought -> Action -> Observation
. This is tedious and error-prone.DSPEx
Need: The developer needs to answer specific questions quickly:- “Show me all
tool_executed
events for this trace.” - “Filter for events where the
tool_name
wassearch
and thehttp_status
in the observation was not200
.” - “Give me a nested tree view of the entire operation, showing which
thought
led to whichaction
.”
- “Show me all
The current system cannot answer these questions efficiently. This enhancement will provide the necessary tools.
3. Proposed API and Data Structure Changes
3.1. Foundation.EventData
Behaviour (New)
To enable structured data, we will introduce a new behaviour
.
File: foundation/contracts/event_data.ex
defmodule Foundation.Contracts.EventData do
@moduledoc "A behaviour for defining structured event data schemas."
@callback schema() :: map()
# The schema could be a simple map of field names to types,
# or a more complex Ecto.Changeset-style definition for validation.
end
# Example DSPEx Event Data Struct
defmodule DSPEx.Events.ToolExecuted do
@behaviour Foundation.Contracts.EventData
defstruct [:tool_name, :tool_args, :output, :duration_ms]
def schema do
%{
tool_name: :string,
tool_args: :map,
output: :string,
duration_ms: :integer
}
end
end
- When
Foundation.Events.new_event/3
is called with a struct that implements thisbehaviour
, theEventStore
can be aware of its schema.
3.2. Foundation.Events.query/1
(Enhanced)
The query/1
function will be upgraded to accept a new :data_filter
key.
New Signature:
@spec query(query_params :: map() | keyword()) :: {:ok, [Event.t()]} | {:error, Error.t()}
New :data_filter
Option:
- The value can be a
map
for simple equality matching (e.g.,%{tool_name: "search"}
). - The value can be a
function/1
for complex filtering, which will be applied to thedata
field of each event.
Example Usage:
# Find all failed tool executions
Foundation.Events.query(%{
event_type: :tool_executed,
data_filter: fn data ->
# data is the %DSPEx.Events.ToolExecuted{} struct
data.output |> String.starts_with?("Error:")
end
})
# Find a specific tool call
Foundation.Events.query(%{
event_type: :tool_executed,
data_filter: %{tool_name: "search_api"}
})
3.3. Foundation.Events.get_trace_tree/1
(New Function)
This new function will be the cornerstone of causal trace reconstruction.
New Signature:
@spec get_trace_tree(correlation_id :: String.t()) :: {:ok, trace_node()} | {:error, :not_found}
@typedoc "A node in a reconstructed execution trace."
@type trace_node :: %{
event: Event.t(),
children: list(trace_node())
}
Behavior:
- It first calls
get_by_correlation/1
to fetch all relevant events. - It then builds a map of
event_id -> event
. - It iterates through the events, using the
parent_id
field to build a nested tree structure. - It returns the root of the tree (the first event in the trace).
4. Internal Implementation Details
4.1. EventStore
Query Engine Enhancement
The EventStore
GenServer
’s handle_call({:query_events, ...})
needs to be updated.
# in foundation/services/event_store.ex
defp apply_query_filters(events, query_map) do
events
|> Map.values()
|> filter_by_event_type(query_map[:event_type])
|> filter_by_time_range(query_map[:time_range])
|> filter_by_data(query_map[:data_filter]) # <--- NEW STEP
|> apply_pagination(query_map)
end
defp filter_by_data(events, nil), do: events
# Simple map-based filtering
defp filter_by_data(events, filter_map) when is_map(filter_map) do
Enum.filter(events, fn event ->
# Check if event.data is a map and is a superset of filter_map
is_map(event.data) &&
Enum.all?(filter_map, fn {key, value} ->
Map.get(event.data, key) == value
end)
end)
end
# Advanced function-based filtering
defp filter_by_data(events, filter_fun) when is_function(filter_fun, 1) do
Enum.filter(events, fn event ->
try do
filter_fun.(event.data)
rescue
_ -> false # If the filter function crashes, the event doesn't match
end
end)
end
Note on Performance: Filtering on the data
field will be a full scan of the events that pass the other filters. For very large event stores, this could be slow. A production-grade implementation might use a secondary index (like an ETS table or an external search engine like Elasticsearch) to index the data
field for faster searching. For the initial implementation, a full scan is acceptable.
4.2. Trace Tree Reconstruction Logic
This is a classic tree-building algorithm.
# in foundation/events.ex or a new foundation/logic/trace_logic.ex
def get_trace_tree(correlation_id) do
with {:ok, events} <- get_by_correlation(correlation_id) do
if Enum.empty?(events) do
{:error, :not_found}
else
# 1. Index events by their own ID
events_by_id = Enum.into(events, %{}, fn e -> {e.event_id, e} end)
# 2. Group events by their parent ID
# The root event will have a parent_id of nil
children_by_parent_id =
events
|> Enum.group_by(fn e -> e.parent_id end)
# 3. Recursively build the tree starting from the root (parent_id: nil)
root_events = Map.get(children_by_parent_id, nil, [])
# Assuming a single root for a given correlation_id
case root_events do
[root_event | _] ->
tree = build_node(root_event, children_by_parent_id)
{:ok, tree}
[] ->
{:error, :no_root_event_found}
end
end
end
end
defp build_node(event, children_by_parent_id) do
children_events = Map.get(children_by_parent_id, event.event_id, [])
# Recursively build the children nodes
child_nodes =
children_events
|> Enum.sort_by(& &1.timestamp) # Ensure chronological order
|> Enum.map(&build_node(&1, children_by_parent_id))
%{event: event, children: child_nodes}
end
5. DSPEx
Integration Example
To make this useful, DSPEx
needs a way to manage the parent_id
for its events. This can be done with the ErrorContext
.
# in a hypothetical dspex/react.ex
# When starting the ReAct forward pass
parent_context = Foundation.ErrorContext.new(...)
# When generating a thought
thought_context = Foundation.ErrorContext.child_context(parent_context, ...)
# Events created here will automatically get the right correlation_id
# We need a way to link parent/child events. Let's assume an event
# for the thought generation is created with its own ID.
{:ok, thought_event} = Events.new_event(:thought_generated, ...)
Events.store(thought_event)
# When executing a tool based on that thought
tool_context = Foundation.ErrorContext.child_context(thought_context, ...)
{:ok, tool_event} = Events.new_event(
:tool_executed,
...,
parent_id: thought_event.event_id # <-- Explicitly link the events
)
Events.store(tool_event)
This requires passing the event_id
of the parent operation down to the child operation, which can be managed as part of the program’s internal state during execution.
6. Conclusion
Enhancing the Events
system with structured data, advanced querying, and causal trace reconstruction transforms it from a simple logging mechanism into a powerful debugging and analysis tool.
For DSPEx
, this provides:
- Deep Debuggability: Developers can pinpoint the exact failing step in a long chain of reasoning and inspect its full data context.
- Performance Analysis: By querying and aggregating events, developers can answer questions like, “Which tool is the slowest in my agent?” or “How many reasoning steps does my program typically take?”
- Automated Analysis: The structured trace tree can be fed into another
DSPEx
program for automated fault diagnosis or performance analysis, creating a self-reflecting system.
This feature is a strategic investment in the long-term maintainability and observability of complex systems built on foundation
, and it is essential for making DSPEx
a production-ready framework.