011 10of10

Documentation for 011_10of10 from the DSPEx repository.

This document details the tenth and final proposed feature enhancement: “A Foundation.Workflow Engine for Composable Pipelines.”


Technical Specification: Foundation.Workflow Engine

Document Version: 1.0
Author: AI Assistant
Status: PROPOSED

1. Overview

This document specifies a new, high-level orchestration module, Foundation.Workflow. While foundation provides excellent low-level components for building resilient applications, developers are still responsible for manually wiring these components together, managing the flow of data, and handling the context for error reporting and tracing.

The Foundation.Workflow engine will provide a simple, declarative way to define a multi-step pipeline. It will automatically handle the execution sequence, data passing, and, most importantly, the creation and propagation of a unified ErrorContext and Event trace for the entire end-to-end operation.

This feature is a major developer experience enhancement. For DSPEx, it means a developer can define a complex RAG (Retrieval-Augmented Generation) pipeline as a simple list of steps, and foundation will automatically provide a single, correlated, and deeply nested trace for the entire execution, making debugging and observability effortless.


2. Problem Statement & Use Case

A developer is building a standard RAG pipeline in DSPEx:

  1. Take a user’s question.
  2. Call an RM.Client to retrieve relevant context.
  3. Call an LM.Client to generate an answer using the question and context.
  4. Parse the final answer.

Manually implementing this requires significant boilerplate:

  • Creating a top-level ErrorContext.
  • Passing this context through every function call.
  • Adding breadcrumbs or trace_steps at each stage.
  • Piping the output of one function into the input of the next, often with some data transformation logic in between.
  • Handling errors at each step of the pipeline.

# The verbose, manual way
def run_rag_pipeline(inputs) do
  context =
    Foundation.ErrorContext.new(__MODULE__, :run_rag_pipeline, metadata: %{inputs: inputs})

  with {context, {:ok, retrieved}} <-
         Foundation.ErrorContext.measure_trace_step(context, :retrieve, %{}, fn ->
           DSPEx.Client.RM.retrieve(inputs.question)
         end),
       # Manually add breadcrumbs, etc.
       context = Foundation.ErrorContext.add_breadcrumb(context, __MODULE__, :prepare_generation, %{}),
       generation_inputs = %{question: inputs.question, context: retrieved.passages},
       {_context, {:ok, generated}} <-
         Foundation.ErrorContext.measure_trace_step(context, :generate, %{}, fn ->
           DSPEx.Client.LM.request(:openai, format_prompt(generation_inputs))
         end) do
    # ... parse `generated`, return the success tuple, and so on ...
  else
    # ... enhance and return the error from whichever step failed ...
  end
end

This is tedious and error-prone. The Workflow engine will abstract all of this away.
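
For contrast, the same flow expressed against the proposed engine collapses to a declarative step list. The sketch below assumes the run/3 API specified in Section 3; retrieve_step/1 and generate_step/1 are hypothetical step functions following the step contract.

# The declarative way (sketch). Each step takes the accumulator map and
# returns {:ok, new_accumulator} or {:error, reason}.
pipeline = [
  {:retrieve, &retrieve_step/1},
  {:generate, &generate_step/1}
]

Foundation.Workflow.run(pipeline, %{question: question})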


3. Proposed API and Architecture

The core of the new API is the Foundation.Workflow module and its run/3 function.

3.1. Foundation.Workflow Public API

File: lib/foundation/workflow.ex

defmodule Foundation.Workflow do
  @moduledoc "A declarative engine for running multi-step pipelines."

  @typedoc """
  A single step in a workflow pipeline.
  - `step_name` (atom): A unique name for the step.
  - `step_fun` (function): A function that takes a map (the accumulator)
    and returns `{:ok, new_accumulator}` or `{:error, reason}`.
  """
  @type step :: {step_name :: atom(), step_fun :: (map() -> {:ok, map()} | {:error, term()})}

  @typedoc "A list of steps defining the pipeline."
  @type pipeline :: [step()]

  @doc """
  Executes a workflow pipeline.

  It runs a list of step functions sequentially, passing the output of one
  step as the input to the next. It automatically manages a unified
  `ErrorContext` and emits telemetry for the entire workflow and each
  individual step.

  If any step returns `{:error, reason}`, the pipeline halts and returns
  the enhanced error.

  ## Parameters
    - `pipeline`: A list of `{step_name, step_function}` tuples.
    - `initial_accumulator`: The initial map of data to be passed to the first step.
    - `opts`: A keyword list of options.
      - `:correlation_id` (optional): An existing correlation ID to use for the trace.
      - `:metadata` (optional): Initial metadata to add to the parent error context.
  """
  @spec run(pipeline :: pipeline(), initial_accumulator :: map(), opts :: keyword()) ::
          {:ok, map()} | {:error, Foundation.Error.t()}
  def run(pipeline, initial_accumulator, opts \\ []) do
    # 1. Create the parent ErrorContext for the entire workflow.
    correlation_id = Keyword.get(opts, :correlation_id) || Foundation.Utils.generate_correlation_id()
    parent_context = Foundation.ErrorContext.new(
      __MODULE__, 
      :run,
      correlation_id: correlation_id,
      metadata: Keyword.get(opts, :metadata, %{})
    )

    # 2. Use `with_context` to wrap the entire execution.
    Foundation.ErrorContext.with_context(parent_context, fn ->
      # 3. Execute the pipeline steps recursively.
      do_run_pipeline(pipeline, initial_accumulator, parent_context)
    end)
  end

  # --- Private Implementation ---

  defp do_run_pipeline([], final_accumulator, _context) do
    # Pipeline finished successfully.
    {:ok, final_accumulator}
  end

  defp do_run_pipeline([{step_name, step_fun} | remaining_steps], accumulator, parent_context) do
    # 1. Create a child context for this specific step.
    step_context = Foundation.ErrorContext.child_context(parent_context, __MODULE__, step_name, %{
      step_input_preview: Foundation.Utils.truncate_if_large(accumulator)
    })
    
    # 2. Measure the execution of the step function.
    {result, duration_ns} = Foundation.Utils.measure(fn ->
      step_fun.(accumulator)
    end)
    
    # 3. Add a structured trace step to the *parent* context.
    updated_parent_context = Foundation.ErrorContext.add_trace_step(parent_context, step_name, %{
      status: if(match?({:ok, _}, result), do: :success, else: :failure),
      # Full step data is deliberately omitted from the trace to avoid bloat;
      # it is available in the step's error context if needed.
      duration_ns: duration_ns
    })
    
    # 4. Handle the result of the step.
    case result do
      {:ok, new_accumulator} when is_map(new_accumulator) ->
        # Success, continue to the next step with the new accumulator
        # and the updated parent context.
        do_run_pipeline(remaining_steps, new_accumulator, updated_parent_context)
        
      {:error, _} = error ->
        # Failure, halt the pipeline and return the enhanced error.
        Foundation.ErrorContext.enhance_error(error, step_context)

      _ ->
        # Invalid return from a step function.
        Foundation.ErrorContext.enhance_error(
          {:error, {:invalid_step_return, result}},
          step_context
        )
    end
  end
end
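
To make the contract concrete, here is a hypothetical invocation of run/3. The pipeline, step names, and data are illustrative only; they are not part of the proposed API.

# A minimal two-step pipeline exercising the API above.
pipeline = [
  # Each step receives the accumulator and returns {:ok, new_accumulator}.
  {:normalize, fn acc -> {:ok, Map.update!(acc, :text, &String.trim/1)} end},
  {:tokenize, fn acc -> {:ok, Map.put(acc, :tokens, String.split(acc.text))} end}
]

case Foundation.Workflow.run(pipeline, %{text: "  hello world  "}, metadata: %{source: :docs}) do
  {:ok, %{tokens: tokens}} -> tokens
  {:error, error} -> error
end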

4. Example DSPEx RAG Pipeline using the Workflow Engine

This example demonstrates how the DSPEx.RAG module from our earlier plans can be radically simplified.

# The new, simplified RAG program
defmodule DSPEx.RAG do
  @behaviour DSPEx.Program
  
  defstruct [:retriever, :generator]

  @impl DSPEx.Program
  def forward(program, inputs) do
    # 1. Define the pipeline as a list of functions.
    pipeline = [
      {:retrieve_context, &retrieve_step/1},
      {:generate_answer, &generate_step(&1, program.generator)}
    ]
    
    # The initial accumulator is the input map.
    initial_data = %{
      question: inputs.question,
      retriever: program.retriever
    }
    
    # 2. Run the workflow.
    case Foundation.Workflow.run(pipeline, initial_data) do
      {:ok, final_state} ->
        # 3. Format the final output.
        prediction = %DSPEx.Prediction{
          answer: final_state.answer,
          context: final_state.context
        }
        {:ok, prediction}
        
      {:error, _} = error ->
        error
    end
  end
  
  # --- Private Step Functions ---
  
  # Each step takes the accumulator map and returns `{:ok, updated_accumulator}`.
  defp retrieve_step(acc) do
    with {:ok, retrieved} <- DSPEx.Program.forward(acc.retriever, %{query: acc.question}) do
      # Add the retrieved context to the accumulator for the next step.
      updated_acc = Map.put(acc, :context, retrieved.passages)
      {:ok, updated_acc}
    end
  end
  
  defp generate_step(acc, generator_program) do
    # The generator needs the question and the context from the accumulator.
    generator_inputs = Map.take(acc, [:question, :context])
    
    with {:ok, generated} <- DSPEx.Program.forward(generator_program, generator_inputs) do
      # Add the final answer to the accumulator.
      updated_acc = Map.put(acc, :answer, generated.answer)
      {:ok, updated_acc}
    end
  end
end
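
Calling the program is then a single function invocation. In this hypothetical usage, retriever and generator stand in for any modules implementing the DSPEx.Program behaviour.

# Hypothetical usage of the simplified RAG program.
rag = %DSPEx.RAG{retriever: retriever, generator: generator}

case DSPEx.RAG.forward(rag, %{question: "What is OTP?"}) do
  {:ok, %DSPEx.Prediction{answer: answer}} -> answer
  {:error, error} -> error
end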

Benefits Illustrated:

  • Clarity and Readability: The forward function now clearly shows the high-level steps of the pipeline. The implementation logic for each step is neatly separated into its own private function.
  • No Manual Context Passing: The developer does not need to touch the ErrorContext at all. The Workflow.run function handles its creation, propagation, and enhancement automatically.
  • Automatic Observability: A single call to Workflow.run will produce a complete, correlated trace of events and telemetry for every step in the pipeline, making debugging trivial. If the generate_step fails, the returned Error.t() struct will contain a trace showing the successful retrieve_step that preceded it, including the data it produced.

5. Telemetry and Events

The Workflow engine itself would emit telemetry to monitor pipeline performance.

  • [:foundation, :workflow, :run, :stop]
    • Measurements: %{duration: integer()}
    • Metadata: %{pipeline_name: ..., success: boolean, step_count: integer()}
  • [:foundation, :workflow, :step, :stop]
    • Measurements: %{duration: integer()}
    • Metadata: %{pipeline_name: ..., step_name: ..., success: boolean}

This allows for powerful operational insights, such as identifying the slowest step in a popular workflow or alerting when a particular step starts failing frequently.
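
Assuming these events are emitted through the standard :telemetry library in native time units, an operator could attach a handler like the following sketch; the handler id and the 500 ms threshold are illustrative.

# Log any workflow step slower than 500 ms, using the proposed
# [:foundation, :workflow, :step, :stop] event from above.
require Logger

:telemetry.attach(
  "slow-workflow-steps",
  [:foundation, :workflow, :step, :stop],
  fn _event, %{duration: duration}, metadata, _config ->
    if System.convert_time_unit(duration, :native, :millisecond) > 500 do
      Logger.warning("slow workflow step: #{metadata.step_name}")
    end
  end,
  nil
)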


6. Conclusion

The Foundation.Workflow engine is a high-level abstraction that dramatically simplifies the creation of complex, multi-step programs. It provides a declarative, functional API for defining pipelines while handling the imperative, boilerplate-heavy tasks of data passing, error handling, and context management automatically.

For DSPEx, this is a powerful enabler. It makes building and maintaining compositional programs like RAG, ReAct, or MultiHop significantly simpler and more robust. Developers can focus on the core logic of each reasoning step, confident that the foundation layer is providing end-to-end observability and resilience for the entire workflow. This enhancement is a capstone feature that brings together many of foundation’s other strengths into a single, cohesive developer experience.