07 evaluation optimization system

Documentation for 07_evaluation_optimization_system from the pipeline_ex repository.

Evaluation and Optimization System for DSPy Integration

Overview

This document details the comprehensive evaluation and optimization system needed to transform pipeline_ex from a “generate YAML and pray” system into a systematic, evidence-based AI pipeline platform using DSPy’s optimization capabilities.

Core Evaluation Architecture

1. Metrics Collection System

Performance Metrics Module

defmodule Pipeline.DSPy.Metrics do
  @moduledoc """
  Comprehensive metrics collection for pipeline optimization.
  """
  
  defstruct [
    :execution_id,
    :pipeline_name,
    :step_name,
    :signature_name,
    :input_data,
    :output_data,
    :execution_time_ms,
    :token_usage,
    :cost_usd,
    :success,
    :error_type,
    :quality_score,
    :timestamp
  ]
  
  def record_execution(pipeline_name, step_name, signature_name, input, output, metadata) do
    metric = %__MODULE__{
      execution_id: generate_execution_id(),
      pipeline_name: pipeline_name,
      step_name: step_name,
      signature_name: signature_name,
      input_data: sanitize_input(input),
      output_data: sanitize_output(output),
      execution_time_ms: metadata[:execution_time_ms],
      token_usage: metadata[:token_usage],
      cost_usd: metadata[:cost_usd],
      success: metadata[:success],
      error_type: metadata[:error_type],
      quality_score: calculate_quality_score(output, metadata),
      timestamp: DateTime.utc_now()
    }
    
    # Store in database
    Pipeline.DSPy.Storage.store_metric(metric)
    
    # Update running statistics
    Pipeline.DSPy.Statistics.update_stats(metric)
    
    metric
  end
  
  def get_execution_history(pipeline_name, opts \\ []) do
    # Accept filter options (e.g. success: true) so call sites such as the
    # training-data collector can pass extra criteria alongside the defaults.
    Pipeline.DSPy.Storage.get_metrics(
      Keyword.merge(
        [pipeline_name: pipeline_name, limit: 100, order_by: :timestamp],
        opts
      )
    )
  end
  
  def calculate_pipeline_performance(pipeline_name, time_window \\ {7, :days}) do
    metrics = get_recent_metrics(pipeline_name, time_window)
    
    %{
      total_executions: length(metrics),
      success_rate: calculate_success_rate(metrics),
      avg_execution_time: calculate_avg_execution_time(metrics),
      avg_cost: calculate_avg_cost(metrics),
      avg_quality_score: calculate_avg_quality_score(metrics),
      error_breakdown: analyze_error_patterns(metrics),
      performance_trend: calculate_performance_trend(metrics)
    }
  end
end
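
For context, a hypothetical call site is sketched below; the pipeline name, step name, signature name, and metadata values are illustrative, and the metadata keys simply mirror the struct fields above.

# Hypothetical call site inside a step executor; all values are illustrative.
metadata = %{
  execution_time_ms: 1_840,
  token_usage: %{prompt: 912, completion: 388},
  cost_usd: 0.0042,
  success: true,
  error_type: nil
}

Pipeline.DSPy.Metrics.record_execution(
  "code_review_pipeline",
  "summarize_diff",
  "SummarizeDiffSignature",
  %{diff: "…unified diff…"},
  %{summary: "Renames the config loader and adds retry handling."},
  metadata
)

# Aggregate view over the last seven days
Pipeline.DSPy.Metrics.calculate_pipeline_performance("code_review_pipeline", {7, :days})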

Quality Assessment Framework

defmodule Pipeline.DSPy.QualityAssessment do
  @moduledoc """
  Automated quality assessment for pipeline outputs.
  """
  
  def assess_output_quality(output, expected_output, assessment_config) do
    assessments = [
      # Structural assessment
      assess_structure(output, expected_output, assessment_config.structure),
      # Content assessment
      assess_content(output, expected_output, assessment_config.content),
      # Custom assessment
      assess_custom_criteria(output, expected_output, assessment_config.custom)
    ]
    
    compile_quality_score(assessments)
  end
  
  def assess_structure(output, expected, config) do
    case config.type do
      "json_schema" ->
        assess_json_schema_compliance(output, expected, config.schema)
        
      "pattern_match" ->
        assess_pattern_matching(output, expected, config.patterns)
        
      "field_presence" ->
        assess_field_presence(output, expected, config.required_fields)
    end
  end
  
  def assess_content(output, expected, config) do
    case config.type do
      "semantic_similarity" ->
        assess_semantic_similarity(output, expected, config.threshold)
        
      "keyword_presence" ->
        assess_keyword_presence(output, expected, config.keywords)
        
      "length_appropriateness" ->
        assess_length_appropriateness(output, expected, config.bounds)
    end
  end
  
  def assess_custom_criteria(output, expected, config) do
    # Custom assessment logic based on pipeline-specific criteria
    Enum.map(config.criteria, fn criterion ->
      apply_criterion(output, expected, criterion)
    end)
  end
end
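
assess_output_quality/3 expects an assessment_config with structure, content, and custom sections. The document does not pin down its exact shape, so the example below is an assumed configuration consistent with the case clauses above.

# Assumed shape for assessment_config; keys mirror the case clauses above.
assessment_config = %{
  structure: %{
    type: "field_presence",
    required_fields: ["summary", "risk_level"]
  },
  content: %{
    type: "keyword_presence",
    keywords: ["migration", "rollback"]
  },
  custom: %{
    criteria: [
      %{name: "no_placeholder_text", weight: 0.2}
    ]
  }
}

Pipeline.DSPy.QualityAssessment.assess_output_quality(
  %{"summary" => "Adds rollback handling to the migration step", "risk_level" => "low"},
  %{"summary" => "Adds a rollback path for failed migrations", "risk_level" => "low"},
  assessment_config
)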

2. Training Data Management

Training Data Collection

defmodule Pipeline.DSPy.TrainingDataManager do
  @moduledoc """
  Manages training data collection and curation for DSPy optimization.
  """
  
  def collect_training_data(pipeline_name, collection_config) do
    # Collect from various sources
    sources = [
      collect_from_execution_history(pipeline_name, collection_config.history),
      collect_from_manual_examples(pipeline_name, collection_config.manual),
      collect_from_user_feedback(pipeline_name, collection_config.feedback)
    ]
    
    # Combine and deduplicate
    combined_data = combine_sources(sources)
    
    # Validate and clean
    validated_data = validate_training_examples(combined_data)
    
    # Split into training/validation sets
    split_training_data(validated_data, collection_config.split_ratio)
  end
  
  def collect_from_execution_history(pipeline_name, config) do
    # Get successful executions
    successful_executions = Pipeline.DSPy.Metrics.get_execution_history(
      pipeline_name,
      success: true,
      limit: config.limit
    )
    
    # Convert to training examples
    Enum.map(successful_executions, fn execution ->
      %{
        input: execution.input_data,
        output: execution.output_data,
        metadata: %{
          source: "execution_history",
          execution_id: execution.execution_id,
          quality_score: execution.quality_score,
          timestamp: execution.timestamp
        }
      }
    end)
  end
  
  def collect_from_user_feedback(pipeline_name, config) do
    # Get user corrections and improvements
    Pipeline.DSPy.Feedback.get_user_feedback(
      pipeline_name: pipeline_name,
      feedback_type: ["correction", "improvement"],
      limit: config.limit
    )
    |> Enum.map(fn feedback ->
      %{
        input: feedback.original_input,
        output: feedback.corrected_output,
        metadata: %{
          source: "user_feedback",
          feedback_id: feedback.id,
          user_id: feedback.user_id,
          improvement_type: feedback.improvement_type
        }
      }
    end)
  end
  
  def validate_training_examples(training_data) do
    training_data
    |> Enum.filter(&validate_example/1)
    |> Enum.map(fn example ->
      # Collected examples carry no :quality_score key yet, so use Map.put/3
      # rather than the %{map | ...} update syntax, which requires the key to exist.
      Map.put(example, :quality_score, calculate_example_quality(example))
    end)
    |> Enum.sort_by(& &1.quality_score, :desc)
  end
end
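
split_training_data/2 is referenced in collect_training_data/2 but not shown. A minimal sketch (an assumed helper that would live in Pipeline.DSPy.TrainingDataManager) treats the split ratio as the training-set fraction and shuffles first so the quality-sorted order does not bias the validation set:

# Assumed helper inside Pipeline.DSPy.TrainingDataManager; split_ratio is the training fraction.
def split_training_data(examples, split_ratio) do
  shuffled = Enum.shuffle(examples)
  train_count = round(length(shuffled) * split_ratio)
  Enum.split(shuffled, train_count)
end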

Synthetic Data Generation

defmodule Pipeline.DSPy.SyntheticDataGenerator do
  @moduledoc """
  Generates synthetic training data for DSPy optimization.
  """
  
  def generate_synthetic_examples(signature, count, generation_config) do
    # Use LLM to generate diverse examples
    base_prompt = build_generation_prompt(signature, generation_config)
    
    # Generate examples in batches, rounding up so a final partial batch is included
    batches = max(ceil(count / generation_config.batch_size), 1)
    
    Enum.flat_map(1..batches, fn batch_num ->
      generate_batch(base_prompt, generation_config.batch_size, batch_num)
    end)
    |> Enum.take(count)
    |> validate_synthetic_examples(signature)
  end
  
  def build_generation_prompt(signature, config) do
    """
    Generate diverse training examples for the following task:
    
    Task: #{signature.description}
    
    Input format: #{format_input_spec(signature.input_fields)}
    Output format: #{format_output_spec(signature.output_fields)}
    
    Requirements:
    - Generate #{config.diversity_level} diverse examples
    - Cover different complexity levels
    - Include edge cases
    - Ensure high quality outputs
    
    Generate examples in the following JSON format:
    {
      "examples": [
        {
          "input": {...},
          "output": {...},
          "explanation": "..."
        }
      ]
    }
    """
  end
  
  def validate_synthetic_examples(examples, signature) do
    Enum.filter(examples, fn example ->
      validate_input_format(example.input, signature.input_fields) and
      validate_output_format(example.output, signature.output_fields) and
      assess_example_quality(example) > 0.7
    end)
  end
end
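
generate_batch/3 is left undefined above. One plausible sketch is shown below; the Pipeline.LLM.complete/2 client, its :format option, and the decoded return shape are assumptions for illustration, not part of the documented system.

# Sketch of generate_batch/3; Pipeline.LLM.complete/2, its :format option, and the
# decoded return shape are assumptions for illustration only.
defp generate_batch(base_prompt, batch_size, batch_num) do
  prompt = base_prompt <> "\n\nBatch #{batch_num}: generate #{batch_size} examples."

  case Pipeline.LLM.complete(prompt, format: :json) do
    {:ok, %{"examples" => examples}} ->
      Enum.map(examples, fn ex ->
        %{input: ex["input"], output: ex["output"], explanation: ex["explanation"]}
      end)

    {:error, _reason} ->
      []
  end
end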

3. Optimization Engine

DSPy Optimization Controller

defmodule Pipeline.DSPy.OptimizationController do
  @moduledoc """
  Controls DSPy optimization cycles and manages optimization state.
  """
  
  def run_optimization_cycle(pipeline_name, optimization_config) do
    # Phase 1: Data Collection
    {training_data, validation_data} = collect_optimization_data(
      pipeline_name,
      optimization_config.data_collection
    )
    
    # Phase 2: Baseline Evaluation
    baseline_performance = evaluate_current_pipeline(
      pipeline_name,
      validation_data
    )
    
    # Phase 3: Optimization
    optimization_results = run_dspy_optimization(
      pipeline_name,
      training_data,
      optimization_config.optimization
    )
    
    # Phase 4: Validation
    optimized_performance = evaluate_optimized_pipeline(
      optimization_results.optimized_pipeline,
      validation_data
    )
    
    # Phase 5: Decision
    optimization_decision = decide_optimization_adoption(
      baseline_performance,
      optimized_performance,
      optimization_config.adoption_criteria
    )
    
    # Phase 6: Deployment
    if optimization_decision.adopt do
      deploy_optimization(pipeline_name, optimization_results)
    else
      log_optimization_rejection(pipeline_name, optimization_decision)
    end
    
    # Return comprehensive results
    %{
      pipeline_name: pipeline_name,
      baseline_performance: baseline_performance,
      optimized_performance: optimized_performance,
      optimization_decision: optimization_decision,
      training_data_size: length(training_data),
      validation_data_size: length(validation_data),
      optimization_timestamp: DateTime.utc_now()
    }
  end
  
  def run_dspy_optimization(pipeline_name, training_data, config) do
    # Convert pipeline to DSPy format
    dspy_program = Pipeline.DSPy.Converter.pipeline_to_dspy(pipeline_name)
    
    # Resolve the optimizer module and its options from the config
    {optimizer_module, optimizer_opts} = create_optimizer(config)
    
    # Run optimization (each optimizer module implements an optimize/3 callback)
    optimized_program = optimizer_module.optimize(dspy_program, training_data, optimizer_opts)
    
    # Convert back to pipeline format
    optimized_pipeline = Pipeline.DSPy.Converter.dspy_to_pipeline(optimized_program)
    
    %{
      original_pipeline: pipeline_name,
      optimized_pipeline: optimized_pipeline,
      optimizer_used: config.optimizer_type,
      training_data_size: length(training_data),
      optimization_metrics: extract_optimization_metrics(optimized_program)
    }
  end
  
  def create_optimizer(config) do
    case config.optimizer_type do
      "bootstrap_few_shot" ->
        {Pipeline.DSPy.Optimizers.BootstrapFewShot, config.bootstrap_config}
        
      "copro" ->
        {Pipeline.DSPy.Optimizers.CoPro, config.copro_config}
        
      "mipro" ->
        {Pipeline.DSPy.Optimizers.MIPro, config.mipro_config}
        
      "ensemble" ->
        {Pipeline.DSPy.Optimizers.Ensemble, config.ensemble_config}
    end
  end
end
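
decide_optimization_adoption/3 is the gate between Phases 5 and 6. The sketch below assumes both performance reports expose a metrics map as produced by the evaluation framework in section 4, and that adoption_criteria uses the accuracy and cost keys shown in the strategy configs below; the statistical confidence check is omitted here and would come from the A/B-testing framework.

# Sketch of decide_optimization_adoption/3 for the accuracy/cost criteria shape;
# the confidence-level check is left out of this sketch.
def decide_optimization_adoption(baseline, optimized, criteria) do
  accuracy_gain = optimized.metrics.accuracy - baseline.metrics.accuracy
  cost_increase = (optimized.metrics.cost - baseline.metrics.cost) / max(baseline.metrics.cost, 1.0e-9)

  adopt =
    accuracy_gain >= Map.get(criteria, :min_accuracy_improvement, 0.0) and
      cost_increase <= Map.get(criteria, :max_cost_increase, 1.0)

  %{adopt: adopt, accuracy_gain: accuracy_gain, cost_increase: cost_increase, criteria: criteria}
end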

Optimization Strategies

defmodule Pipeline.DSPy.OptimizationStrategies do
  @moduledoc """
  Different optimization strategies for various use cases.
  """
  
  def accuracy_focused_optimization(pipeline_name, _config) do
    optimization_config = %{
      data_collection: %{
        history: %{limit: 1000},
        manual: %{limit: 50},
        feedback: %{limit: 100}
      },
      optimization: %{
        optimizer_type: "bootstrap_few_shot",
        bootstrap_config: %{
          max_bootstrapped_demos: 8,
          max_labeled_demos: 16,
          max_rounds: 3,
          teacher_settings: %{temperature: 0.1}
        }
      },
      adoption_criteria: %{
        min_accuracy_improvement: 0.05,
        max_cost_increase: 0.2,
        min_confidence_level: 0.95
      }
    }
    
    Pipeline.DSPy.OptimizationController.run_optimization_cycle(
      pipeline_name,
      optimization_config
    )
  end
  
  def cost_focused_optimization(pipeline_name, _config) do
    optimization_config = %{
      data_collection: %{
        history: %{limit: 500},
        manual: %{limit: 20},
        feedback: %{limit: 50}
      },
      optimization: %{
        optimizer_type: "mipro",
        mipro_config: %{
          metric: "cost_effectiveness",
          num_candidates: 10,
          init_temperature: 1.0,
          max_bootstrapped_demos: 5
        }
      },
      adoption_criteria: %{
        min_cost_reduction: 0.15,
        max_accuracy_loss: 0.02,
        min_confidence_level: 0.90
      }
    }
    
    Pipeline.DSPy.OptimizationController.run_optimization_cycle(
      pipeline_name,
      optimization_config
    )
  end
  
  def speed_focused_optimization(pipeline_name, _config) do
    optimization_config = %{
      data_collection: %{
        history: %{limit: 300},
        manual: %{limit: 10},
        feedback: %{limit: 20}
      },
      optimization: %{
        optimizer_type: "copro",
        copro_config: %{
          depth: 3,
          breadth: 2,
          max_num_trials: 5,
          optimize_for_speed: true
        }
      },
      adoption_criteria: %{
        min_speed_improvement: 0.25,
        max_accuracy_loss: 0.05,
        min_confidence_level: 0.85
      }
    }
    
    Pipeline.DSPy.OptimizationController.run_optimization_cycle(
      pipeline_name,
      optimization_config
    )
  end
end
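
Invoking a strategy is a single call with the pipeline name; the second argument is reserved for overrides and unused in the sketches above.

# Kick off an accuracy-focused cycle for one pipeline; the result mirrors the map
# returned by run_optimization_cycle/2.
result =
  Pipeline.DSPy.OptimizationStrategies.accuracy_focused_optimization("code_review_pipeline", %{})

IO.inspect(result.optimization_decision, label: "adoption decision")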

4. Evaluation Metrics and Scoring

Comprehensive Evaluation Framework

defmodule Pipeline.DSPy.EvaluationFramework do
  @moduledoc """
  Comprehensive evaluation framework for pipeline performance.
  """
  
  def evaluate_pipeline(pipeline_config, test_cases, evaluation_config) do
    # Run pipeline on test cases
    results = execute_pipeline_on_test_cases(pipeline_config, test_cases)
    
    # Calculate multiple metrics
    metrics = %{
      accuracy: calculate_accuracy(results, evaluation_config.accuracy),
      precision: calculate_precision(results, evaluation_config.precision),
      recall: calculate_recall(results, evaluation_config.recall),
      f1_score: calculate_f1_score(results, evaluation_config.f1),
      latency: calculate_latency(results),
      cost: calculate_cost(results),
      quality: calculate_quality(results, evaluation_config.quality)
    }
    
    # Calculate composite score
    composite_score = calculate_composite_score(metrics, evaluation_config.weights)
    
    # Generate detailed report
    %{
      pipeline_name: pipeline_config["workflow"]["name"],
      test_cases_count: length(test_cases),
      metrics: metrics,
      composite_score: composite_score,
      individual_results: results,
      evaluation_timestamp: DateTime.utc_now(),
      evaluation_config: evaluation_config
    }
  end
  
  def calculate_accuracy(results, config) do
    correct_predictions = Enum.count(results, fn result ->
      evaluate_correctness(result.predicted, result.expected, config)
    end)
    
    correct_predictions / length(results)
  end
  
  def calculate_quality(results, config) do
    quality_scores = Enum.map(results, fn result ->
      Pipeline.DSPy.QualityAssessment.assess_output_quality(
        result.predicted,
        result.expected,
        config
      )
    end)
    
    Enum.sum(quality_scores) / length(quality_scores)
  end
  
  def calculate_composite_score(metrics, weights) do
    normalized_metrics = normalize_metrics(metrics)
    
    Enum.reduce(normalized_metrics, 0.0, fn {metric, value}, acc ->
      weight = Map.get(weights, metric, 1.0)
      acc + (value * weight)
    end) / Enum.sum(Map.values(weights))
  end
end
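
A worked example of the composite score, assuming the metrics are already normalized to the 0..1 range (the values and weights below are illustrative):

# Worked example with illustrative values; metrics are assumed pre-normalized to 0..1.
normalized_metrics = %{accuracy: 0.9, cost: 0.6, latency: 0.8}
weights = %{accuracy: 0.5, cost: 0.3, latency: 0.2}

weighted_sum =
  Enum.reduce(normalized_metrics, 0.0, fn {metric, value}, acc ->
    acc + value * Map.get(weights, metric, 1.0)
  end)

composite = weighted_sum / Enum.sum(Map.values(weights))
# => ~0.79  (0.45 + 0.18 + 0.16, divided by a weight total of 1.0)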

A/B Testing Framework

defmodule Pipeline.DSPy.ABTesting do
  @moduledoc """
  A/B testing framework for comparing pipeline versions.
  """
  
  def run_ab_test(pipeline_a, pipeline_b, test_config) do
    # Split test cases
    {test_a, test_b} = split_test_cases(test_config.test_cases, test_config.split_ratio)
    
    # Run both pipelines through the shared evaluation framework
    results_a = Pipeline.DSPy.EvaluationFramework.evaluate_pipeline(pipeline_a, test_a, test_config.evaluation)
    results_b = Pipeline.DSPy.EvaluationFramework.evaluate_pipeline(pipeline_b, test_b, test_config.evaluation)
    
    # Statistical analysis
    statistical_significance = calculate_statistical_significance(
      results_a.metrics,
      results_b.metrics,
      test_config.significance_level
    )
    
    # Generate recommendation
    recommendation = generate_ab_recommendation(
      results_a,
      results_b,
      statistical_significance,
      test_config.decision_criteria
    )
    
    %{
      pipeline_a: results_a,
      pipeline_b: results_b,
      statistical_significance: statistical_significance,
      recommendation: recommendation,
      test_timestamp: DateTime.utc_now()
    }
  end
  
  def calculate_statistical_significance(metrics_a, metrics_b, significance_level) do
    # Perform statistical tests (t-test, Mann-Whitney U, etc.)
    Enum.map(metrics_a, fn {metric_name, values_a} ->
      values_b = Map.get(metrics_b, metric_name)
      
      {p_value, test_statistic} = perform_statistical_test(values_a, values_b)
      
      %{
        metric: metric_name,
        p_value: p_value,
        test_statistic: test_statistic,
        significant: p_value < significance_level,
        effect_size: calculate_effect_size(values_a, values_b)
      }
    end)
  end
end
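
calculate_effect_size/2 is referenced but not defined above; a minimal sketch using Cohen's d with the average-variance form of the pooled standard deviation (an assumed choice that presumes roughly equal group sizes):

# Sketch of calculate_effect_size/2 as Cohen's d (assumed measure; average-variance
# pooled standard deviation, which presumes roughly equal group sizes).
defp calculate_effect_size(values_a, values_b) do
  mean = fn xs -> Enum.sum(xs) / length(xs) end
  variance = fn xs, m -> Enum.sum(Enum.map(xs, &((&1 - m) * (&1 - m)))) / max(length(xs) - 1, 1) end

  {ma, mb} = {mean.(values_a), mean.(values_b)}
  pooled_sd = :math.sqrt((variance.(values_a, ma) + variance.(values_b, mb)) / 2)

  if pooled_sd == 0.0, do: 0.0, else: (mb - ma) / pooled_sd
end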

5. Continuous Improvement System

Continuous Learning Pipeline

defmodule Pipeline.DSPy.ContinuousImprovement do
  @moduledoc """
  Continuous learning and improvement system.
  """
  
  def start_continuous_improvement(pipeline_name, config) do
    # Schedule regular optimization cycles
    schedule_optimization_cycles(pipeline_name, config.schedule)
    
    # Monitor performance drift
    monitor_performance_drift(pipeline_name, config.drift_detection)
    
    # Collect new training data
    collect_new_training_data(pipeline_name, config.data_collection)
    
    # Update optimization strategies
    update_optimization_strategies(pipeline_name, config.strategy_updates)
  end
  
  def schedule_optimization_cycles(pipeline_name, schedule_config) do
    # Schedule daily, weekly, or monthly optimization cycles
    case schedule_config.frequency do
      "daily" ->
        schedule_daily_optimization(pipeline_name, schedule_config)
        
      "weekly" ->
        schedule_weekly_optimization(pipeline_name, schedule_config)
        
      "monthly" ->
        schedule_monthly_optimization(pipeline_name, schedule_config)
        
      "adaptive" ->
        schedule_adaptive_optimization(pipeline_name, schedule_config)
    end
  end
  
  def monitor_performance_drift(pipeline_name, drift_config) do
    # Track performance metrics over time
    # Detect significant performance degradation
    # Trigger automatic optimization when drift is detected
    
    current_performance = Pipeline.DSPy.Metrics.calculate_pipeline_performance(
      pipeline_name,
      drift_config.time_window
    )
    
    baseline_performance = Pipeline.DSPy.Storage.get_baseline_performance(pipeline_name)
    
    drift_detected = detect_performance_drift(current_performance, baseline_performance, drift_config)
    
    if drift_detected do
      trigger_emergency_optimization(pipeline_name, drift_config)
    end
  end
end
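
detect_performance_drift/3 is what triggers the emergency optimization; a sketch assuming drift_config carries absolute-drop thresholds for success rate and quality score (the threshold keys are assumptions):

# Sketch of detect_performance_drift/3; the threshold keys in drift_config are assumptions.
def detect_performance_drift(current, baseline, drift_config) do
  success_drop = baseline.success_rate - current.success_rate
  quality_drop = baseline.avg_quality_score - current.avg_quality_score

  success_drop > Map.get(drift_config, :max_success_rate_drop, 0.05) or
    quality_drop > Map.get(drift_config, :max_quality_drop, 0.1)
end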

This comprehensive evaluation and optimization system transforms pipeline_ex from a basic execution engine into a sophisticated, self-improving AI system that learns from usage patterns and continuously optimizes performance.