← Back to Gap analysis

DSPEX GAP ANALYSIS CORE 02

Documentation for DSPEX_GAP_ANALYSIS_CORE_02 from the Ds ex repository.

DSPEx Gap Analysis: Module System and Advanced Predictors

Overview

This document analyzes the gaps in DSPEx’s module system and advanced predictor implementations compared to DSPy. The focus is on the sophisticated reasoning patterns and module composition capabilities that make DSPy powerful for complex AI applications.


🧠 Advanced Reasoning Modules: Critical Gaps

1. Chain of Thought (CoT) - MISSING

Status: ❌ Not implemented in DSPEx

Python DSPy Implementation Analysis:

class ChainOfThought(Module):
    def __init__(self, signature, rationale_type=None, activated=True, **config):
        super().__init__()
        self.activated = activated
        self.signature = signature
        
        # Create extended signature with rationale
        signature = signature.with_updated_fields(
            "rationale", desc="Let's think step by step to answer this question."
        )
        self.predict = Predict(signature, **config)
        
    def forward(self, **kwargs):
        if not self.activated:
            return self.predict(**kwargs)
        return self.predict(**kwargs)

Required DSPEx Implementation:

defmodule DSPEx.Predict.ChainOfThought do
  @moduledoc """
  Chain of Thought reasoning that adds intermediate reasoning steps.
  
  Automatically extends signatures to include rationale/thinking fields
  and prompts models to show their reasoning process.
  """
  
  use DSPEx.Program
  alias DSPEx.{Signature, Predict}
  
  defstruct [:original_signature, :extended_signature, :predict, :activated, :rationale_type]
  
  @type t :: %__MODULE__{
    original_signature: module(),
    extended_signature: module(),
    predict: Predict.t(),
    activated: boolean(),
    rationale_type: atom()
  }
  
  @spec new(module(), keyword()) :: t()
  def new(signature, opts \\ []) do
    activated = Keyword.get(opts, :activated, true)
    rationale_type = Keyword.get(opts, :rationale_type, :thinking)
    
    # Create extended signature with rationale field
    extended_signature = create_extended_signature(signature, rationale_type)
    predict = Predict.new(extended_signature, opts)
    
    %__MODULE__{
      original_signature: signature,
      extended_signature: extended_signature,
      predict: predict,
      activated: activated,
      rationale_type: rationale_type
    }
  end
  
  @impl DSPEx.Program
  def forward(program, inputs, opts \\ []) do
    if program.activated do
      case DSPEx.Program.forward(program.predict, inputs, opts) do
        {:ok, outputs} ->
          # Extract final answer from rationale + answer
          final_outputs = extract_final_outputs(outputs, program.original_signature)
          {:ok, final_outputs}
        error -> error
      end
    else
      # Direct prediction without reasoning
      basic_predict = Predict.new(program.original_signature, opts)
      DSPEx.Program.forward(basic_predict, inputs, opts)
    end
  end
  
  # Create signature with added reasoning field
  defp create_extended_signature(original_signature, rationale_type) do
    reasoning_desc = case rationale_type do
      :thinking -> "Let's think step by step to solve this problem."
      :analysis -> "Let's analyze this step by step."
      :reasoning -> "Let's reason through this carefully."
      _ -> "Let's work through this systematically."
    end
    
    # This would need integration with DSPEx.Signature system
    # to dynamically create extended signatures
    Signature.extend_with_field(original_signature, :rationale, reasoning_desc)
  end
  
  defp extract_final_outputs(outputs, original_signature) do
    # Remove rationale field and return only original signature outputs
    original_fields = Signature.output_fields(original_signature)
    Map.take(outputs, original_fields)
  end
end

2. ReAct (Reason + Act) - MISSING

Status: ❌ Not implemented in DSPEx

Python DSPy ReAct Pattern:

class ReAct(Module):
    def __init__(self, signature, tools, max_iters=5):
        self.signature = signature
        self.tools = tools
        self.max_iters = max_iters
        
        # Create signatures for different steps
        self.think_sig = Signature("observation -> thought, action, action_input")
        self.act_sig = Signature("action, action_input -> observation")
        
    def forward(self, **kwargs):
        observation = kwargs.get("question", "")
        
        for i in range(self.max_iters):
            # Think step
            thought_pred = dspy.Predict(self.think_sig)(observation=observation)
            
            # Act step
            if thought_pred.action == "Final Answer":
                return thought_pred.action_input
                
            # Execute action
            observation = self.execute_action(thought_pred.action, thought_pred.action_input)

Required DSPEx Implementation:

defmodule DSPEx.Predict.ReAct do
  @moduledoc """
  ReAct (Reason + Act) module for agent-style reasoning with tool use.
  
  Implements the Reason-Act-Observe loop for multi-step problem solving
  with external tool integration.
  """
  
  use DSPEx.Program
  alias DSPEx.{Signature, Predict}
  
  defstruct [
    :signature,
    :tools,
    :max_iters,
    :think_predictor,
    :final_predictor,
    :tool_executor
  ]
  
  @type tool :: %{
    name: String.t(),
    description: String.t(),
    function: function()
  }
  
  @type t :: %__MODULE__{
    signature: module(),
    tools: [tool()],
    max_iters: pos_integer(),
    think_predictor: Predict.t(),
    final_predictor: Predict.t(),
    tool_executor: module()
  }
  
  def new(signature, tools, opts \\ []) do
    max_iters = Keyword.get(opts, :max_iters, 5)
    
    # Create specialized signatures for ReAct steps
    think_signature = create_think_signature(tools)
    final_signature = create_final_signature(signature)
    
    %__MODULE__{
      signature: signature,
      tools: tools,
      max_iters: max_iters,
      think_predictor: Predict.new(think_signature, opts),
      final_predictor: Predict.new(final_signature, opts),
      tool_executor: DSPEx.Tools.Executor
    }
  end
  
  @impl DSPEx.Program
  def forward(program, inputs, opts \\ []) do
    initial_observation = format_initial_observation(inputs)
    
    react_loop(program, initial_observation, 0, [])
  end
  
  defp react_loop(program, observation, iteration, history) when iteration >= program.max_iters do
    # Max iterations reached, return best effort
    {:ok, %{answer: "Maximum iterations reached. Unable to complete task.", history: history}}
  end
  
  defp react_loop(program, observation, iteration, history) do
    # THINK: Decide what to do next
    think_inputs = %{
      observation: observation,
      available_tools: format_tools(program.tools),
      iteration: iteration
    }
    
    case DSPEx.Program.forward(program.think_predictor, think_inputs) do
      {:ok, %{thought: thought, action: action, action_input: action_input}} ->
        updated_history = [{:thought, thought}, {:action, action} | history]
        
        case action do
          "Final Answer" ->
            # Generate final answer
            final_inputs = %{
              question: observation,
              reasoning_history: format_history(updated_history),
              answer: action_input
            }
            
            case DSPEx.Program.forward(program.final_predictor, final_inputs) do
              {:ok, final_outputs} ->
                {:ok, Map.put(final_outputs, :history, Enum.reverse(updated_history))}
              error -> error
            end
            
          _ ->
            # Execute the action
            case execute_tool_action(program, action, action_input) do
              {:ok, new_observation} ->
                react_loop(program, new_observation, iteration + 1, 
                          [{:observation, new_observation} | updated_history])
              {:error, error_msg} ->
                error_observation = "Error: #{error_msg}"
                react_loop(program, error_observation, iteration + 1,
                          [{:error, error_observation} | updated_history])
            end
        end
        
      {:error, reason} ->
        {:error, {:react_thinking_failed, reason}}
    end
  end
  
  defp create_think_signature(tools) do
    tool_descriptions = Enum.map_join(tools, "\n", fn tool ->
      "- #{tool.name}: #{tool.description}"
    end)
    
    instructions = """
    You are a helpful assistant that can use tools to answer questions.
    
    Available tools:
    #{tool_descriptions}
    
    Given an observation, think about what to do next. You can either:
    1. Use a tool by specifying the action name and input
    2. Provide a "Final Answer" if you have enough information
    
    Format your response with:
    - thought: Your reasoning about what to do next
    - action: Either a tool name or "Final Answer" 
    - action_input: The input for the tool or your final answer
    """
    
    # This would use DSPEx signature creation
    create_signature("observation -> thought, action, action_input", instructions)
  end
  
  defp create_final_signature(_original_signature) do
    instructions = """
    Based on your reasoning history, provide a comprehensive final answer to the original question.
    """
    
    create_signature("question, reasoning_history, answer -> final_answer", instructions)
  end
  
  defp execute_tool_action(program, action, action_input) do
    case Enum.find(program.tools, &(&1.name == action)) do
      nil ->
        {:error, "Unknown tool: #{action}"}
      tool ->
        try do
          result = tool.function.(action_input)
          {:ok, result}
        rescue
          error ->
            {:error, "Tool execution failed: #{inspect(error)}"}
        end
    end
  end
  
  defp format_tools(tools) do
    Enum.map_join(tools, "\n", fn tool ->
      "#{tool.name}: #{tool.description}"
    end)
  end
  
  defp format_initial_observation(inputs) do
    case inputs do
      %{question: question} -> question
      %{input: input} -> input
      _ -> inspect(inputs)
    end
  end
  
  defp format_history(history) do
    history
    |> Enum.reverse()
    |> Enum.map_join("\n", fn
      {:thought, thought} -> "Thought: #{thought}"
      {:action, action} -> "Action: #{action}"
      {:observation, obs} -> "Observation: #{obs}"
      {:error, error} -> "Error: #{error}"
    end)
  end
  
  # Helper to create signatures - would integrate with DSPEx.Signature
  defp create_signature(spec, instructions) do
    # Implementation depends on DSPEx signature system
    spec
  end
end

3. Program of Thought (PoT) - MISSING

Status: ❌ Not implemented in DSPEx

Python DSPy Features:

  • Code generation for mathematical reasoning
  • Safe code execution environment
  • Integration with Python interpreter
  • Result verification and error handling

Required DSPEx Implementation:

defmodule DSPEx.Predict.ProgramOfThought do
  @moduledoc """
  Program of Thought module for code-based reasoning.
  
  Generates and executes code to solve problems that require
  mathematical or algorithmic thinking.
  """
  
  use DSPEx.Program
  alias DSPEx.{Signature, Predict}
  
  defstruct [
    :signature,
    :code_predictor,
    :interpreter,
    :max_retries,
    :timeout
  ]
  
  def new(signature, opts \\ []) do
    max_retries = Keyword.get(opts, :max_retries, 3)
    timeout = Keyword.get(opts, :timeout, 10_000)
    
    # Create signature for code generation
    code_signature = create_code_signature(signature)
    
    %__MODULE__{
      signature: signature,
      code_predictor: Predict.new(code_signature, opts),
      interpreter: DSPEx.CodeInterpreter,
      max_retries: max_retries,
      timeout: timeout
    }
  end
  
  @impl DSPEx.Program
  def forward(program, inputs, opts \\ []) do
    generate_and_execute_code(program, inputs, 0)
  end
  
  defp generate_and_execute_code(program, inputs, attempt) when attempt >= program.max_retries do
    {:error, :max_retries_exceeded}
  end
  
  defp generate_and_execute_code(program, inputs, attempt) do
    # Generate code
    case DSPEx.Program.forward(program.code_predictor, inputs) do
      {:ok, %{code: code, explanation: explanation}} ->
        # Execute the generated code
        case execute_code_safely(program.interpreter, code, program.timeout) do
          {:ok, result} ->
            {:ok, %{answer: result, code: code, explanation: explanation}}
          {:error, execution_error} ->
            # Retry with error context
            error_context = Map.put(inputs, :previous_error, execution_error)
            generate_and_execute_code(program, error_context, attempt + 1)
        end
      error -> error
    end
  end
  
  defp create_code_signature(_original_signature) do
    instructions = """
    Solve this problem by writing Python code. 
    
    Your code should:
    1. Be complete and executable
    2. Include clear variable names
    3. Print or return the final answer
    4. Handle edge cases appropriately
    
    Provide both the code and a brief explanation of your approach.
    """
    
    create_signature("problem -> code, explanation", instructions)
  end
  
  defp execute_code_safely(interpreter, code, timeout) do
    # This would integrate with a safe code execution environment
    # Could use Docker, restricted Python environment, or Elixir ports
    interpreter.execute(code, timeout: timeout)
  end
end

4. Multi-Chain Comparison - MISSING

Status: ❌ Not implemented in DSPEx

Python DSPy Pattern:

class MultiChainComparison(Module):
    def __init__(self, signature, M=3):
        self.M = M
        self.signature = signature
        self.chains = [ChainOfThought(signature) for _ in range(M)]
        self.compare = ChainOfThought("reasoning_chains -> best_reasoning")
        
    def forward(self, **kwargs):
        # Generate M different reasoning chains
        chains = [chain(**kwargs) for chain in self.chains]
        
        # Compare and select best
        comparison_input = format_chains_for_comparison(chains)
        best = self.compare(reasoning_chains=comparison_input)
        return best

Required DSPEx Implementation:

defmodule DSPEx.Predict.MultiChainComparison do
  @moduledoc """
  Multi-Chain Comparison for generating and comparing multiple reasoning paths.
  
  Generates multiple independent reasoning chains and selects the best one
  through comparison and evaluation.
  """
  
  use DSPEx.Program
  alias DSPEx.Predict.ChainOfThought
  
  defstruct [:signature, :chains, :comparator, :num_chains, :selection_strategy]
  
  def new(signature, opts \\ []) do
    num_chains = Keyword.get(opts, :num_chains, 3)
    selection_strategy = Keyword.get(opts, :selection_strategy, :comparison)
    
    # Create multiple reasoning chains
    chains = for _i <- 1..num_chains do
      ChainOfThought.new(signature, opts)
    end
    
    # Create comparator for selecting best chain
    comparator = create_comparator(signature, opts)
    
    %__MODULE__{
      signature: signature,
      chains: chains,
      comparator: comparator,
      num_chains: num_chains,
      selection_strategy: selection_strategy
    }
  end
  
  @impl DSPEx.Program
  def forward(program, inputs, opts \\ []) do
    # Generate multiple reasoning chains in parallel
    chain_tasks = for chain <- program.chains do
      Task.async(fn -> DSPEx.Program.forward(chain, inputs, opts) end)
    end
    
    # Collect results
    chain_results = Task.await_many(chain_tasks, 30_000)
    
    # Filter successful results
    successful_chains = chain_results
    |> Enum.with_index()
    |> Enum.filter(fn {result, _idx} -> match?({:ok, _}, result) end)
    |> Enum.map(fn {{:ok, result}, idx} -> {result, idx} end)
    
    if Enum.empty?(successful_chains) do
      {:error, :all_chains_failed}
    else
      # Select best chain based on strategy
      select_best_chain(program, successful_chains, inputs)
    end
  end
  
  defp select_best_chain(program, chains, inputs) do
    case program.selection_strategy do
      :comparison ->
        select_by_comparison(program.comparator, chains, inputs)
      :voting ->
        select_by_voting(chains)
      :confidence ->
        select_by_confidence(chains)
      _ ->
        # Default: return first successful chain
        {result, _idx} = List.first(chains)
        {:ok, result}
    end
  end
  
  defp select_by_comparison(comparator, chains, original_inputs) do
    # Format chains for comparison
    comparison_input = format_chains_for_comparison(chains, original_inputs)
    
    case DSPEx.Program.forward(comparator, comparison_input) do
      {:ok, %{best_chain: best_idx, reasoning: reasoning}} ->
        case Enum.find(chains, fn {_result, idx} -> idx == best_idx end) do
          {best_result, _} ->
            {:ok, Map.put(best_result, :selection_reasoning, reasoning)}
          nil ->
            # Fallback to first chain if comparison failed
            {result, _} = List.first(chains)
            {:ok, result}
        end
      _error ->
        # Fallback to first chain if comparison failed
        {result, _} = List.first(chains)
        {:ok, result}
    end
  end
  
  defp select_by_voting(chains) do
    # Simple majority voting on answers
    answers = Enum.map(chains, fn {result, _} -> 
      result[:answer] || result[:output] 
    end)
    
    most_common = answers
    |> Enum.frequencies()
    |> Enum.max_by(fn {_answer, count} -> count end)
    |> elem(0)
    
    # Return the first chain with the most common answer
    {result, _} = Enum.find(chains, fn {result, _} ->
      (result[:answer] || result[:output]) == most_common
    end)
    
    {:ok, Map.put(result, :selection_method, :voting)}
  end
  
  defp select_by_confidence(chains) do
    # Select chain with highest confidence score
    # This would require confidence scoring in the base results
    {result, _} = Enum.max_by(chains, fn {result, _} ->
      result[:confidence] || 0.5
    end)
    
    {:ok, Map.put(result, :selection_method, :confidence)}
  end
  
  defp create_comparator(signature, opts) do
    comparison_signature = create_comparison_signature(signature)
    ChainOfThought.new(comparison_signature, opts)
  end
  
  defp format_chains_for_comparison(chains, original_inputs) do
    chain_summaries = chains
    |> Enum.with_index()
    |> Enum.map(fn {{result, chain_idx}, display_idx} ->
      """
      Chain #{display_idx + 1}:
      Reasoning: #{result[:rationale] || "No rationale provided"}
      Answer: #{result[:answer] || result[:output] || "No answer"}
      """
    end)
    |> Enum.join("\n\n")
    
    %{
      original_question: original_inputs[:question] || inspect(original_inputs),
      reasoning_chains: chain_summaries
    }
  end
  
  defp create_comparison_signature(_original_signature) do
    instructions = """
    Compare multiple reasoning chains and select the best one.
    
    Evaluate each chain based on:
    1. Logical consistency
    2. Completeness of reasoning
    3. Accuracy of the approach
    4. Clarity of explanation
    
    Return the index (1-based) of the best chain and explain your reasoning.
    """
    
    create_signature("original_question, reasoning_chains -> best_chain, reasoning", instructions)
  end
end

🔄 Missing Composition and Control Flow Modules

1. Retry Module - MISSING

Status: ❌ Not implemented in DSPEx

Python DSPy Features:

  • Automatic retry on failure
  • Backoff strategies
  • Error-specific retry logic
  • Max attempt limits

2. Parallel Module - PARTIAL

Status: ⚠️ DSPEx has superior Task.async_stream but lacks DSPy’s Parallel module interface

Missing Features:

  • Standardized parallel execution interface
  • Result aggregation patterns
  • Error handling across parallel tasks
  • Resource management

3. Ensemble Module - MISSING

Status: ❌ Not implemented in DSPEx

Required Features:

  • Multiple model combination
  • Voting mechanisms
  • Weighted averaging
  • Confidence-based selection

🎯 Module System Infrastructure Gaps

1. Module Composition

Python DSPy Strength:

class ComplexPipeline(dspy.Module):
    def __init__(self):
        self.retriever = dspy.Retrieve(k=5)
        self.chain_of_thought = dspy.ChainOfThought("context, question -> answer")
        self.refine = dspy.Refine("context, question, draft_answer -> refined_answer")
        
    def forward(self, question):
        context = self.retriever(question)
        draft = self.chain_of_thought(context=context, question=question)
        final = self.refine(context=context, question=question, draft_answer=draft.answer)
        return final

DSPEx Current Limitations:

  • No standardized module composition patterns
  • Limited pipeline building capabilities
  • Missing module lifecycle management

2. State Management

DSPy Features:

  • Automatic parameter management
  • Demo storage and retrieval
  • History tracking
  • State serialization

DSPEx Gaps:

  • Basic state management
  • Limited history tracking
  • No standardized serialization

3. Module Introspection

DSPy Features:

# Automatic predictor discovery
for name, predictor in module.named_predictors():
    print(f"Found predictor: {name}")
    
# Parameter extraction
module.parameters()

DSPEx Status: ❌ Missing introspection capabilities


📊 Implementation Priority Assessment

ModulePriorityEffortDSPy UsageImplementation Status
ChainOfThoughtCRITICALMEDIUMVERY HIGH❌ Missing
ReActHIGHHIGHHIGH❌ Missing
MultiChainComparisonMEDIUMMEDIUMMEDIUM❌ Missing
ProgramOfThoughtMEDIUMHIGHMEDIUM❌ Missing
RetryHIGHLOWHIGH❌ Missing
ParallelMEDIUMLOWMEDIUM⚠️ Partial
EnsembleMEDIUMMEDIUMMEDIUM❌ Missing
Module CompositionHIGHMEDIUMHIGH⚠️ Limited
IntrospectionMEDIUMMEDIUMMEDIUM❌ Missing

  1. ChainOfThought - Most widely used reasoning pattern
  2. Retry Module - Essential for robust production use
  3. Module Composition Framework - Enables complex pipelines
  4. ReAct - Critical for agent-style applications
  5. MultiChainComparison - Improves reasoning quality
  6. Module Introspection - Enables advanced optimizers
  7. ProgramOfThought - Specialized for mathematical reasoning
  8. Ensemble - Advanced combination strategies

This analysis shows DSPEx needs significant work on advanced reasoning modules to match DSPy’s capabilities for complex AI applications.