DSPEx Gap Analysis: Module System and Advanced Predictors
Overview
This document analyzes the gaps in DSPEx’s module system and advanced predictor implementations compared to DSPy. The focus is on the sophisticated reasoning patterns and module composition capabilities that make DSPy powerful for complex AI applications.
🧠 Advanced Reasoning Modules: Critical Gaps
1. Chain of Thought (CoT) - MISSING
Status: ❌ Not implemented in DSPEx
Python DSPy Implementation Analysis:
class ChainOfThought(Module):
def __init__(self, signature, rationale_type=None, activated=True, **config):
super().__init__()
self.activated = activated
self.signature = signature
# Create extended signature with rationale
signature = signature.with_updated_fields(
"rationale", desc="Let's think step by step to answer this question."
)
self.predict = Predict(signature, **config)
def forward(self, **kwargs):
if not self.activated:
return self.predict(**kwargs)
return self.predict(**kwargs)
Required DSPEx Implementation:
defmodule DSPEx.Predict.ChainOfThought do
@moduledoc """
Chain of Thought reasoning that adds intermediate reasoning steps.
Automatically extends signatures to include rationale/thinking fields
and prompts models to show their reasoning process.
"""
use DSPEx.Program
alias DSPEx.{Signature, Predict}
defstruct [:original_signature, :extended_signature, :predict, :activated, :rationale_type]
@type t :: %__MODULE__{
original_signature: module(),
extended_signature: module(),
predict: Predict.t(),
activated: boolean(),
rationale_type: atom()
}
@spec new(module(), keyword()) :: t()
def new(signature, opts \\ []) do
activated = Keyword.get(opts, :activated, true)
rationale_type = Keyword.get(opts, :rationale_type, :thinking)
# Create extended signature with rationale field
extended_signature = create_extended_signature(signature, rationale_type)
predict = Predict.new(extended_signature, opts)
%__MODULE__{
original_signature: signature,
extended_signature: extended_signature,
predict: predict,
activated: activated,
rationale_type: rationale_type
}
end
@impl DSPEx.Program
def forward(program, inputs, opts \\ []) do
if program.activated do
case DSPEx.Program.forward(program.predict, inputs, opts) do
{:ok, outputs} ->
# Extract final answer from rationale + answer
final_outputs = extract_final_outputs(outputs, program.original_signature)
{:ok, final_outputs}
error -> error
end
else
# Direct prediction without reasoning
basic_predict = Predict.new(program.original_signature, opts)
DSPEx.Program.forward(basic_predict, inputs, opts)
end
end
# Create signature with added reasoning field
defp create_extended_signature(original_signature, rationale_type) do
reasoning_desc = case rationale_type do
:thinking -> "Let's think step by step to solve this problem."
:analysis -> "Let's analyze this step by step."
:reasoning -> "Let's reason through this carefully."
_ -> "Let's work through this systematically."
end
# This would need integration with DSPEx.Signature system
# to dynamically create extended signatures
Signature.extend_with_field(original_signature, :rationale, reasoning_desc)
end
defp extract_final_outputs(outputs, original_signature) do
# Remove rationale field and return only original signature outputs
original_fields = Signature.output_fields(original_signature)
Map.take(outputs, original_fields)
end
end
2. ReAct (Reason + Act) - MISSING
Status: ❌ Not implemented in DSPEx
Python DSPy ReAct Pattern:
class ReAct(Module):
def __init__(self, signature, tools, max_iters=5):
self.signature = signature
self.tools = tools
self.max_iters = max_iters
# Create signatures for different steps
self.think_sig = Signature("observation -> thought, action, action_input")
self.act_sig = Signature("action, action_input -> observation")
def forward(self, **kwargs):
observation = kwargs.get("question", "")
for i in range(self.max_iters):
# Think step
thought_pred = dspy.Predict(self.think_sig)(observation=observation)
# Act step
if thought_pred.action == "Final Answer":
return thought_pred.action_input
# Execute action
observation = self.execute_action(thought_pred.action, thought_pred.action_input)
Required DSPEx Implementation:
defmodule DSPEx.Predict.ReAct do
@moduledoc """
ReAct (Reason + Act) module for agent-style reasoning with tool use.
Implements the Reason-Act-Observe loop for multi-step problem solving
with external tool integration.
"""
use DSPEx.Program
alias DSPEx.{Signature, Predict}
defstruct [
:signature,
:tools,
:max_iters,
:think_predictor,
:final_predictor,
:tool_executor
]
@type tool :: %{
name: String.t(),
description: String.t(),
function: function()
}
@type t :: %__MODULE__{
signature: module(),
tools: [tool()],
max_iters: pos_integer(),
think_predictor: Predict.t(),
final_predictor: Predict.t(),
tool_executor: module()
}
def new(signature, tools, opts \\ []) do
max_iters = Keyword.get(opts, :max_iters, 5)
# Create specialized signatures for ReAct steps
think_signature = create_think_signature(tools)
final_signature = create_final_signature(signature)
%__MODULE__{
signature: signature,
tools: tools,
max_iters: max_iters,
think_predictor: Predict.new(think_signature, opts),
final_predictor: Predict.new(final_signature, opts),
tool_executor: DSPEx.Tools.Executor
}
end
@impl DSPEx.Program
def forward(program, inputs, opts \\ []) do
initial_observation = format_initial_observation(inputs)
react_loop(program, initial_observation, 0, [])
end
defp react_loop(program, observation, iteration, history) when iteration >= program.max_iters do
# Max iterations reached, return best effort
{:ok, %{answer: "Maximum iterations reached. Unable to complete task.", history: history}}
end
defp react_loop(program, observation, iteration, history) do
# THINK: Decide what to do next
think_inputs = %{
observation: observation,
available_tools: format_tools(program.tools),
iteration: iteration
}
case DSPEx.Program.forward(program.think_predictor, think_inputs) do
{:ok, %{thought: thought, action: action, action_input: action_input}} ->
updated_history = [{:thought, thought}, {:action, action} | history]
case action do
"Final Answer" ->
# Generate final answer
final_inputs = %{
question: observation,
reasoning_history: format_history(updated_history),
answer: action_input
}
case DSPEx.Program.forward(program.final_predictor, final_inputs) do
{:ok, final_outputs} ->
{:ok, Map.put(final_outputs, :history, Enum.reverse(updated_history))}
error -> error
end
_ ->
# Execute the action
case execute_tool_action(program, action, action_input) do
{:ok, new_observation} ->
react_loop(program, new_observation, iteration + 1,
[{:observation, new_observation} | updated_history])
{:error, error_msg} ->
error_observation = "Error: #{error_msg}"
react_loop(program, error_observation, iteration + 1,
[{:error, error_observation} | updated_history])
end
end
{:error, reason} ->
{:error, {:react_thinking_failed, reason}}
end
end
defp create_think_signature(tools) do
tool_descriptions = Enum.map_join(tools, "\n", fn tool ->
"- #{tool.name}: #{tool.description}"
end)
instructions = """
You are a helpful assistant that can use tools to answer questions.
Available tools:
#{tool_descriptions}
Given an observation, think about what to do next. You can either:
1. Use a tool by specifying the action name and input
2. Provide a "Final Answer" if you have enough information
Format your response with:
- thought: Your reasoning about what to do next
- action: Either a tool name or "Final Answer"
- action_input: The input for the tool or your final answer
"""
# This would use DSPEx signature creation
create_signature("observation -> thought, action, action_input", instructions)
end
defp create_final_signature(_original_signature) do
instructions = """
Based on your reasoning history, provide a comprehensive final answer to the original question.
"""
create_signature("question, reasoning_history, answer -> final_answer", instructions)
end
defp execute_tool_action(program, action, action_input) do
case Enum.find(program.tools, &(&1.name == action)) do
nil ->
{:error, "Unknown tool: #{action}"}
tool ->
try do
result = tool.function.(action_input)
{:ok, result}
rescue
error ->
{:error, "Tool execution failed: #{inspect(error)}"}
end
end
end
defp format_tools(tools) do
Enum.map_join(tools, "\n", fn tool ->
"#{tool.name}: #{tool.description}"
end)
end
defp format_initial_observation(inputs) do
case inputs do
%{question: question} -> question
%{input: input} -> input
_ -> inspect(inputs)
end
end
defp format_history(history) do
history
|> Enum.reverse()
|> Enum.map_join("\n", fn
{:thought, thought} -> "Thought: #{thought}"
{:action, action} -> "Action: #{action}"
{:observation, obs} -> "Observation: #{obs}"
{:error, error} -> "Error: #{error}"
end)
end
# Helper to create signatures - would integrate with DSPEx.Signature
defp create_signature(spec, instructions) do
# Implementation depends on DSPEx signature system
spec
end
end
3. Program of Thought (PoT) - MISSING
Status: ❌ Not implemented in DSPEx
Python DSPy Features:
- Code generation for mathematical reasoning
- Safe code execution environment
- Integration with Python interpreter
- Result verification and error handling
Required DSPEx Implementation:
defmodule DSPEx.Predict.ProgramOfThought do
@moduledoc """
Program of Thought module for code-based reasoning.
Generates and executes code to solve problems that require
mathematical or algorithmic thinking.
"""
use DSPEx.Program
alias DSPEx.{Signature, Predict}
defstruct [
:signature,
:code_predictor,
:interpreter,
:max_retries,
:timeout
]
def new(signature, opts \\ []) do
max_retries = Keyword.get(opts, :max_retries, 3)
timeout = Keyword.get(opts, :timeout, 10_000)
# Create signature for code generation
code_signature = create_code_signature(signature)
%__MODULE__{
signature: signature,
code_predictor: Predict.new(code_signature, opts),
interpreter: DSPEx.CodeInterpreter,
max_retries: max_retries,
timeout: timeout
}
end
@impl DSPEx.Program
def forward(program, inputs, opts \\ []) do
generate_and_execute_code(program, inputs, 0)
end
defp generate_and_execute_code(program, inputs, attempt) when attempt >= program.max_retries do
{:error, :max_retries_exceeded}
end
defp generate_and_execute_code(program, inputs, attempt) do
# Generate code
case DSPEx.Program.forward(program.code_predictor, inputs) do
{:ok, %{code: code, explanation: explanation}} ->
# Execute the generated code
case execute_code_safely(program.interpreter, code, program.timeout) do
{:ok, result} ->
{:ok, %{answer: result, code: code, explanation: explanation}}
{:error, execution_error} ->
# Retry with error context
error_context = Map.put(inputs, :previous_error, execution_error)
generate_and_execute_code(program, error_context, attempt + 1)
end
error -> error
end
end
defp create_code_signature(_original_signature) do
instructions = """
Solve this problem by writing Python code.
Your code should:
1. Be complete and executable
2. Include clear variable names
3. Print or return the final answer
4. Handle edge cases appropriately
Provide both the code and a brief explanation of your approach.
"""
create_signature("problem -> code, explanation", instructions)
end
defp execute_code_safely(interpreter, code, timeout) do
# This would integrate with a safe code execution environment
# Could use Docker, restricted Python environment, or Elixir ports
interpreter.execute(code, timeout: timeout)
end
end
4. Multi-Chain Comparison - MISSING
Status: ❌ Not implemented in DSPEx
Python DSPy Pattern:
class MultiChainComparison(Module):
def __init__(self, signature, M=3):
self.M = M
self.signature = signature
self.chains = [ChainOfThought(signature) for _ in range(M)]
self.compare = ChainOfThought("reasoning_chains -> best_reasoning")
def forward(self, **kwargs):
# Generate M different reasoning chains
chains = [chain(**kwargs) for chain in self.chains]
# Compare and select best
comparison_input = format_chains_for_comparison(chains)
best = self.compare(reasoning_chains=comparison_input)
return best
Required DSPEx Implementation:
defmodule DSPEx.Predict.MultiChainComparison do
@moduledoc """
Multi-Chain Comparison for generating and comparing multiple reasoning paths.
Generates multiple independent reasoning chains and selects the best one
through comparison and evaluation.
"""
use DSPEx.Program
alias DSPEx.Predict.ChainOfThought
defstruct [:signature, :chains, :comparator, :num_chains, :selection_strategy]
def new(signature, opts \\ []) do
num_chains = Keyword.get(opts, :num_chains, 3)
selection_strategy = Keyword.get(opts, :selection_strategy, :comparison)
# Create multiple reasoning chains
chains = for _i <- 1..num_chains do
ChainOfThought.new(signature, opts)
end
# Create comparator for selecting best chain
comparator = create_comparator(signature, opts)
%__MODULE__{
signature: signature,
chains: chains,
comparator: comparator,
num_chains: num_chains,
selection_strategy: selection_strategy
}
end
@impl DSPEx.Program
def forward(program, inputs, opts \\ []) do
# Generate multiple reasoning chains in parallel
chain_tasks = for chain <- program.chains do
Task.async(fn -> DSPEx.Program.forward(chain, inputs, opts) end)
end
# Collect results
chain_results = Task.await_many(chain_tasks, 30_000)
# Filter successful results
successful_chains = chain_results
|> Enum.with_index()
|> Enum.filter(fn {result, _idx} -> match?({:ok, _}, result) end)
|> Enum.map(fn {{:ok, result}, idx} -> {result, idx} end)
if Enum.empty?(successful_chains) do
{:error, :all_chains_failed}
else
# Select best chain based on strategy
select_best_chain(program, successful_chains, inputs)
end
end
defp select_best_chain(program, chains, inputs) do
case program.selection_strategy do
:comparison ->
select_by_comparison(program.comparator, chains, inputs)
:voting ->
select_by_voting(chains)
:confidence ->
select_by_confidence(chains)
_ ->
# Default: return first successful chain
{result, _idx} = List.first(chains)
{:ok, result}
end
end
defp select_by_comparison(comparator, chains, original_inputs) do
# Format chains for comparison
comparison_input = format_chains_for_comparison(chains, original_inputs)
case DSPEx.Program.forward(comparator, comparison_input) do
{:ok, %{best_chain: best_idx, reasoning: reasoning}} ->
case Enum.find(chains, fn {_result, idx} -> idx == best_idx end) do
{best_result, _} ->
{:ok, Map.put(best_result, :selection_reasoning, reasoning)}
nil ->
# Fallback to first chain if comparison failed
{result, _} = List.first(chains)
{:ok, result}
end
_error ->
# Fallback to first chain if comparison failed
{result, _} = List.first(chains)
{:ok, result}
end
end
defp select_by_voting(chains) do
# Simple majority voting on answers
answers = Enum.map(chains, fn {result, _} ->
result[:answer] || result[:output]
end)
most_common = answers
|> Enum.frequencies()
|> Enum.max_by(fn {_answer, count} -> count end)
|> elem(0)
# Return the first chain with the most common answer
{result, _} = Enum.find(chains, fn {result, _} ->
(result[:answer] || result[:output]) == most_common
end)
{:ok, Map.put(result, :selection_method, :voting)}
end
defp select_by_confidence(chains) do
# Select chain with highest confidence score
# This would require confidence scoring in the base results
{result, _} = Enum.max_by(chains, fn {result, _} ->
result[:confidence] || 0.5
end)
{:ok, Map.put(result, :selection_method, :confidence)}
end
defp create_comparator(signature, opts) do
comparison_signature = create_comparison_signature(signature)
ChainOfThought.new(comparison_signature, opts)
end
defp format_chains_for_comparison(chains, original_inputs) do
chain_summaries = chains
|> Enum.with_index()
|> Enum.map(fn {{result, chain_idx}, display_idx} ->
"""
Chain #{display_idx + 1}:
Reasoning: #{result[:rationale] || "No rationale provided"}
Answer: #{result[:answer] || result[:output] || "No answer"}
"""
end)
|> Enum.join("\n\n")
%{
original_question: original_inputs[:question] || inspect(original_inputs),
reasoning_chains: chain_summaries
}
end
defp create_comparison_signature(_original_signature) do
instructions = """
Compare multiple reasoning chains and select the best one.
Evaluate each chain based on:
1. Logical consistency
2. Completeness of reasoning
3. Accuracy of the approach
4. Clarity of explanation
Return the index (1-based) of the best chain and explain your reasoning.
"""
create_signature("original_question, reasoning_chains -> best_chain, reasoning", instructions)
end
end
🔄 Missing Composition and Control Flow Modules
1. Retry Module - MISSING
Status: ❌ Not implemented in DSPEx
Python DSPy Features:
- Automatic retry on failure
- Backoff strategies
- Error-specific retry logic
- Max attempt limits
2. Parallel Module - PARTIAL
Status: ⚠️ DSPEx has superior Task.async_stream
but lacks DSPy’s Parallel module interface
Missing Features:
- Standardized parallel execution interface
- Result aggregation patterns
- Error handling across parallel tasks
- Resource management
3. Ensemble Module - MISSING
Status: ❌ Not implemented in DSPEx
Required Features:
- Multiple model combination
- Voting mechanisms
- Weighted averaging
- Confidence-based selection
🎯 Module System Infrastructure Gaps
1. Module Composition
Python DSPy Strength:
class ComplexPipeline(dspy.Module):
def __init__(self):
self.retriever = dspy.Retrieve(k=5)
self.chain_of_thought = dspy.ChainOfThought("context, question -> answer")
self.refine = dspy.Refine("context, question, draft_answer -> refined_answer")
def forward(self, question):
context = self.retriever(question)
draft = self.chain_of_thought(context=context, question=question)
final = self.refine(context=context, question=question, draft_answer=draft.answer)
return final
DSPEx Current Limitations:
- No standardized module composition patterns
- Limited pipeline building capabilities
- Missing module lifecycle management
2. State Management
DSPy Features:
- Automatic parameter management
- Demo storage and retrieval
- History tracking
- State serialization
DSPEx Gaps:
- Basic state management
- Limited history tracking
- No standardized serialization
3. Module Introspection
DSPy Features:
# Automatic predictor discovery
for name, predictor in module.named_predictors():
print(f"Found predictor: {name}")
# Parameter extraction
module.parameters()
DSPEx Status: ❌ Missing introspection capabilities
📊 Implementation Priority Assessment
Module | Priority | Effort | DSPy Usage | Implementation Status |
---|---|---|---|---|
ChainOfThought | CRITICAL | MEDIUM | VERY HIGH | ❌ Missing |
ReAct | HIGH | HIGH | HIGH | ❌ Missing |
MultiChainComparison | MEDIUM | MEDIUM | MEDIUM | ❌ Missing |
ProgramOfThought | MEDIUM | HIGH | MEDIUM | ❌ Missing |
Retry | HIGH | LOW | HIGH | ❌ Missing |
Parallel | MEDIUM | LOW | MEDIUM | ⚠️ Partial |
Ensemble | MEDIUM | MEDIUM | MEDIUM | ❌ Missing |
Module Composition | HIGH | MEDIUM | HIGH | ⚠️ Limited |
Introspection | MEDIUM | MEDIUM | MEDIUM | ❌ Missing |
🎯 Recommended Implementation Order
- ChainOfThought - Most widely used reasoning pattern
- Retry Module - Essential for robust production use
- Module Composition Framework - Enables complex pipelines
- ReAct - Critical for agent-style applications
- MultiChainComparison - Improves reasoning quality
- Module Introspection - Enables advanced optimizers
- ProgramOfThought - Specialized for mathematical reasoning
- Ensemble - Advanced combination strategies
This analysis shows DSPEx needs significant work on advanced reasoning modules to match DSPy’s capabilities for complex AI applications.