
Compositional DSPy Integration: Alternative Approaches to Tool Bridge

Executive Summary

This document explores compositional approaches to DSPy integration that could complement, or in some scenarios replace, a full tool bridge. We examine direct Python-based composition patterns, hybrid approaches, and architectural considerations for building more complex DSPy programs within DSPex.

Current State Analysis

Tool Bridge Design vs. Implementation

The tool bridge architecture documented in DSPex represents a sophisticated bidirectional RPC system, but analysis of the codebase reveals:

  1. Design Phase: The tool bridge remains unimplemented, with placeholder code in modules like DSPex.Modules.ReAct
  2. Mock Pattern: Examples use mock tools rather than actual bridge integration
  3. Direct Invocation: All current DSPy integration uses direct Snakepit.Python.call/3 invocations, as sketched below
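For reference, the direct-invocation pattern looks roughly like the following sketch (the :runtime pool name matches the usage later in this document; the empty option list is an assumption):

# A minimal sketch of today's direct-invocation pattern: one
# self-contained Python snippet per call, no tool bridge involved.
{:ok, answer} = Snakepit.Python.call(:runtime, """
import dspy
predictor = dspy.Predict("question -> answer")
predictor(question="What does DSPex do?").answer
""", [])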

Existing Compositional Patterns

Current DSPex examples demonstrate several compositional patterns without a tool bridge:

# Sequential composition
{:ok, result1} = DSPex.call(module1, input)
{:ok, result2} = DSPex.call(module2, result1.answer)

# Parallel composition
tasks = Enum.map(inputs, fn input ->
  Task.async(fn -> DSPex.call(module, input) end)
end)
results = Task.await_many(tasks)

# Conditional composition
module = if complex_query?(input), do: cot_module, else: basic_module
{:ok, result} = DSPex.call(module, input)

Compositional Python Approaches

1. Enhanced Python Session Management

Instead of individual tool calls, we can create richer Python sessions that maintain state and compose operations:

# Python side (enhanced_dspy_session.py)
import dspy

class DSPyComposer:
    def __init__(self):
        self.modules = {}
        self.pipelines = {}
        self.results = {}
    
    def _create_module(self, spec):
        """Instantiate a DSPy module from a spec dict, e.g.
        {"type": "module", "class": "ChainOfThought", "signature": "question -> answer"}."""
        module_cls = getattr(dspy, spec["class"])
        return module_cls(spec["signature"])
    
    def create_pipeline(self, name, steps):
        """Create a reusable pipeline of DSPy operations.
        Module specs are instantiated eagerly; control-flow steps
        (branch/parallel dicts) are kept as data and interpreted at run time."""
        pipeline = []
        for step in steps:
            if isinstance(step, dict) and step.get("type") == "module":
                pipeline.append(self._create_module(step))
            else:
                pipeline.append(step)
        self.pipelines[name] = pipeline
        return name
    
    def execute_pipeline(self, pipeline_name, input_data):
        """Execute a named pipeline with branching and aggregation"""
        pipeline = self.pipelines[pipeline_name]
        current_data = input_data
        
        for step in pipeline:
            if callable(step):
                # DSPy modules take keyword arguments; unpack dict inputs
                if isinstance(current_data, dict):
                    current_data = step(**current_data)
                else:
                    current_data = step(current_data)
            elif isinstance(step, dict) and step.get("type") == "branch":
                # Conditional branching
                condition = step["condition"]
                if condition(current_data):
                    current_data = self.execute_pipeline(step["true_branch"], current_data)
                else:
                    current_data = self.execute_pipeline(step["false_branch"], current_data)
            elif isinstance(step, dict) and step.get("type") == "parallel":
                # Fan out to sub-pipelines, then aggregate their results
                results = [self.execute_pipeline(branch, current_data)
                           for branch in step["branches"]]
                current_data = step.get("aggregator", lambda x: x)(results)
        
        return current_data
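Driving such a session from Elixir might look like the following sketch. It reuses the store_as/session_id options shown elsewhere in this document to pin the composer to one stateful worker, and the pipeline spec uses the dict shape that _create_module expects:

# Create the stateful composer once...
{:ok, _} = Snakepit.Python.call(:runtime, """
from enhanced_dspy_session import DSPyComposer
composer = DSPyComposer()
composer.create_pipeline("qa", [
    {"type": "module", "class": "ChainOfThought",
     "signature": "question -> answer"}
])
""", store_as: "composer_session")

# ...then execute it repeatedly without re-creating any modules.
{:ok, result} = Snakepit.Python.call(:runtime, """
composer.execute_pipeline("qa", {"question": "What is DSPex?"})
""", session_id: "composer_session")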

2. DSPex-Native Compositional Framework

We can extend DSPex with native compositional capabilities that reduce the need for cross-language tool calls:

defmodule DSPex.Composer do
  @moduledoc """
  Native DSPy composition without tool bridge overhead.
  """
  
  defstruct [:session_id, :stored_modules, :pipelines]
  
  def new(_opts \\ []) do
    session_id = ID.generate("composer")
    
    # Initialize a stateful Python session
    {:ok, _} = Snakepit.Python.call(:runtime, """
    from enhanced_dspy_session import DSPyComposer
    composer = DSPyComposer()
    """, store_as: session_id)
    
    %__MODULE__{
      session_id: session_id,
      stored_modules: %{},
      pipelines: %{}
    }
  end
  
  @doc "Define a compositional pipeline."
  def pipeline(composer, name, steps) do
    python_steps = Enum.map(steps, &convert_step_to_python/1)
    
    # JSON-encode the steps (via Jason) so the Python side can parse
    # them; Elixir's inspect/1 output is not valid Python syntax.
    {:ok, _} = Snakepit.Python.call(:runtime, """
    import json
    composer.create_pipeline(#{inspect(name)}, json.loads(#{inspect(Jason.encode!(python_steps))}))
    """, session_id: composer.session_id)
    
    %{composer | pipelines: Map.put(composer.pipelines, name, steps)}
  end
  
  @doc "Execute a pipeline with full composition support."
  def execute(composer, pipeline_name, input) do
    {:ok, result} = Snakepit.Python.call(:runtime, """
    import json
    result = composer.execute_pipeline(#{inspect(pipeline_name)}, json.loads(#{inspect(Jason.encode!(input))}))
    result
    """, session_id: composer.session_id)
    
    result
  end
  
  defp convert_step_to_python(%{type: :module} = step) do
    %{
      "type" => "module",
      "class" => step.class,
      "signature" => step.signature,
      "config" => Map.get(step, :config, %{})
    }
  end
  
  defp convert_step_to_python(%{type: :branch} = step) do
    %{
      "type" => "branch",
      "condition" => compile_condition(step.condition),
      "true_branch" => step.true_branch,
      "false_branch" => step.false_branch
    }
  end
  
  defp convert_step_to_python(%{type: :aggregator} = step) do
    # Sketch: like conditions, aggregator functions cannot be serialized;
    # a full implementation would map them to Python-side callables.
    %{"type" => "aggregator", "function" => inspect(step.function)}
  end
  
  # Pass any other step shape through unchanged
  defp convert_step_to_python(step), do: step
  
  # Sketch: a condition must be representable on the Python side, since
  # Elixir closures cannot cross the language boundary. Strings pass
  # through; anything else is stringified as a placeholder.
  defp compile_condition(condition) when is_binary(condition), do: condition
  defp compile_condition(condition), do: inspect(condition)
end

3. Hybrid Approach: Selective Tool Bridge

For cases where tool bridge functionality is essential, we can implement a lightweight version that coexists with compositional patterns:

defmodule DSPex.LightweightBridge do
  @moduledoc """
  Minimal tool bridge for specific use cases.
  """
  
  @doc "Register multiple functions as a single batch operation."
  def register_batch(functions) do
    # Instead of registering tools individually, register them in batches
    batch_id = ID.generate("batch")
    
    # Store function references
    functions_map = Map.new(functions, fn {name, func} ->
      {to_string(name), func}
    end)
    
    # Create a Python-side batch processor. `elixir_batch_call` is a
    # placeholder for whatever callback entry point the runtime exposes.
    {:ok, _} = Snakepit.Python.call(:runtime, """
    class BatchToolProcessor:
        def __init__(self, batch_id):
            self.batch_id = batch_id
            self.results = {}
        
        def process_batch(self, requests):
            # Send all requests at once to Elixir
            return elixir_batch_call(self.batch_id, requests)
    
    batch_processor = BatchToolProcessor(#{inspect(batch_id)})
    """, store_as: batch_id)
    
    {:ok, batch_id, functions_map}
  end
end
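The Python side above assumes an elixir_batch_call entry point. A minimal Elixir-side dispatcher for it might look like this sketch; the wiring that routes the Python callback here is assumed, not an existing Snakepit API:

defmodule DSPex.LightweightBridge.Dispatcher do
  @moduledoc "Sketch: executes a batch of tool requests in one round trip."
  
  # `functions_map` is the map returned by register_batch/1; each request
  # is a %{"name" => ..., "args" => [...]} map sent from Python.
  def handle_batch(functions_map, requests) do
    Enum.map(requests, fn %{"name" => name, "args" => args} ->
      case Map.fetch(functions_map, name) do
        {:ok, func} -> %{"name" => name, "result" => apply(func, args)}
        :error -> %{"name" => name, "error" => "unknown function: #{name}"}
      end
    end)
  end
end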

4. Data Multiplication Pattern

A pattern in which data flows through compositional DSPy programs and, where needed, back to Elixir via bridge callbacks:

import copy

class MultiPathProcessor:
    """Process data through multiple paths simultaneously"""
    
    def __init__(self):
        self.paths = {}
        self.collectors = {}
    
    def add_path(self, name, processor, collector=None):
        """Add a processing path with optional result collection"""
        self.paths[name] = processor
        if collector:
            self.collectors[name] = collector
    
    def process(self, input_data):
        """Process input through all paths"""
        results = {}
        
        # Fork the data to all paths
        for path_name, processor in self.paths.items():
            try:
                # Clone the input for each path
                path_input = copy.deepcopy(input_data)
                result = processor(path_input)
                results[path_name] = result
                
                # If there's a collector, send result back to Elixir
                if path_name in self.collectors:
                    self.collectors[path_name](result)
            except Exception as e:
                results[path_name] = {"error": str(e)}
        
        return results
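From Elixir, the same stateful-session mechanism drives the pattern. In this sketch the multi_path module name is hypothetical, and each path wraps its DSPy module in a lambda so that dict inputs arrive as keyword arguments:

{:ok, _} = Snakepit.Python.call(:runtime, """
from multi_path import MultiPathProcessor
import dspy

proc = MultiPathProcessor()
# Wrap each module so dict inputs become keyword arguments
proc.add_path("cot", lambda d: dspy.ChainOfThought("question -> answer")(**d).answer)
proc.add_path("direct", lambda d: dspy.Predict("question -> answer")(**d).answer)
""", store_as: "multipath")

{:ok, results} = Snakepit.Python.call(:runtime, """
proc.process({"question": "Compare both strategies."})
""", session_id: "multipath")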

Architectural Recommendations

1. Layered Integration Strategy

┌─────────────────────────────────────────┐
│         Application Layer               │
│   (Business logic, orchestration)       │
├─────────────────────────────────────────┤
│      DSPex Compositional Layer          │
│ (Pipelines, branches, aggregations)     │
├─────────────────────────────────────────┤
│      Selective Tool Bridge Layer        │
│  (Only for external tool integration)   │
├─────────────────────────────────────────┤
│         Core DSPy Layer                 │
│    (Direct module invocation)           │
└─────────────────────────────────────────┘

2. Decision Matrix for Integration Approach

| Use Case | Recommended Approach | Rationale |
|---|---|---|
| Sequential DSPy operations | Direct composition | No bridge overhead needed |
| Parallel processing | Native Elixir + Python sessions | Better resource utilization |
| External tool integration | Selective tool bridge | Only where necessary |
| Complex branching logic | Compositional framework | Cleaner code, better debugging |
| Real-time streaming | Hybrid with data multiplication | Flexibility for different consumers |

3. Implementation Priorities

  1. Phase 1: Enhance session management for stateful compositions
  2. Phase 2: Implement native compositional framework
  3. Phase 3: Add selective tool bridge for external integrations
  4. Phase 4: Implement data multiplication patterns

Benefits of Compositional Approach

1. Performance

  • Reduced RPC overhead for internal operations
  • Batch processing capabilities
  • Better caching of intermediate results

2. Maintainability

  • Clearer separation of concerns
  • Easier debugging with fewer cross-language calls
  • More idiomatic code in both languages

3. Flexibility

  • Mix and match approaches as needed
  • Progressive enhancement of capabilities
  • Easier to extend without breaking changes

Example: Compositional DSPy Program

defmodule DSPex.Examples.CompositeReasoning do
  alias DSPex.Composer
  
  def build_reasoning_pipeline do
    composer = Composer.new()
    
    # Define a complex pipeline with branching
    composer
    |> Composer.pipeline("advanced_reasoning", [
      %{type: :module, class: "ChainOfThought", signature: "question -> reasoning, answer"},
      %{
        type: :branch,
        condition: &contains_math?/1,
        true_branch: "math_solver",
        false_branch: "general_solver"
      },
      %{type: :aggregator, function: &combine_results/1}
    ])
    |> Composer.pipeline("math_solver", [
      %{type: :module, class: "ProgramOfThought", signature: "problem -> code, result"},
      %{type: :module, class: "Predict", signature: "result -> final_answer"}
    ])
    |> Composer.pipeline("general_solver", [
      %{type: :module, class: "Predict", signature: "reasoning -> final_answer"}
    ])
  end
  
  def solve(composer, question) do
    Composer.execute(composer, "advanced_reasoning", %{question: question})
  end
  
  # Illustrative stubs so the module compiles; a real implementation
  # would classify the intermediate result and merge branch outputs.
  defp contains_math?(result), do: String.match?(to_string(result["reasoning"] || ""), ~r/\d/)
  defp combine_results(results), do: results
end
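A caller builds the pipeline once and reuses it across questions:

composer = DSPex.Examples.CompositeReasoning.build_reasoning_pipeline()
answer = DSPex.Examples.CompositeReasoning.solve(composer, "What is 15% of 240?")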

Conclusion

While the tool bridge architecture provides a comprehensive solution for bidirectional communication, many DSPy integration scenarios can be better served by compositional approaches that:

  1. Keep related operations within the same language boundary
  2. Reduce serialization overhead
  3. Provide clearer abstractions for complex workflows
  4. Allow selective use of bridge functionality where truly needed

The recommended approach is to implement a layered architecture that starts with compositional patterns and adds tool bridge capabilities only where external tool integration is required. This provides the best balance of performance, maintainability, and flexibility for building sophisticated DSPy applications within DSPex.