Evaluation and Optimization System for DSPy Integration
Overview
This document details the evaluation and optimization system needed to transform pipeline_ex from a “generate YAML and pray” workflow into a systematic, evidence-based AI pipeline platform built on DSPy’s optimization capabilities.
Core Evaluation Architecture
1. Metrics Collection System
Performance Metrics Module
defmodule Pipeline.DSPy.Metrics do
@moduledoc """
Comprehensive metrics collection for pipeline optimization.
"""
defstruct [
:execution_id,
:pipeline_name,
:step_name,
:signature_name,
:input_data,
:output_data,
:execution_time_ms,
:token_usage,
:cost_usd,
:success,
:error_type,
:quality_score,
:timestamp
]
def record_execution(pipeline_name, step_name, signature_name, input, output, metadata) do
metric = %__MODULE__{
execution_id: generate_execution_id(),
pipeline_name: pipeline_name,
step_name: step_name,
signature_name: signature_name,
input_data: sanitize_input(input),
output_data: sanitize_output(output),
execution_time_ms: metadata[:execution_time_ms],
token_usage: metadata[:token_usage],
cost_usd: metadata[:cost_usd],
success: metadata[:success],
error_type: metadata[:error_type],
quality_score: calculate_quality_score(output, metadata),
timestamp: DateTime.utc_now()
}
# Store in database
Pipeline.DSPy.Storage.store_metric(metric)
# Update running statistics
Pipeline.DSPy.Statistics.update_stats(metric)
metric
end
def get_execution_history(pipeline_name, opts \\ []) do
  # Accept filter options (e.g. success: true) so callers such as
  # collect_from_execution_history/2 can narrow the query.
  defaults = [pipeline_name: pipeline_name, limit: 100, order_by: :timestamp]

  Pipeline.DSPy.Storage.get_metrics(Keyword.merge(defaults, opts))
end
def calculate_pipeline_performance(pipeline_name, time_window \\ {7, :days}) do
metrics = get_recent_metrics(pipeline_name, time_window)
%{
total_executions: length(metrics),
success_rate: calculate_success_rate(metrics),
avg_execution_time: calculate_avg_execution_time(metrics),
avg_cost: calculate_avg_cost(metrics),
avg_quality_score: calculate_avg_quality_score(metrics),
error_breakdown: analyze_error_patterns(metrics),
performance_trend: calculate_performance_trend(metrics)
}
end
end
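The aggregation helpers referenced above (calculate_success_rate/1, analyze_error_patterns/1, and the various averages) are left undefined. A minimal sketch, assuming metrics are the %Pipeline.DSPy.Metrics{} structs recorded above (the Aggregates submodule name is illustrative, not part of the existing codebase):

defmodule Pipeline.DSPy.Metrics.Aggregates do
  @moduledoc """
  Illustrative aggregate helpers over Pipeline.DSPy.Metrics structs.
  """

  # Fraction of executions that succeeded; returns 0.0 for an empty list.
  def calculate_success_rate([]), do: 0.0

  def calculate_success_rate(metrics) do
    Enum.count(metrics, & &1.success) / length(metrics)
  end

  # Average of a numeric field (e.g. :execution_time_ms, :cost_usd), skipping nils.
  def average(metrics, field) do
    values =
      metrics
      |> Enum.map(&Map.get(&1, field))
      |> Enum.reject(&is_nil/1)

    case values do
      [] -> 0.0
      _ -> Enum.sum(values) / length(values)
    end
  end

  # Group failed executions by error type to support error_breakdown reporting.
  def analyze_error_patterns(metrics) do
    metrics
    |> Enum.reject(& &1.success)
    |> Enum.frequencies_by(& &1.error_type)
  end
end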
Quality Assessment Framework
defmodule Pipeline.DSPy.QualityAssessment do
@moduledoc """
Automated quality assessment for pipeline outputs.
"""
def assess_output_quality(output, expected_output, assessment_config) do
  assessments = [
    # Structural assessment
    assess_structure(output, expected_output, assessment_config.structure),
    # Content assessment
    assess_content(output, expected_output, assessment_config.content),
    # Custom assessment
    assess_custom_criteria(output, expected_output, assessment_config.custom)
  ]

  compile_quality_score(assessments)
end
def assess_structure(output, expected, config) do
case config.type do
"json_schema" ->
assess_json_schema_compliance(output, expected, config.schema)
"pattern_match" ->
assess_pattern_matching(output, expected, config.patterns)
"field_presence" ->
assess_field_presence(output, expected, config.required_fields)
end
end
def assess_content(output, expected, config) do
case config.type do
"semantic_similarity" ->
assess_semantic_similarity(output, expected, config.threshold)
"keyword_presence" ->
assess_keyword_presence(output, expected, config.keywords)
"length_appropriateness" ->
assess_length_appropriateness(output, expected, config.bounds)
end
end
def assess_custom_criteria(output, expected, config) do
# Custom assessment logic based on pipeline-specific criteria
Enum.map(config.criteria, fn criterion ->
apply_criterion(output, expected, criterion)
end)
end
end
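compile_quality_score/1 is referenced but not defined. One simple approach, assuming each assessment reduces to a number (or a map with a :score key) in the 0.0–1.0 range, is to flatten and average the results (the Score submodule is illustrative):

defmodule Pipeline.DSPy.QualityAssessment.Score do
  # Flatten nested assessment results (custom criteria return lists) and
  # average them into a single 0.0..1.0 quality score.
  def compile_quality_score(assessments) do
    scores =
      assessments
      |> List.flatten()
      |> Enum.map(&to_score/1)

    case scores do
      [] -> 0.0
      _ -> Enum.sum(scores) / length(scores)
    end
  end

  defp to_score(score) when is_number(score), do: score * 1.0
  defp to_score(%{score: score}) when is_number(score), do: score * 1.0
  defp to_score(_), do: 0.0
end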
2. Training Data Management
Training Data Collection
defmodule Pipeline.DSPy.TrainingDataManager do
@moduledoc """
Manages training data collection and curation for DSPy optimization.
"""
def collect_training_data(pipeline_name, collection_config) do
# Collect from various sources
sources = [
collect_from_execution_history(pipeline_name, collection_config.history),
collect_from_manual_examples(pipeline_name, collection_config.manual),
collect_from_user_feedback(pipeline_name, collection_config.feedback)
]
# Combine and deduplicate
combined_data = combine_sources(sources)
# Validate and clean
validated_data = validate_training_examples(combined_data)
# Split into training/validation sets
split_training_data(validated_data, collection_config.split_ratio)
end
def collect_from_execution_history(pipeline_name, config) do
# Get successful executions
successful_executions = Pipeline.DSPy.Metrics.get_execution_history(
pipeline_name,
success: true,
limit: config.limit
)
# Convert to training examples
Enum.map(successful_executions, fn execution ->
%{
input: execution.input_data,
output: execution.output_data,
metadata: %{
source: "execution_history",
execution_id: execution.execution_id,
quality_score: execution.quality_score,
timestamp: execution.timestamp
}
}
end)
end
def collect_from_user_feedback(pipeline_name, config) do
# Get user corrections and improvements
Pipeline.DSPy.Feedback.get_user_feedback(
pipeline_name: pipeline_name,
feedback_type: ["correction", "improvement"],
limit: config.limit
)
|> Enum.map(fn feedback ->
%{
input: feedback.original_input,
output: feedback.corrected_output,
metadata: %{
source: "user_feedback",
feedback_id: feedback.id,
user_id: feedback.user_id,
improvement_type: feedback.improvement_type
}
}
end)
end
def validate_training_examples(training_data) do
  training_data
  |> Enum.filter(&validate_example/1)
  |> Enum.map(fn example ->
    # The examples collected above do not carry a :quality_score key yet, so
    # use Map.put/3 rather than %{... | ...}, which requires the key to exist.
    Map.put(example, :quality_score, calculate_example_quality(example))
  end)
  |> Enum.sort_by(& &1.quality_score, :desc)
end
end
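split_training_data/2 is referenced but not shown. A minimal sketch, assuming split_ratio is the fraction of examples assigned to the training set (e.g. 0.8):

defmodule Pipeline.DSPy.TrainingDataManager.Split do
  # Shuffle to avoid ordering bias, then split into {training, validation}.
  def split_training_data(examples, split_ratio) do
    cutoff =
      examples
      |> length()
      |> Kernel.*(split_ratio)
      |> floor()

    examples
    |> Enum.shuffle()
    |> Enum.split(cutoff)
  end
end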
Synthetic Data Generation
defmodule Pipeline.DSPy.SyntheticDataGenerator do
@moduledoc """
Generates synthetic training data for DSPy optimization.
"""
def generate_synthetic_examples(signature, count, generation_config) do
# Use LLM to generate diverse examples
base_prompt = build_generation_prompt(signature, generation_config)
# Generate examples in batches
# Ceiling division so we request just enough batches to cover `count`
batches = ceil(count / generation_config.batch_size)
1..batches
|> Enum.flat_map(fn batch_num ->
  generate_batch(base_prompt, generation_config.batch_size, batch_num)
end)
|> Enum.take(count)
|> validate_synthetic_examples(signature)
end
def build_generation_prompt(signature, config) do
"""
Generate diverse training examples for the following task:
Task: #{signature.description}
Input format: #{format_input_spec(signature.input_fields)}
Output format: #{format_output_spec(signature.output_fields)}
Requirements:
- Generate #{config.diversity_level} diverse examples
- Cover different complexity levels
- Include edge cases
- Ensure high quality outputs
Generate examples in the following JSON format:
{
"examples": [
{
"input": {...},
"output": {...},
"explanation": "..."
}
]
}
"""
end
def validate_synthetic_examples(examples, signature) do
Enum.filter(examples, fn example ->
validate_input_format(example.input, signature.input_fields) and
validate_output_format(example.output, signature.output_fields) and
assess_example_quality(example) > 0.7
end)
end
end
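generate_batch/3 ultimately has to turn the LLM’s JSON response into example maps. A sketch of that parsing step, assuming the Jason library is available and the model honors the JSON format requested in the prompt (the Parser submodule and field names are illustrative):

defmodule Pipeline.DSPy.SyntheticDataGenerator.Parser do
  # Decode the raw response and normalize each example into the same shape
  # used by the training data manager.
  def parse_generation_response(raw_json) do
    with {:ok, %{"examples" => examples}} <- Jason.decode(raw_json) do
      {:ok,
       Enum.map(examples, fn example ->
         %{
           input: example["input"],
           output: example["output"],
           metadata: %{source: "synthetic", explanation: example["explanation"]}
         }
       end)}
    else
      _ -> {:error, :invalid_generation_response}
    end
  end
end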
3. Optimization Engine
DSPy Optimization Controller
defmodule Pipeline.DSPy.OptimizationController do
@moduledoc """
Controls DSPy optimization cycles and manages optimization state.
"""
def run_optimization_cycle(pipeline_name, optimization_config) do
# Phase 1: Data Collection
{training_data, validation_data} = collect_optimization_data(
pipeline_name,
optimization_config.data_collection
)
# Phase 2: Baseline Evaluation
baseline_performance = evaluate_current_pipeline(
pipeline_name,
validation_data
)
# Phase 3: Optimization
optimization_results = run_dspy_optimization(
pipeline_name,
training_data,
optimization_config.optimization
)
# Phase 4: Validation
optimized_performance = evaluate_optimized_pipeline(
optimization_results.optimized_pipeline,
validation_data
)
# Phase 5: Decision
optimization_decision = decide_optimization_adoption(
baseline_performance,
optimized_performance,
optimization_config.adoption_criteria
)
# Phase 6: Deployment
if optimization_decision.adopt do
  deploy_optimization(pipeline_name, optimization_results)
else
  log_optimization_rejection(pipeline_name, optimization_decision)
end
# Return comprehensive results
%{
pipeline_name: pipeline_name,
baseline_performance: baseline_performance,
optimized_performance: optimized_performance,
optimization_decision: optimization_decision,
training_data_size: length(training_data),
validation_data_size: length(validation_data),
optimization_timestamp: DateTime.utc_now()
}
end
def run_dspy_optimization(pipeline_name, training_data, config) do
# Convert pipeline to DSPy format
dspy_program = Pipeline.DSPy.Converter.pipeline_to_dspy(pipeline_name)
# Create optimizer based on config
optimizer = create_optimizer(config)
# Run optimization
optimized_program = optimizer.optimize(dspy_program, training_data)
# Convert back to pipeline format
optimized_pipeline = Pipeline.DSPy.Converter.dspy_to_pipeline(optimized_program)
%{
original_pipeline: pipeline_name,
optimized_pipeline: optimized_pipeline,
optimizer_used: config.optimizer_type,
training_data_size: length(training_data),
optimization_metrics: extract_optimization_metrics(optimized_program)
}
end
def create_optimizer(config) do
case config.optimizer_type do
"bootstrap_few_shot" ->
Pipeline.DSPy.Optimizers.BootstrapFewShot.new(config.bootstrap_config)
"copro" ->
Pipeline.DSPy.Optimizers.CoPro.new(config.copro_config)
"mipro" ->
Pipeline.DSPy.Optimizers.MIPro.new(config.mipro_config)
"ensemble" ->
Pipeline.DSPy.Optimizers.Ensemble.new(config.ensemble_config)
end
end
end
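decide_optimization_adoption/3 is the gate between Phase 5 and Phase 6. A minimal sketch that checks the accuracy-focused criteria as an example, assuming both performance maps expose :accuracy and :cost and that adoption_criteria uses the keys shown in the strategy configurations below:

defmodule Pipeline.DSPy.OptimizationController.Adoption do
  def decide_optimization_adoption(baseline, optimized, criteria) do
    accuracy_gain = optimized.accuracy - baseline.accuracy
    cost_increase = safe_ratio(optimized.cost - baseline.cost, baseline.cost)

    adopt =
      accuracy_gain >= Map.get(criteria, :min_accuracy_improvement, 0.0) and
        cost_increase <= Map.get(criteria, :max_cost_increase, 0.0)

    %{
      adopt: adopt,
      accuracy_gain: accuracy_gain,
      cost_increase: cost_increase,
      criteria: criteria
    }
  end

  # Avoid dividing by a zero or missing baseline cost.
  defp safe_ratio(_delta, base) when is_nil(base) or base == 0, do: 0.0
  defp safe_ratio(delta, base), do: delta / base
end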
Optimization Strategies
defmodule Pipeline.DSPy.OptimizationStrategies do
@moduledoc """
Different optimization strategies for various use cases.
"""
def accuracy_focused_optimization(pipeline_name, _config) do
optimization_config = %{
data_collection: %{
history: %{limit: 1000},
manual: %{limit: 50},
feedback: %{limit: 100}
},
optimization: %{
optimizer_type: "bootstrap_few_shot",
bootstrap_config: %{
max_bootstrapped_demos: 8,
max_labeled_demos: 16,
max_rounds: 3,
teacher_settings: %{temperature: 0.1}
}
},
adoption_criteria: %{
min_accuracy_improvement: 0.05,
max_cost_increase: 0.2,
min_confidence_level: 0.95
}
}
Pipeline.DSPy.OptimizationController.run_optimization_cycle(
pipeline_name,
optimization_config
)
end
def cost_focused_optimization(pipeline_name, _config) do
optimization_config = %{
data_collection: %{
history: %{limit: 500},
manual: %{limit: 20},
feedback: %{limit: 50}
},
optimization: %{
optimizer_type: "mipro",
mipro_config: %{
metric: "cost_effectiveness",
num_candidates: 10,
init_temperature: 1.0,
max_bootstrapped_demos: 5
}
},
adoption_criteria: %{
min_cost_reduction: 0.15,
max_accuracy_loss: 0.02,
min_confidence_level: 0.90
}
}
Pipeline.DSPy.OptimizationController.run_optimization_cycle(
pipeline_name,
optimization_config
)
end
def speed_focused_optimization(pipeline_name, _config) do
optimization_config = %{
data_collection: %{
history: %{limit: 300},
manual: %{limit: 10},
feedback: %{limit: 20}
},
optimization: %{
optimizer_type: "copro",
copro_config: %{
depth: 3,
breadth: 2,
max_num_trials: 5,
optimize_for_speed: true
}
},
adoption_criteria: %{
min_speed_improvement: 0.25,
max_accuracy_loss: 0.05,
min_confidence_level: 0.85
}
}
Pipeline.DSPy.OptimizationController.run_optimization_cycle(
pipeline_name,
optimization_config
)
end
end
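Assuming the controller and strategies exist as sketched, a cycle could be triggered from IEx or a scheduled job like this (the pipeline name is a placeholder):

# Hypothetical invocation; "code_review_pipeline" is a placeholder name.
result =
  Pipeline.DSPy.OptimizationStrategies.accuracy_focused_optimization(
    "code_review_pipeline",
    %{}
  )

if result.optimization_decision.adopt do
  IO.puts("Adopted optimization for #{result.pipeline_name}")
end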
4. Evaluation Metrics and Scoring
Comprehensive Evaluation Framework
defmodule Pipeline.DSPy.EvaluationFramework do
@moduledoc """
Comprehensive evaluation framework for pipeline performance.
"""
def evaluate_pipeline(pipeline_config, test_cases, evaluation_config) do
# Run pipeline on test cases
results = execute_pipeline_on_test_cases(pipeline_config, test_cases)
# Calculate multiple metrics
metrics = %{
accuracy: calculate_accuracy(results, evaluation_config.accuracy),
precision: calculate_precision(results, evaluation_config.precision),
recall: calculate_recall(results, evaluation_config.recall),
f1_score: calculate_f1_score(results, evaluation_config.f1),
latency: calculate_latency(results),
cost: calculate_cost(results),
quality: calculate_quality(results, evaluation_config.quality)
}
# Calculate composite score
composite_score = calculate_composite_score(metrics, evaluation_config.weights)
# Generate detailed report
%{
pipeline_name: pipeline_config["workflow"]["name"],
test_cases_count: length(test_cases),
metrics: metrics,
composite_score: composite_score,
individual_results: results,
evaluation_timestamp: DateTime.utc_now(),
evaluation_config: evaluation_config
}
end
def calculate_accuracy(results, config) do
correct_predictions = Enum.count(results, fn result ->
evaluate_correctness(result.predicted, result.expected, config)
end)
correct_predictions / length(results)
end
def calculate_quality(results, config) do
quality_scores = Enum.map(results, fn result ->
Pipeline.DSPy.QualityAssessment.assess_output_quality(
result.predicted,
result.expected,
config
)
end)
Enum.sum(quality_scores) / length(quality_scores)
end
def calculate_composite_score(metrics, weights) do
normalized_metrics = normalize_metrics(metrics)
Enum.reduce(normalized_metrics, 0.0, fn {metric, value}, acc ->
  # Default to 0.0 so metrics without an explicit weight do not inflate the
  # numerator while being absent from the denominator below.
  acc + value * Map.get(weights, metric, 0.0)
end) / Enum.sum(Map.values(weights))
end
end
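normalize_metrics/1 has to put every metric on a comparable, higher-is-better scale before weighting. One possible sketch, assuming accuracy-like metrics are already in 0.0–1.0 and that latency and cost should be inverted:

defmodule Pipeline.DSPy.EvaluationFramework.Normalize do
  @lower_is_better [:latency, :cost]

  def normalize_metrics(metrics) do
    Map.new(metrics, fn
      {name, value} when name in @lower_is_better ->
        # Map 0 -> 1.0 and large values toward 0.0 so "higher is better"
        # holds for every entry in the composite score.
        {name, 1.0 / (1.0 + value)}

      {name, value} ->
        {name, value}
    end)
  end
end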
A/B Testing Framework
defmodule Pipeline.DSPy.ABTesting do
@moduledoc """
A/B testing framework for comparing pipeline versions.
"""
def run_ab_test(pipeline_a, pipeline_b, test_config) do
# Split test cases
{test_a, test_b} = split_test_cases(test_config.test_cases, test_config.split_ratio)
# Run both pipelines
results_a = evaluate_pipeline(pipeline_a, test_a, test_config.evaluation)
results_b = evaluate_pipeline(pipeline_b, test_b, test_config.evaluation)
# Statistical analysis
statistical_significance = calculate_statistical_significance(
results_a.metrics,
results_b.metrics,
test_config.significance_level
)
# Generate recommendation
recommendation = generate_ab_recommendation(
results_a,
results_b,
statistical_significance,
test_config.decision_criteria
)
%{
pipeline_a: results_a,
pipeline_b: results_b,
statistical_significance: statistical_significance,
recommendation: recommendation,
test_timestamp: DateTime.utc_now()
}
end
def calculate_statistical_significance(metrics_a, metrics_b, significance_level) do
# Perform statistical tests (t-test, Mann-Whitney U, etc.)
Enum.map(metrics_a, fn {metric_name, values_a} ->
values_b = Map.get(metrics_b, metric_name)
{p_value, test_statistic} = perform_statistical_test(values_a, values_b)
%{
metric: metric_name,
p_value: p_value,
test_statistic: test_statistic,
significant: p_value < significance_level,
effect_size: calculate_effect_size(values_a, values_b)
}
end)
end
end
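split_test_cases/2 is not defined above; a minimal sketch, assuming split_ratio is the fraction of cases routed to pipeline A (e.g. 0.5 for an even split):

defmodule Pipeline.DSPy.ABTesting.Split do
  # Shuffle before splitting so the two arms see comparable case mixes.
  def split_test_cases(test_cases, split_ratio) do
    count_a =
      test_cases
      |> length()
      |> Kernel.*(split_ratio)
      |> round()

    test_cases
    |> Enum.shuffle()
    |> Enum.split(count_a)
  end
end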
5. Continuous Improvement System
Continuous Learning Pipeline
defmodule Pipeline.DSPy.ContinuousImprovement do
@moduledoc """
Continuous learning and improvement system.
"""
def start_continuous_improvement(pipeline_name, config) do
# Schedule regular optimization cycles
schedule_optimization_cycles(pipeline_name, config.schedule)
# Monitor performance drift
monitor_performance_drift(pipeline_name, config.drift_detection)
# Collect new training data
collect_new_training_data(pipeline_name, config.data_collection)
# Update optimization strategies
update_optimization_strategies(pipeline_name, config.strategy_updates)
end
def schedule_optimization_cycles(pipeline_name, schedule_config) do
# Schedule daily, weekly, or monthly optimization cycles
case schedule_config.frequency do
"daily" ->
schedule_daily_optimization(pipeline_name, schedule_config)
"weekly" ->
schedule_weekly_optimization(pipeline_name, schedule_config)
"monthly" ->
schedule_monthly_optimization(pipeline_name, schedule_config)
"adaptive" ->
schedule_adaptive_optimization(pipeline_name, schedule_config)
end
end
def monitor_performance_drift(pipeline_name, drift_config) do
# Track performance metrics over time
# Detect significant performance degradation
# Trigger automatic optimization when drift is detected
current_performance = Pipeline.DSPy.Metrics.calculate_pipeline_performance(
pipeline_name,
drift_config.time_window
)
baseline_performance = Pipeline.DSPy.Storage.get_baseline_performance(pipeline_name)
drift_detected = detect_performance_drift(current_performance, baseline_performance, drift_config)
if drift_detected do
trigger_emergency_optimization(pipeline_name, drift_config)
end
end
end
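detect_performance_drift/3 is the core of the drift monitor. An illustrative check, assuming both performance maps carry :success_rate and :avg_quality_score and that drift_config defines the tolerated drops (the threshold keys are hypothetical):

defmodule Pipeline.DSPy.ContinuousImprovement.Drift do
  # Returns true when either success rate or quality has fallen further below
  # the baseline than the configured tolerance allows.
  def detect_performance_drift(current, baseline, drift_config) do
    success_drop = baseline.success_rate - current.success_rate
    quality_drop = baseline.avg_quality_score - current.avg_quality_score

    success_drop > Map.get(drift_config, :max_success_rate_drop, 0.05) or
      quality_drop > Map.get(drift_config, :max_quality_drop, 0.1)
  end
end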
Together, these components turn pipeline_ex from a basic execution engine into a self-improving AI system that learns from usage patterns and continuously optimizes pipeline performance.