Technical Specification: `Foundation.Workflow` Engine
Document Version: 1.0
Author: AI Assistant
Status: PROPOSED
1. Overview
This document specifies a new, high-level orchestration module, `Foundation.Workflow`. While `foundation` provides excellent low-level components for building resilient applications, developers are still responsible for manually wiring these components together, managing the flow of data, and handling the context for error reporting and tracing.

The `Foundation.Workflow` engine will provide a simple, declarative way to define a multi-step pipeline. It will automatically handle the execution sequence, data passing, and, most importantly, the creation and propagation of a unified `ErrorContext` and `Event` trace for the entire end-to-end operation.
This feature is a major developer experience enhancement. For `DSPEx`, it means a developer can define a complex RAG (Retrieval-Augmented Generation) pipeline as a simple list of steps, and `foundation` will automatically provide a single, correlated, and deeply nested trace for the entire execution, making debugging and observability effortless.
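As a preview of the shape this takes (the full API is specified in Section 3), a pipeline is just a list of named step functions handed to a single `run` call. The step names and helper functions below are illustrative only:

```elixir
# Illustrative sketch: retrieve_context/1 and generate_answer/1 are hypothetical
# step functions that each take the accumulator map and return {:ok, new_map}.
Foundation.Workflow.run(
  [
    {:retrieve, &retrieve_context/1},
    {:generate, &generate_answer/1}
  ],
  %{question: "What is OTP?"}
)
# => {:ok, final_accumulator} | {:error, enhanced_error}
```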
2. Problem Statement & Use Case
A developer is building a standard RAG pipeline in `DSPEx`:

- Step 1: Take a user's `question`.
- Step 2: Call an `RM.Client` to `retrieve` relevant context.
- Step 3: Call an `LM.Client` to `generate` an answer using the `question` and `context`.
- Step 4: Parse the final answer.
Manually implementing this requires significant boilerplate:

- Creating a top-level `ErrorContext`.
- Passing this context through every function call.
- Adding `breadcrumbs` or `trace_steps` at each stage.
- Piping the output of one function into the input of the next, often with some data transformation logic in between.
- Handling errors at each step of the pipeline.
```elixir
# The verbose, manual way
def run_rag_pipeline(inputs) do
  context =
    Foundation.ErrorContext.new(__MODULE__, :run_rag_pipeline, metadata: %{inputs: inputs})

  with {context, {:ok, retrieved}} <-
         Foundation.ErrorContext.measure_trace_step(context, :retrieve, %{}, fn ->
           DSPEx.Client.RM.retrieve(inputs.question)
         end),
       # Manually add breadcrumbs, etc.
       context <- Foundation.ErrorContext.add_breadcrumb(context, __MODULE__, :prepare_generation, %{}),
       generation_inputs = %{question: inputs.question, context: retrieved.passages},
       {context, {:ok, generated}} <-
         Foundation.ErrorContext.measure_trace_step(context, :generate, %{}, fn ->
           DSPEx.Client.LM.request(:openai, format_prompt(generation_inputs))
         end) do
    # ... and so on ...
  else
    # ...
  end
end
```
This is tedious and error-prone. The `Workflow` engine will abstract all of this away.
3. Proposed API and Architecture
The core of the new API is the `Foundation.Workflow` module and its `run/3` function.
3.1. `Foundation.Workflow` Public API

File: `lib/foundation/workflow.ex`
```elixir
defmodule Foundation.Workflow do
  @moduledoc "A declarative engine for running multi-step pipelines."

  @typedoc """
  A single step in a workflow pipeline.

  - `step_name` (atom): A unique name for the step.
  - `step_fun` (function): A function that takes a map (the accumulator)
    and returns `{:ok, new_accumulator}` or `{:error, reason}`.
  """
  @type step :: {step_name :: atom(), step_fun :: (map() -> {:ok, map()} | {:error, term()})}

  @typedoc "A list of steps defining the pipeline."
  @type pipeline :: [step()]

  @doc """
  Executes a workflow pipeline.

  It runs a list of step functions sequentially, passing the output of one
  step as the input to the next. It automatically manages a unified
  `ErrorContext` and emits telemetry for the entire workflow and each
  individual step.

  If any step returns `{:error, reason}`, the pipeline halts and returns
  the enhanced error.

  ## Parameters

  - `pipeline`: A list of `{step_name, step_function}` tuples.
  - `initial_accumulator`: The initial map of data passed to the first step.
  - `opts`: A keyword list of options.
    - `:correlation_id` (optional): An existing correlation ID to use for the trace.
    - `:metadata` (optional): Initial metadata to add to the parent error context.
  """
  @spec run(pipeline :: pipeline(), initial_accumulator :: map(), opts :: keyword()) ::
          {:ok, map()} | {:error, Foundation.Error.t()}
  def run(pipeline, initial_accumulator, opts \\ []) do
    # 1. Create the parent ErrorContext for the entire workflow.
    correlation_id =
      Keyword.get(opts, :correlation_id) || Foundation.Utils.generate_correlation_id()

    parent_context =
      Foundation.ErrorContext.new(
        __MODULE__,
        :run,
        correlation_id: correlation_id,
        metadata: Keyword.get(opts, :metadata, %{})
      )

    # 2. Use `with_context` to wrap the entire execution.
    Foundation.ErrorContext.with_context(parent_context, fn ->
      # 3. Execute the pipeline steps recursively.
      do_run_pipeline(pipeline, initial_accumulator, parent_context)
    end)
  end

  # --- Private Implementation ---

  defp do_run_pipeline([], final_accumulator, _context) do
    # Pipeline finished successfully.
    {:ok, final_accumulator}
  end

  defp do_run_pipeline([{step_name, step_fun} | remaining_steps], accumulator, parent_context) do
    # 1. Create a child context for this specific step.
    step_context =
      Foundation.ErrorContext.child_context(parent_context, __MODULE__, step_name, %{
        step_input_preview: Foundation.Utils.truncate_if_large(accumulator)
      })

    # 2. Measure the execution of the step function.
    {result, duration_ns} = Foundation.Utils.measure(fn -> step_fun.(accumulator) end)

    # 3. Add a structured trace step to the *parent* context. Full step data is
    #    deliberately omitted to avoid bloat; it is available in the error context.
    updated_parent_context =
      Foundation.ErrorContext.add_trace_step(parent_context, step_name, %{
        status: if(match?({:ok, _}, result), do: :success, else: :failure),
        duration_ns: duration_ns
      })

    # 4. Handle the result of the step.
    case result do
      {:ok, new_accumulator} when is_map(new_accumulator) ->
        # Success: continue to the next step with the new accumulator
        # and the updated parent context.
        do_run_pipeline(remaining_steps, new_accumulator, updated_parent_context)

      {:error, _} = error ->
        # Failure: halt the pipeline and return the enhanced error.
        Foundation.ErrorContext.enhance_error(error, step_context)

      _ ->
        # Invalid return value from a step function.
        Foundation.ErrorContext.enhance_error(
          {:error, {:invalid_step_return, result}},
          step_context
        )
    end
  end
end
```
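To make the contract concrete, here is a minimal usage sketch that exercises both the success and failure paths. The step bodies, option values, and error contents are illustrative only and assume the `run/3` behaviour specified above:

```elixir
pipeline = [
  # Each step receives the accumulator map and returns {:ok, map} or {:error, reason}.
  {:double, fn acc -> {:ok, %{acc | value: acc.value * 2}} end},
  {:validate, fn acc ->
    if acc.value < 100, do: {:ok, acc}, else: {:error, :too_large}
  end}
]

# Happy path: each step's output feeds the next.
{:ok, %{value: 42}} = Foundation.Workflow.run(pipeline, %{value: 21})

# Failure path: the pipeline halts at :validate and returns the enhanced error.
{:error, _enhanced_error} =
  Foundation.Workflow.run(pipeline, %{value: 60},
    correlation_id: "req-123",
    metadata: %{caller: :docs_example}
  )
```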
4. Example: `DSPEx` RAG Pipeline Using the Workflow Engine
This example demonstrates how the `DSPEx.RAG` module from our earlier plans can be radically simplified.
```elixir
# The new, simplified RAG program
defmodule DSPEx.RAG do
  @behaviour DSPEx.Program

  defstruct [:retriever, :generator]

  @impl DSPEx.Program
  def forward(program, inputs) do
    # 1. Define the pipeline as a list of functions.
    pipeline = [
      {:retrieve_context, &retrieve_step/1},
      {:generate_answer, &generate_step(&1, program.generator)}
    ]

    # The initial accumulator is the input map.
    initial_data = %{
      question: inputs.question,
      retriever: program.retriever
    }

    # 2. Run the workflow.
    case Foundation.Workflow.run(pipeline, initial_data) do
      {:ok, final_state} ->
        # 3. Format the final output.
        prediction = %DSPEx.Prediction{
          answer: final_state.answer,
          context: final_state.context
        }

        {:ok, prediction}

      {:error, _} = error ->
        error
    end
  end

  # --- Private Step Functions ---
  # Each step takes the accumulator map and returns `{:ok, updated_accumulator}`.

  defp retrieve_step(acc) do
    with {:ok, retrieved} <- DSPEx.Program.forward(acc.retriever, %{query: acc.question}) do
      # Add the retrieved context to the accumulator for the next step.
      updated_acc = Map.put(acc, :context, retrieved.passages)
      {:ok, updated_acc}
    end
  end

  defp generate_step(acc, generator_program) do
    # The generator needs the question and the context from the accumulator.
    generator_inputs = Map.take(acc, [:question, :context])

    with {:ok, generated} <- DSPEx.Program.forward(generator_program, generator_inputs) do
      # Add the final answer to the accumulator.
      updated_acc = Map.put(acc, :answer, generated.answer)
      {:ok, updated_acc}
    end
  end
end
```
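For completeness, a sketch of invoking the program. The `retriever_program` and `generator_program` values below are placeholders for whatever `DSPEx.Program` implementations the application actually constructs, and the dispatch through `DSPEx.Program.forward/2` mirrors the calls made inside the step functions above:

```elixir
# Hypothetical invocation: retriever_program and generator_program are
# previously built DSPEx.Program structs.
rag = %DSPEx.RAG{retriever: retriever_program, generator: generator_program}

case DSPEx.Program.forward(rag, %{question: "What is a supervision tree?"}) do
  {:ok, %DSPEx.Prediction{answer: answer}} -> IO.puts(answer)
  {:error, error} -> IO.inspect(error, label: "RAG pipeline failed")
end
```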
Benefits Illustrated:

- Clarity and Readability: The `forward` function now clearly shows the high-level steps of the pipeline, and the implementation logic for each step is neatly separated into its own private function.
- No Manual Context Passing: The developer does not need to touch the `ErrorContext` at all. The `Workflow.run` function handles its creation, propagation, and enhancement automatically.
- Automatic Observability: A single call to `Workflow.run` produces a complete, correlated trace of events and telemetry for every step in the pipeline, making debugging trivial. If `generate_step` fails, the returned `Error.t()` struct will contain a trace showing the successful `retrieve_step` that preceded it, including a preview of the data it produced.
5. Telemetry and Events
The `Workflow` engine itself would emit telemetry to monitor pipeline performance.
- `[:foundation, :workflow, :run, :stop]`
  - Measurements: `%{duration: integer()}`
  - Metadata: `%{pipeline_name: ..., success: boolean(), step_count: integer()}`
- `[:foundation, :workflow, :step, :stop]`
  - Measurements: `%{duration: integer()}`
  - Metadata: `%{pipeline_name: ..., step_name: ..., success: boolean()}`
This allows for powerful operational insights, such as identifying the slowest step in a popular workflow or alerting when a particular step starts failing frequently.
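Since the events follow standard `:telemetry` conventions, consumers can attach handlers with the stock `:telemetry` API. A minimal sketch of a handler that flags slow steps follows; the handler id, threshold, and the assumption that `duration` is reported in native time units are all illustrative:

```elixir
:telemetry.attach(
  "foundation-workflow-slow-step-logger",
  [:foundation, :workflow, :step, :stop],
  fn _event, %{duration: duration}, metadata, _config ->
    # Assumes duration is in native time units, per common telemetry convention.
    if System.convert_time_unit(duration, :native, :millisecond) > 500 do
      IO.warn("Slow workflow step: #{inspect(metadata[:step_name])}")
    end
  end,
  nil
)
```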
6. Conclusion
The `Foundation.Workflow` engine is a high-level abstraction that dramatically simplifies the creation of complex, multi-step programs. It provides a declarative, functional API for defining pipelines while handling the imperative, boilerplate-heavy tasks of data passing, error handling, and context management automatically.
For `DSPEx`, this is a powerful enabler. It makes building and maintaining compositional programs like `RAG`, `ReAct`, or `MultiHop` significantly simpler and more robust. Developers can focus on the core logic of each reasoning step, confident that the `foundation` layer is providing end-to-end observability and resilience for the entire workflow. This enhancement is a capstone feature that brings together many of `foundation`'s other strengths into a single, cohesive developer experience.