
007 6of10

Documentation for 007_6of10 from the Ds ex repository.



Technical Specification: Enhanced Foundation.Events System

Document Version: 1.0
Author: AI Assistant
Status: PROPOSED

1. Overview

This document specifies a significant enhancement to the Foundation.Events system. The current system provides a solid foundation for storing and retrieving events based on IDs and correlation IDs. However, for complex systems like DSPEx, where execution involves deeply nested and structured traces, more advanced capabilities are needed for debugging, analysis, and observability.

This enhancement will evolve the Events system in three key areas:

  1. Structured Event Data: Introducing a behaviour to allow developers to define strongly-typed event data structs, moving beyond the opaque term() in the :data field.
  2. Advanced Querying: Upgrading the EventStore’s query engine to support filtering and searching within the structured data field of events.
  3. Causal Trace Reconstruction: Adding a new API to reconstruct a full, hierarchical execution tree from a flat list of events using event_id and parent_id relationships.

2. Problem Statement & Use Case

A developer is debugging a failed DSPEx.ReAct execution. The ReAct agent failed because a search tool returned an unexpected 404 Not Found error.

  • Current Events System: The developer can use Events.get_by_correlation("req-123") to retrieve a flat list of all events associated with the request. They would then have to manually sift through this list, inspecting the :data field of each event to piece together the sequence of Thought -> Action -> Observation. This is tedious and error-prone.

  • DSPEx Need: The developer needs to answer specific questions quickly:

    • “Show me all tool_executed events for this trace.”
    • “Filter for events where the tool_name was search and the http_status in the observation was not 200.”
    • “Give me a nested tree view of the entire operation, showing which thought led to which action.”

The current system cannot answer these questions efficiently. This enhancement will provide the necessary tools.


3. Proposed API and Data Structure Changes

3.1. Foundation.Contracts.EventData Behaviour (New)

To enable structured data, we will introduce a new behaviour.

File: foundation/contracts/event_data.ex

defmodule Foundation.Contracts.EventData do
  @moduledoc "A behaviour for defining structured event data schemas."
  
  @callback schema() :: map()
  # The schema could be a simple map of field names to types,
  # or a more complex Ecto.Changeset-style definition for validation.
end

# Example DSPEx Event Data Struct
defmodule DSPEx.Events.ToolExecuted do
  @behaviour Foundation.Contracts.EventData

  defstruct [:tool_name, :tool_args, :output, :duration_ms]
  
  def schema do
    %{
      tool_name: :string,
      tool_args: :map,
      output: :string,
      duration_ms: :integer
    }
  end
end

  • When Foundation.Events.new_event/3 is called with a struct that implements this behaviour, the EventStore can be aware of its schema.

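One way the EventStore could use schema/0 is to check a struct's fields before storing it. The following is a minimal sketch rather than part of the proposal: the Sketch.* module names and the validate/1 helper are hypothetical, and the mapping from the type tags (:string, :map, :integer) to guard checks is an assumption. Note that this sketch treats every schema field as required, so a nil value counts as a mismatch.

```elixir
defmodule Sketch.EventData do
  # Stand-in for the proposed behaviour so the example runs on its own.
  @callback schema() :: map()
end

defmodule Sketch.ToolExecuted do
  @behaviour Sketch.EventData
  defstruct [:tool_name, :tool_args, :output, :duration_ms]

  @impl true
  def schema do
    %{tool_name: :string, tool_args: :map, output: :string, duration_ms: :integer}
  end
end

defmodule SchemaCheckSketch do
  # Hypothetical helper: compares each field of a struct with its schema/0 entry.

  # Returns :ok, or {:error, fields} listing the fields whose values do not match.
  def validate(%module{} = data) do
    mismatched =
      for {field, type} <- module.schema(),
          not type_ok?(type, Map.get(data, field)),
          do: field

    if mismatched == [], do: :ok, else: {:error, Enum.sort(mismatched)}
  end

  # Assumed meaning of each type tag; unknown tags never match.
  defp type_ok?(:string, value), do: is_binary(value)
  defp type_ok?(:map, value), do: is_map(value)
  defp type_ok?(:integer, value), do: is_integer(value)
  defp type_ok?(_unknown, _value), do: false
end

event_data =
  struct(Sketch.ToolExecuted,
    tool_name: "search",
    tool_args: %{q: "elixir"},
    output: "ok",
    duration_ms: 12
  )

IO.inspect(SchemaCheckSketch.validate(event_data))
# => :ok
IO.inspect(SchemaCheckSketch.validate(%{event_data | duration_ms: "12"}))
# => {:error, [:duration_ms]}
```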
3.2. Foundation.Events.query/1 (Enhanced)

The query/1 function will be upgraded to accept a new :data_filter key.

New Signature:

@spec query(query_params :: map() | keyword()) :: {:ok, [Event.t()]} | {:error, Error.t()}

New :data_filter Option:

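  • The value can be a map for simple equality matching (e.g., %{tool_name: "search"}). Every key in the filter must equal the corresponding top-level key in the event's data.
  • The value can be a one-argument function for complex filtering. It is applied to the data field of each event, and an event matches when the function returns a truthy value.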

Example Usage:

# Find all failed tool executions
Foundation.Events.query(%{
  event_type: :tool_executed,
  data_filter: fn data ->
    # data is the %DSPEx.Events.ToolExecuted{} struct
    data.output |> String.starts_with?("Error:")
  end
})

# Find a specific tool call
Foundation.Events.query(%{
  event_type: :tool_executed,
  data_filter: %{tool_name: "search_api"}
})

3.3. Foundation.Events.get_trace_tree/1 (New Function)

This new function will be the cornerstone of causal trace reconstruction.

New Signature:

@spec get_trace_tree(correlation_id :: String.t()) ::
        {:ok, trace_node()} | {:error, :not_found | :no_root_event_found}

@typedoc "A node in a reconstructed execution trace."
@type trace_node :: %{
  event: Event.t(),
  children: list(trace_node())
}

Behavior:

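  1. It first calls get_by_correlation/1 to fetch all relevant events, returning {:error, :not_found} if there are none.
  2. It then groups the events by parent_id, so that each event's children can be looked up by its event_id.
  3. Starting from the root event (the one whose parent_id is nil), it recursively attaches children, sorted by timestamp, to build a nested tree structure.
  4. It returns the root node of the tree, or {:error, :no_root_event_found} if no event has a nil parent_id.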

4. Internal Implementation Details

4.1. EventStore Query Engine Enhancement

The EventStore GenServer’s handle_call({:query_events, ...}) needs to be updated.

# in foundation/services/event_store.ex

defp apply_query_filters(events, query_map) do
  events
  |> Map.values()
  |> filter_by_event_type(query_map[:event_type])
  |> filter_by_time_range(query_map[:time_range])
  |> filter_by_data(query_map[:data_filter]) # <--- NEW STEP
  |> apply_pagination(query_map)
end

defp filter_by_data(events, nil), do: events

# Simple map-based filtering
defp filter_by_data(events, filter_map) when is_map(filter_map) do
  Enum.filter(events, fn event ->
    # Check if event.data is a map and is a superset of filter_map
    is_map(event.data) &&
    Enum.all?(filter_map, fn {key, value} ->
      Map.get(event.data, key) == value
    end)
  end)
end

# Advanced function-based filtering
defp filter_by_data(events, filter_fun) when is_function(filter_fun, 1) do
  Enum.filter(events, fn event ->
    try do
      filter_fun.(event.data)
    rescue
      _ -> false # If the filter function crashes, the event doesn't match
    end
  end)
end
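To see how the two filter clauses behave on mixed data, the standalone sketch below reproduces them in a public module and runs them over plain maps. The DataFilterSketch module, the sample events, and the http_status key are illustrative assumptions. The real store would hold Event structs, and http_status is not a field of the ToolExecuted struct shown earlier.

```elixir
defmodule DataFilterSketch do
  # No filter given: keep everything.
  def filter_by_data(events, nil), do: events

  # Map filter: every key in the filter must equal the value in event.data.
  def filter_by_data(events, filter) when is_map(filter) do
    Enum.filter(events, fn event ->
      is_map(event.data) and
        Enum.all?(filter, fn {key, value} -> Map.get(event.data, key) == value end)
    end)
  end

  # Function filter: an exception raised by the function counts as "no match".
  def filter_by_data(events, fun) when is_function(fun, 1) do
    Enum.filter(events, fn event ->
      try do
        fun.(event.data)
      rescue
        _ -> false
      end
    end)
  end
end

events = [
  %{event_id: 1, event_type: :tool_executed, data: %{tool_name: "search", http_status: 200}},
  %{event_id: 2, event_type: :tool_executed, data: %{tool_name: "search", http_status: 404}},
  # Opaque, non-map data: the map filter skips it and the function filter raises on it.
  %{event_id: 3, event_type: :thought_generated, data: "plain text"}
]

failed_searches =
  DataFilterSketch.filter_by_data(events, fn data ->
    data.tool_name == "search" and data.http_status != 200
  end)

IO.inspect(Enum.map(failed_searches, & &1.event_id))
# => [2]
```

Because the function filter is rescued per event, one malformed event does not abort the whole query, but it also means a bug in the filter silently yields an empty result.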

Note on Performance: Filtering on the data field will be a full scan of the events that pass the other filters. For very large event stores, this could be slow. A production-grade implementation might use a secondary index (like an ETS table or an external search engine like Elasticsearch) to index the data field for faster searching. For the initial implementation, a full scan is acceptable.
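As a rough illustration of the secondary-index idea, the sketch below keeps a {field, value} -> event_id mapping in an ETS bag so that equality filters can skip the full scan. The DataIndexSketch module, its function names, and the table name are hypothetical. Only top-level keys of map-shaped data are indexed, and index maintenance on event deletion is left out.

```elixir
defmodule DataIndexSketch do
  # A :bag table allows several event IDs under the same {field, value} key.
  def new, do: :ets.new(:event_data_index, [:bag, :public])

  # Index every top-level key of a map or struct payload.
  def index(table, %{event_id: id, data: data}) when is_map(data) do
    fields = if is_struct(data), do: Map.from_struct(data), else: data

    Enum.each(fields, fn {field, value} ->
      :ets.insert(table, {{field, value}, id})
    end)

    :ok
  end

  # Opaque (non-map) data is not indexed.
  def index(_table, _event), do: :ok

  # IDs of events whose data has field == value.
  def lookup(table, field, value) do
    table
    |> :ets.lookup({field, value})
    |> Enum.map(fn {_key, id} -> id end)
  end
end

table = DataIndexSketch.new()
DataIndexSketch.index(table, %{event_id: "e1", data: %{tool_name: "search"}})
DataIndexSketch.index(table, %{event_id: "e2", data: %{tool_name: "calc"}})
IO.inspect(DataIndexSketch.lookup(table, :tool_name, "search"))
# => ["e1"]
```

An index like this only helps the map form of :data_filter. Function filters are opaque to the store and would still need a scan.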

4.2. Trace Tree Reconstruction Logic

The reconstruction groups events by parent_id, then builds the tree recursively from the root event down.

# in foundation/events.ex or a new foundation/logic/trace_logic.ex

def get_trace_tree(correlation_id) do
  with {:ok, events} <- get_by_correlation(correlation_id) do
    if Enum.empty?(events) do
      {:error, :not_found}
    else
      # 1. Group events by their parent ID.
      # The root event has a parent_id of nil.
      children_by_parent_id = Enum.group_by(events, & &1.parent_id)

      # 2. Recursively build the tree starting from the root (parent_id: nil).
      # Assumes a single root per correlation_id: any additional roots are
      # ignored, as are events whose parent_id matches no event in the trace.
      case Map.get(children_by_parent_id, nil, []) do
        [root_event | _] ->
          {:ok, build_node(root_event, children_by_parent_id)}

        [] ->
          {:error, :no_root_event_found}
      end
    end
  end
end

defp build_node(event, children_by_parent_id) do
  children_events = Map.get(children_by_parent_id, event.event_id, [])
  
  # Recursively build the children nodes
  child_nodes =
    children_events
    |> Enum.sort_by(& &1.timestamp) # Ensure chronological order
    |> Enum.map(&build_node(&1, children_by_parent_id))
    
  %{event: event, children: child_nodes}
end
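The nested tree view asked for in the problem statement can be produced by walking the returned structure. The sketch below is a standalone approximation: it rebuilds the tree from plain maps rather than Event structs, and the TraceTreeSketch module, the render/2 helper, and the :react_started event type are assumptions made for illustration.

```elixir
defmodule TraceTreeSketch do
  # Same grouping approach as get_trace_tree/1, applied to an in-memory list.
  def build(events) do
    by_parent = Enum.group_by(events, & &1.parent_id)

    case Map.get(by_parent, nil, []) do
      [root | _] -> {:ok, build_node(root, by_parent)}
      [] -> {:error, :no_root_event_found}
    end
  end

  defp build_node(event, by_parent) do
    children =
      by_parent
      |> Map.get(event.event_id, [])
      |> Enum.sort_by(& &1.timestamp)
      |> Enum.map(&build_node(&1, by_parent))

    %{event: event, children: children}
  end

  # One line per event, indented two spaces per level of depth.
  def render(%{event: event, children: children}, depth \\ 0) do
    line = String.duplicate("  ", depth) <> Atom.to_string(event.event_type)
    [line | Enum.flat_map(children, &render(&1, depth + 1))]
  end
end

events = [
  %{event_id: 1, parent_id: nil, event_type: :react_started, timestamp: 1},
  %{event_id: 3, parent_id: 2, event_type: :tool_executed, timestamp: 3},
  %{event_id: 2, parent_id: 1, event_type: :thought_generated, timestamp: 2},
  %{event_id: 4, parent_id: 1, event_type: :thought_generated, timestamp: 4}
]

{:ok, tree} = TraceTreeSketch.build(events)
tree |> TraceTreeSketch.render() |> Enum.each(&IO.puts/1)
# react_started
#   thought_generated
#     tool_executed
#   thought_generated
```

The input list is deliberately out of order to show that the result depends on parent_id links and timestamps, not on storage order.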

5. DSPEx Integration Example

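To make this useful, DSPEx needs a way to manage the parent_id for its events. In the sketch below, ErrorContext carries the correlation_id through nested operations, while the parent_id is set explicitly when each child event is created.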

# in a hypothetical dspex/react.ex

# When starting the ReAct forward pass
parent_context = Foundation.ErrorContext.new(...)

# When generating a thought
thought_context = Foundation.ErrorContext.child_context(parent_context, ...)
# Events created here will automatically get the right correlation_id
# We need a way to link parent/child events. Let's assume an event
# for the thought generation is created with its own ID.
{:ok, thought_event} = Events.new_event(:thought_generated, ...)
Events.store(thought_event)

# When executing a tool based on that thought
tool_context = Foundation.ErrorContext.child_context(thought_context, ...)
{:ok, tool_event} = Events.new_event(
  :tool_executed, 
  ..., 
  parent_id: thought_event.event_id # <-- Explicitly link the events
)
Events.store(tool_event)

This requires passing the event_id of the parent operation down to the child operation, which can be managed as part of the program’s internal state during execution.
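One possible shape for that internal state is sketched below. It is an assumption-laden illustration, not the Foundation or DSPEx API: the ParentLinkSketch module, its emit/3 and within/3 functions, and the plain-map events are all hypothetical. The state tracks which event is the current parent, so callers never pass IDs by hand.

```elixir
defmodule ParentLinkSketch do
  # Execution state: collected events plus the ID of the current parent event.
  def new, do: %{events: [], parent_id: nil}

  # Create an event as a child of the current parent and record it in the state.
  def emit(%{events: events, parent_id: parent_id} = state, type, data) do
    event = %{
      event_id: System.unique_integer([:positive, :monotonic]),
      parent_id: parent_id,
      event_type: type,
      data: data
    }

    {event, %{state | events: [event | events]}}
  end

  # Run fun with `event` as the parent, then restore the previous parent.
  def within(state, event, fun) do
    inner = fun.(%{state | parent_id: event.event_id})
    %{inner | parent_id: state.parent_id}
  end
end

state = ParentLinkSketch.new()
{thought, state} = ParentLinkSketch.emit(state, :thought_generated, %{text: "look it up"})

state =
  ParentLinkSketch.within(state, thought, fn inner ->
    {_tool, inner} = ParentLinkSketch.emit(inner, :tool_executed, %{tool_name: "search"})
    inner
  end)

[tool_event | _] = state.events
IO.inspect(tool_event.parent_id == thought.event_id)
# => true
```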


6. Conclusion

Enhancing the Events system with structured data, advanced querying, and causal trace reconstruction transforms it from a simple logging mechanism into a powerful debugging and analysis tool.

For DSPEx, this provides:

  • Deep Debuggability: Developers can pinpoint the exact failing step in a long chain of reasoning and inspect its full data context.
  • Performance Analysis: By querying and aggregating events, developers can answer questions like, “Which tool is the slowest in my agent?” or “How many reasoning steps does my program typically take?”
  • Automated Analysis: The structured trace tree can be fed into another DSPEx program for automated fault diagnosis or performance analysis, creating a self-reflecting system.
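As an example of the aggregation mentioned under Performance Analysis, the sketch below answers "Which tool is the slowest?" over a list of events that a query such as event_type: :tool_executed might return. The ToolStatsSketch module and the sample data are hypothetical. The tool_name and duration_ms fields follow the ToolExecuted struct from Section 3.1.

```elixir
defmodule ToolStatsSketch do
  # Average duration_ms per tool_name, slowest first.
  def slowest_tools(events) do
    events
    |> Enum.filter(&(&1.event_type == :tool_executed))
    |> Enum.group_by(&(&1.data.tool_name), &(&1.data.duration_ms))
    |> Enum.map(fn {tool, durations} ->
      {tool, Enum.sum(durations) / length(durations)}
    end)
    |> Enum.sort_by(fn {_tool, average} -> average end, :desc)
  end
end

events = [
  %{event_type: :tool_executed, data: %{tool_name: "search", duration_ms: 300}},
  %{event_type: :tool_executed, data: %{tool_name: "search", duration_ms: 100}},
  %{event_type: :tool_executed, data: %{tool_name: "calc", duration_ms: 20}},
  %{event_type: :thought_generated, data: %{}}
]

IO.inspect(ToolStatsSketch.slowest_tools(events))
# => [{"search", 200.0}, {"calc", 20.0}]
```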

This feature is a strategic investment in the long-term maintainability and observability of complex systems built on foundation, and it is essential for making DSPEx a production-ready framework.