← Back to Pipeline ex

JULY 1 ARCH DOCS 05 DSPY INTEGRATION

Documentation for JULY_1_ARCH_DOCS_05_DSPY_INTEGRATION from the Pipeline ex repository.

JULY_1_ARCH_DOCS_05: DSPy Integration & Optimization Layer

Overview

The DSPy integration layer transforms ElexirionDSP from a static pipeline executor into a self-optimizing system. By treating pipeline prompts as parameters to be learned rather than fixed instructions, we enable continuous improvement of AI workflow performance.

Core Philosophy

From Static Prompts to Learned Programs

Traditional Approach (Static):

# Fixed prompt that never improves
- name: analyze_code
  type: claude
  prompt: "Analyze this code for bugs and suggest improvements"

DSPy Approach (Dynamic):

# Prompt that optimizes itself based on success metrics
class CodeAnalysisModule(dspy.Module):
    def __init__(self):
        self.analyze = dspy.ChainOfThought("code -> analysis")
    
    def forward(self, code):
        return self.analyze(code=code)

# DSPy finds the best prompt automatically
optimized = dspy.compile(CodeAnalysisModule(), metric=validate_analysis_quality)

The Hybrid Execution Model

┌─────────────────────────────────────────────────────────────┐
│                   DSPy Optimizer (Python)                   │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                Learning Loop                        │    │
│  │  1. Generate candidate prompts                      │    │
│  │  2. Test via Elixir execution                       │    │
│  │  3. Measure success metrics                         │    │
│  │  4. Update prompt parameters                        │    │
│  │  5. Repeat until convergence                        │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
                              │
                    subprocess calls
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                Elixir Execution Runtime                     │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              Production Pipeline                    │    │
│  │  • Robust error handling                           │    │
│  │  • OTP supervision                                 │    │
│  │  • Multi-provider support                          │    │
│  │  • Concurrent execution                            │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

Implementation Architecture

1. Python-Elixir Bridge

# bridge/elixir_pipeline.py
import subprocess
import json
import tempfile
from typing import Dict, Any, Optional

class ElixirPipelineExecutor:
    """Bridge between DSPy and Elixir pipeline execution"""
    
    def __init__(self, elixir_project_path: str):
        self.project_path = elixir_project_path
        self.temp_dir = tempfile.mkdtemp()
    
    def execute_pipeline(self, 
                        pipeline_config: Dict[str, Any], 
                        input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute an Elixir pipeline and return results"""
        
        # 1. Write temporary pipeline config
        config_path = self._write_temp_config(pipeline_config)
        
        # 2. Write input data
        input_path = self._write_temp_input(input_data)
        
        # 3. Execute via mix command
        result = subprocess.run([
            'mix', 'pipeline.execute', 
            '--config', config_path,
            '--input', input_path,
            '--format', 'json'
        ], 
        cwd=self.project_path,
        capture_output=True, 
        text=True,
        timeout=300  # 5 minute timeout
        )
        
        if result.returncode != 0:
            raise Exception(f"Pipeline execution failed: {result.stderr}")
        
        # 4. Parse and return results
        return json.loads(result.stdout)
    
    def _write_temp_config(self, config: Dict[str, Any]) -> str:
        """Write pipeline config to temporary YAML file"""
        import yaml
        
        config_path = f"{self.temp_dir}/pipeline_{id(config)}.yaml"
        with open(config_path, 'w') as f:
            yaml.dump(config, f)
        return config_path
    
    def _write_temp_input(self, input_data: Dict[str, Any]) -> str:
        """Write input data to temporary JSON file"""
        input_path = f"{self.temp_dir}/input_{id(input_data)}.json"
        with open(input_path, 'w') as f:
            json.dump(input_data, f)
        return input_path

2. DSPy Module Wrappers

# modules/elixir_modules.py
import dspy
from typing import Dict, Any
from .bridge.elixir_pipeline import ElixirPipelineExecutor

class ElixirPipelineModule(dspy.Module):
    """DSPy module that wraps Elixir pipeline execution"""
    
    def __init__(self, 
                 pipeline_template: Dict[str, Any],
                 elixir_path: str = "/path/to/pipeline_ex"):
        super().__init__()
        self.pipeline_template = pipeline_template
        self.executor = ElixirPipelineExecutor(elixir_path)
        
        # Extract optimizable prompt from template
        self.prompt_step = self._find_prompt_step(pipeline_template)
        
        # Create DSPy predictor for the prompt
        signature = self._build_signature(self.prompt_step)
        self.predictor = dspy.Predict(signature)
    
    def forward(self, **kwargs):
        """Execute the pipeline with DSPy-optimized prompts"""
        
        # 1. Generate optimized prompt using DSPy
        optimized_prompt = self.predictor(**kwargs).prediction
        
        # 2. Update pipeline config with optimized prompt
        updated_config = self._update_pipeline_prompt(
            self.pipeline_template, 
            optimized_prompt
        )
        
        # 3. Execute via Elixir
        result = self.executor.execute_pipeline(updated_config, kwargs)
        
        # 4. Return structured result
        return dspy.Prediction(
            pipeline_result=result,
            optimized_prompt=optimized_prompt
        )
    
    def _find_prompt_step(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Find the step containing the prompt to optimize"""
        steps = config.get('workflow', {}).get('steps', [])
        
        for step in steps:
            if step.get('type') in ['claude', 'gemini', 'claude_smart']:
                return step
        
        raise ValueError("No optimizable prompt step found in pipeline")
    
    def _build_signature(self, step: Dict[str, Any]) -> str:
        """Build DSPy signature from pipeline step"""
        # Extract input/output structure from step configuration
        # This would be customized based on your pipeline conventions
        return "input_data -> pipeline_output"
    
    def _update_pipeline_prompt(self, 
                               config: Dict[str, Any], 
                               new_prompt: str) -> Dict[str, Any]:
        """Update pipeline config with new prompt"""
        import copy
        updated_config = copy.deepcopy(config)
        
        # Find and update the prompt step
        steps = updated_config['workflow']['steps']
        for step in steps:
            if step.get('type') in ['claude', 'gemini', 'claude_smart']:
                if isinstance(step['prompt'], list):
                    # Update static content
                    for prompt_item in step['prompt']:
                        if prompt_item.get('type') == 'static':
                            prompt_item['content'] = new_prompt
                else:
                    step['prompt'] = new_prompt
                break
        
        return updated_config

class CodeAnalysisModule(ElixirPipelineModule):
    """Specialized module for code analysis optimization"""
    
    def __init__(self, elixir_path: str = "/path/to/pipeline_ex"):
        pipeline_template = {
            'workflow': {
                'name': 'optimizable_code_analysis',
                'steps': [{
                    'name': 'analyze_code',
                    'type': 'claude_smart',
                    'preset': 'analysis',
                    'prompt': [{
                        'type': 'static',
                        'content': 'PLACEHOLDER_FOR_OPTIMIZATION'
                    }]
                }]
            }
        }
        super().__init__(pipeline_template, elixir_path)

class RefactoringModule(ElixirPipelineModule):
    """Specialized module for refactoring optimization"""
    
    def __init__(self, elixir_path: str = "/path/to/pipeline_ex"):
        pipeline_template = {
            'workflow': {
                'name': 'optimizable_refactoring',
                'steps': [
                    {
                        'name': 'analyze_for_refactoring',
                        'type': 'claude_smart',
                        'preset': 'analysis',
                        'prompt': [{
                            'type': 'static',
                            'content': 'ANALYSIS_PLACEHOLDER'
                        }]
                    },
                    {
                        'name': 'generate_refactoring_plan',
                        'type': 'claude_smart', 
                        'preset': 'development',
                        'prompt': [{
                            'type': 'static',
                            'content': 'REFACTORING_PLACEHOLDER'
                        }, {
                            'type': 'previous_response',
                            'name': 'analyze_for_refactoring'
                        }]
                    }
                ]
            }
        }
        super().__init__(pipeline_template, elixir_path)

3. Training Data Management

# training/data_manager.py
import json
import yaml
from pathlib import Path
from typing import List, Dict, Any, Tuple

class TrainingDataManager:
    """Manages training examples for DSPy optimization"""
    
    def __init__(self, data_dir: str):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(exist_ok=True)
    
    def create_example(self, 
                      input_data: Dict[str, Any],
                      expected_output: Dict[str, Any],
                      metadata: Dict[str, Any] = None) -> str:
        """Create a new training example"""
        
        example_id = self._generate_example_id()
        example_data = {
            'id': example_id,
            'input': input_data,
            'expected_output': expected_output,
            'metadata': metadata or {},
            'created_at': datetime.utcnow().isoformat()
        }
        
        example_path = self.data_dir / f"{example_id}.json"
        with open(example_path, 'w') as f:
            json.dump(example_data, f, indent=2)
        
        return example_id
    
    def load_examples(self) -> List[dspy.Example]:
        """Load all training examples as DSPy examples"""
        examples = []
        
        for example_file in self.data_dir.glob("*.json"):
            with open(example_file) as f:
                data = json.load(f)
            
            example = dspy.Example(
                **data['input'],
                expected_output=data['expected_output']
            ).with_inputs(*data['input'].keys())
            
            examples.append(example)
        
        return examples
    
    def create_from_successful_execution(self, 
                                       pipeline_config: Dict[str, Any],
                                       input_data: Dict[str, Any],
                                       execution_result: Dict[str, Any],
                                       quality_score: float):
        """Create training example from successful pipeline execution"""
        
        if quality_score >= 8.0:  # Only save high-quality examples
            self.create_example(
                input_data=input_data,
                expected_output=execution_result,
                metadata={
                    'pipeline_config': pipeline_config,
                    'quality_score': quality_score,
                    'source': 'successful_execution'
                }
            )
    
    def _generate_example_id(self) -> str:
        import uuid
        return str(uuid.uuid4())[:8]

# Example: Create training data from manual examples
def bootstrap_training_data():
    manager = TrainingDataManager("training_data/code_analysis")
    
    # Example 1: OTP analysis
    manager.create_example(
        input_data={
            "code": """
            defmodule MyAgent do
              use Agent
              
              def start_link do
                Agent.start_link(fn -> %{} end)
              end
              
              def get_state(agent) do
                Agent.get(agent, & &1)
              end
            end
            """,
            "analysis_type": "otp_patterns"
        },
        expected_output={
            "findings": [
                "Agent started without supervision",
                "No error handling for Agent.start_link",
                "State is not persistent across crashes"
            ],
            "recommendations": [
                "Add Agent to supervision tree",
                "Implement proper error handling",
                "Consider using GenServer for stateful processes"
            ],
            "severity": "medium"
        }
    )

4. Metrics and Evaluation

# metrics/evaluators.py
import dspy
from typing import Dict, Any, List

class CodeAnalysisEvaluator:
    """Evaluates the quality of code analysis outputs"""
    
    def __init__(self):
        # Use another LLM as a judge
        self.judge = dspy.Predict(
            "analysis_output, expected_output -> quality_score_and_reasoning"
        )
    
    def evaluate(self, example: dspy.Example, prediction: dspy.Prediction) -> bool:
        """Evaluate if the analysis meets quality standards"""
        
        analysis_output = prediction.pipeline_result
        expected_output = example.expected_output
        
        # Get LLM evaluation
        evaluation = self.judge(
            analysis_output=json.dumps(analysis_output),
            expected_output=json.dumps(expected_output)
        )
        
        try:
            result = json.loads(evaluation.quality_score_and_reasoning)
            score = float(result.get('score', 0))
            reasoning = result.get('reasoning', '')
            
            print(f"Analysis Quality Score: {score}/10")
            print(f"Reasoning: {reasoning}")
            
            return score >= 7.0  # Threshold for "good enough"
            
        except (json.JSONDecodeError, ValueError, KeyError):
            print(f"Failed to parse evaluation: {evaluation.quality_score_and_reasoning}")
            return False

class RefactoringEvaluator:
    """Evaluates the quality of refactoring suggestions"""
    
    def __init__(self):
        self.judge = dspy.Predict(
            "original_code, refactoring_plan, analysis_context -> feasibility_and_quality"
        )
    
    def evaluate(self, example: dspy.Example, prediction: dspy.Prediction) -> bool:
        """Evaluate refactoring plan quality"""
        
        refactor_result = prediction.pipeline_result
        
        evaluation = self.judge(
            original_code=example.input_code,
            refactoring_plan=json.dumps(refactor_result),
            analysis_context=json.dumps(example.analysis_context)
        )
        
        try:
            result = json.loads(evaluation.feasibility_and_quality)
            
            feasibility_score = float(result.get('feasibility', 0))
            quality_score = float(result.get('quality', 0))
            
            # Both scores must be high
            return feasibility_score >= 7.0 and quality_score >= 7.0
            
        except (json.JSONDecodeError, ValueError, KeyError):
            return False

def create_composite_metric(evaluators: List[Any]) -> callable:
    """Create a metric that combines multiple evaluators"""
    
    def composite_metric(example: dspy.Example, prediction: dspy.Prediction) -> bool:
        scores = []
        
        for evaluator in evaluators:
            try:
                score = evaluator.evaluate(example, prediction)
                scores.append(score)
            except Exception as e:
                print(f"Evaluator failed: {e}")
                scores.append(False)
        
        # All evaluators must pass
        return all(scores)
    
    return composite_metric

5. Optimization Scripts

# optimize_pipelines.py
import dspy
from modules.elixir_modules import CodeAnalysisModule, RefactoringModule
from training.data_manager import TrainingDataManager
from metrics.evaluators import CodeAnalysisEvaluator, RefactoringEvaluator

def optimize_code_analysis_pipeline():
    """Optimize the code analysis pipeline prompts"""
    
    print("🔄 Starting Code Analysis Pipeline Optimization...")
    
    # 1. Load training data
    data_manager = TrainingDataManager("training_data/code_analysis")
    training_examples = data_manager.load_examples()
    
    print(f"📊 Loaded {len(training_examples)} training examples")
    
    # 2. Configure LLM (using Claude for main execution)
    claude = dspy.Anthropic(model="claude-3-sonnet-20240229")
    dspy.settings.configure(lm=claude)
    
    # 3. Create module and evaluator
    analysis_module = CodeAnalysisModule()
    evaluator = CodeAnalysisEvaluator()
    
    # 4. Set up optimizer
    optimizer = dspy.teleprompt.BootstrapFewShot(
        metric=evaluator.evaluate,
        max_bootstrapped_demos=3,
        max_labeled_demos=5
    )
    
    # 5. Run optimization
    print("🎯 Running optimization...")
    optimized_module = optimizer.compile(
        student=analysis_module,
        trainset=training_examples[:10],  # Use subset for faster iteration
        valset=training_examples[10:15] if len(training_examples) > 10 else None
    )
    
    # 6. Save optimized prompts
    save_optimized_module(optimized_module, "optimized_code_analysis")
    
    print("✅ Code Analysis Pipeline Optimization Complete!")
    return optimized_module

def optimize_refactoring_pipeline():
    """Optimize the refactoring pipeline prompts"""
    
    print("🔄 Starting Refactoring Pipeline Optimization...")
    
    # Similar process for refactoring
    data_manager = TrainingDataManager("training_data/refactoring")
    training_examples = data_manager.load_examples()
    
    refactoring_module = RefactoringModule()
    evaluator = RefactoringEvaluator()
    
    optimizer = dspy.teleprompt.BootstrapFewShot(
        metric=evaluator.evaluate,
        max_bootstrapped_demos=2,
        max_labeled_demos=3
    )
    
    optimized_module = optimizer.compile(
        student=refactoring_module,
        trainset=training_examples[:8],
        valset=training_examples[8:12] if len(training_examples) > 8 else None
    )
    
    save_optimized_module(optimized_module, "optimized_refactoring")
    
    print("✅ Refactoring Pipeline Optimization Complete!")
    return optimized_module

def save_optimized_module(module, name: str):
    """Save optimized module prompts back to Elixir configs"""
    
    # Extract the optimized prompts
    optimized_prompts = extract_prompts_from_module(module)
    
    # Update Elixir YAML files with optimized prompts
    update_elixir_configs(name, optimized_prompts)
    
    # Save DSPy module for future use
    module.save(f"optimized_modules/{name}.dspy")

def extract_prompts_from_module(module) -> Dict[str, str]:
    """Extract optimized prompts from DSPy module"""
    
    prompts = {}
    
    # Access the predictor's optimized prompt
    if hasattr(module, 'predictor'):
        prompts['main_prompt'] = module.predictor.signature
        
        # Get few-shot examples if available
        if hasattr(module.predictor, 'demos'):
            prompts['examples'] = module.predictor.demos
    
    return prompts

def update_elixir_configs(module_name: str, optimized_prompts: Dict[str, str]):
    """Update Elixir YAML configurations with optimized prompts"""
    
    import yaml
    
    config_path = f"../pipelines/optimized/{module_name}.yaml"
    
    # Load existing config or create new one
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
    except FileNotFoundError:
        config = {'workflow': {'name': module_name, 'steps': []}}
    
    # Update prompts in config
    for step in config['workflow']['steps']:
        if step.get('type') in ['claude', 'gemini', 'claude_smart']:
            step['prompt'] = [{
                'type': 'static',
                'content': optimized_prompts.get('main_prompt', step['prompt'])
            }]
    
    # Save updated config
    with open(config_path, 'w') as f:
        yaml.dump(config, f, default_flow_style=False)
    
    print(f"💾 Saved optimized config to {config_path}")

if __name__ == "__main__":
    # Run optimization for all modules
    optimize_code_analysis_pipeline()
    optimize_refactoring_pipeline()
    
    print("🚀 All pipeline optimizations complete!")

6. Continuous Learning System

# continuous_learning.py
import schedule
import time
from datetime import datetime, timedelta

class ContinuousLearningSystem:
    """Continuously improves pipelines based on usage data"""
    
    def __init__(self, elixir_project_path: str):
        self.elixir_path = elixir_project_path
        self.last_optimization = datetime.now()
        
    def setup_learning_schedule(self):
        """Set up scheduled optimization runs"""
        
        # Daily light optimization
        schedule.every().day.at("02:00").do(self.daily_optimization)
        
        # Weekly full re-optimization
        schedule.every().week.do(self.weekly_full_optimization)
        
        # Real-time learning from feedback
        schedule.every(10).minutes.do(self.process_recent_feedback)
    
    def daily_optimization(self):
        """Light optimization based on recent usage"""
        print(f"🌅 Running daily optimization at {datetime.now()}")
        
        # Collect usage metrics from last 24 hours
        usage_data = self.collect_recent_usage_data(hours=24)
        
        if self.should_optimize(usage_data):
            # Run quick optimization on pipelines with poor performance
            self.optimize_underperforming_pipelines(usage_data)
    
    def weekly_full_optimization(self):
        """Full re-optimization of all pipelines"""
        print(f"📅 Running weekly full optimization at {datetime.now()}")
        
        # Collect week's worth of data
        usage_data = self.collect_recent_usage_data(hours=168)  # 7 days
        
        # Re-optimize all modules
        optimize_code_analysis_pipeline()
        optimize_refactoring_pipeline()
        
        # Update performance baselines
        self.update_performance_baselines(usage_data)
    
    def collect_recent_usage_data(self, hours: int) -> Dict[str, Any]:
        """Collect pipeline usage and performance data"""
        
        # Query Elixir system for metrics
        # This would interface with your telemetry system
        
        import subprocess
        result = subprocess.run([
            'mix', 'telemetry.export',
            '--format', 'json',
            '--since', f"{hours}h"
        ], 
        cwd=self.elixir_path,
        capture_output=True,
        text=True
        )
        
        if result.returncode == 0:
            return json.loads(result.stdout)
        else:
            return {}
    
    def should_optimize(self, usage_data: Dict[str, Any]) -> bool:
        """Determine if optimization is needed"""
        
        # Check performance degradation
        current_success_rate = usage_data.get('success_rate', 1.0)
        baseline_success_rate = 0.95
        
        if current_success_rate < baseline_success_rate:
            return True
        
        # Check if enough new data is available
        new_examples_count = usage_data.get('new_examples', 0)
        if new_examples_count > 10:
            return True
        
        return False
    
    def run_forever(self):
        """Run the continuous learning system"""
        print("🔄 Starting continuous learning system...")
        
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

if __name__ == "__main__":
    learning_system = ContinuousLearningSystem("/path/to/pipeline_ex")
    learning_system.setup_learning_schedule()
    learning_system.run_forever()

Integration with Elixir

1. Enhanced Mix Tasks

# lib/mix/tasks/dspy.ex
defmodule Mix.Tasks.Dspy.Optimize do
  use Mix.Task
  
  @shortdoc "Optimize pipeline prompts using DSPy"
  
  def run(args) do
    {opts, args, _} = OptionParser.parse(args,
      switches: [
        pipeline: :string,
        module: :string,
        training_data: :string
      ]
    )
    
    python_script = Path.join([__DIR__, "../../../scripts/optimize_pipeline.py"])
    
    cmd_args = [
      "python3", python_script,
      "--elixir-path", File.cwd!(),
      "--pipeline", Keyword.get(opts, :pipeline, "all"),
      "--training-data", Keyword.get(opts, :training_data, "training_data/")
    ]
    
    case System.cmd("python3", tl(cmd_args)) do
      {output, 0} ->
        IO.puts("✅ Optimization completed successfully")
        IO.puts(output)
        
      {error, exit_code} ->
        IO.puts("❌ Optimization failed with exit code #{exit_code}")
        IO.puts(error)
        System.halt(1)
    end
  end
end

2. Telemetry Integration

defmodule Pipeline.DSPyTelemetry do
  @moduledoc """
  Collects telemetry data for DSPy optimization
  """
  
  def setup() do
    :telemetry.attach_many(
      "dspy-data-collection",
      [
        [:pipeline, :execution, :stop],
        [:pipeline, :step, :stop]
      ],
      &handle_telemetry_event/4,
      %{}
    )
  end
  
  def handle_telemetry_event([:pipeline, :execution, :stop], measurements, metadata, _config) do
    # Record execution results for DSPy training
    execution_data = %{
      pipeline_id: metadata.pipeline_id,
      success: measurements.success,
      duration_ms: measurements.duration,
      quality_score: calculate_quality_score(metadata.result),
      input_data: metadata.input,
      output_data: metadata.result,
      timestamp: DateTime.utc_now()
    }
    
    # Store for DSPy training data collection
    Pipeline.DSPyDataStore.record_execution(execution_data)
  end
  
  defp calculate_quality_score(result) do
    # Implement heuristics to score result quality
    # This can be enhanced with user feedback
    case result do
      %{status: :success, findings: findings} when length(findings) > 0 -> 8.5
      %{status: :success} -> 7.0
      %{status: :partial} -> 5.0
      _ -> 2.0
    end
  end
end

The DSPy integration transforms ElexirionDSP from a static system into a continuously learning platform. By treating prompts as learnable parameters rather than fixed instructions, we enable the system to improve its performance automatically based on real-world usage patterns and feedback.