← Back to Analysis

08 hybrid execution architecture

Documentation for 08_hybrid_execution_architecture from the Pipeline ex repository.

Hybrid Execution Architecture for DSPy Integration

Overview

This document outlines the hybrid execution architecture that seamlessly integrates DSPy optimization with the existing pipeline_ex system, providing both traditional execution and optimized execution modes within a unified framework.

Architectural Principles

1. Backward Compatibility

  • Existing pipelines continue to work without modification
  • Progressive enhancement model
  • Graceful degradation when DSPy is unavailable

2. Seamless Integration

  • Single configuration format supports both modes
  • Unified API for execution
  • Transparent optimization

3. Performance Optimization

  • Intelligent caching of optimization results
  • Lazy optimization initialization
  • Resource-efficient execution

Core Architecture Components

1. Unified Execution Engine

Master Executor

defmodule Pipeline.HybridExecutor do
  @moduledoc """
  Unified execution engine supporting both traditional and DSPy-optimized execution.
  """
  
  def execute(workflow, opts \\ []) do
    # Determine execution mode
    execution_mode = determine_execution_mode(workflow, opts)
    
    # Initialize execution context
    context = initialize_hybrid_context(workflow, opts, execution_mode)
    
    # Execute based on mode
    case execution_mode do
      :traditional ->
        execute_traditional(workflow, context)
        
      :dspy_optimized ->
        execute_dspy_optimized(workflow, context)
        
      :hybrid ->
        execute_hybrid(workflow, context)
        
      :evaluation ->
        execute_evaluation(workflow, context)
    end
  end
  
  defp determine_execution_mode(workflow, opts) do
    # Check explicit mode override
    case Keyword.get(opts, :execution_mode) do
      mode when mode in [:traditional, :dspy_optimized, :hybrid, :evaluation] ->
        mode
        
      nil ->
        # Determine from workflow configuration
        determine_from_workflow_config(workflow)
    end
  end
  
  defp determine_from_workflow_config(workflow) do
    dspy_config = get_in(workflow, ["workflow", "dspy_config"])
    
    cond do
      is_nil(dspy_config) ->
        :traditional
        
      dspy_config["optimization_enabled"] == true ->
        :dspy_optimized
        
      dspy_config["hybrid_mode"] == true ->
        :hybrid
        
      true ->
        :traditional
    end
  end
end

Hybrid Context Manager

defmodule Pipeline.HybridContext do
  @moduledoc """
  Manages execution context for hybrid execution.
  """
  
  defstruct [
    :workflow_name,
    :execution_mode,
    :traditional_context,
    :dspy_context,
    :optimization_cache,
    :metrics_collector,
    :fallback_enabled
  ]
  
  def new(workflow, opts, execution_mode) do
    %__MODULE__{
      workflow_name: workflow["workflow"]["name"],
      execution_mode: execution_mode,
      traditional_context: Pipeline.Executor.initialize_context(workflow, opts),
      dspy_context: initialize_dspy_context(workflow, opts),
      optimization_cache: Pipeline.DSPy.Cache.new(),
      metrics_collector: Pipeline.DSPy.Metrics.new_collector(),
      fallback_enabled: Keyword.get(opts, :fallback_enabled, true)
    }
  end
  
  def initialize_dspy_context(workflow, opts) do
    case workflow["workflow"]["dspy_config"] do
      nil ->
        nil
        
      dspy_config ->
        %Pipeline.DSPy.Context{
          optimization_enabled: dspy_config["optimization_enabled"],
          evaluation_mode: dspy_config["evaluation_mode"],
          training_data_path: dspy_config["training_data_path"],
          cache_enabled: dspy_config["cache_enabled"] || true,
          fallback_strategy: dspy_config["fallback_strategy"] || "traditional"
        }
    end
  end
end

2. Intelligent Step Routing

Step Router

defmodule Pipeline.HybridStepRouter do
  @moduledoc """
  Routes steps to appropriate execution engines based on optimization availability.
  """
  
  def route_step(step, context) do
    case determine_step_execution_mode(step, context) do
      :traditional ->
        execute_traditional_step(step, context)
        
      :dspy_optimized ->
        execute_dspy_step(step, context)
        
      :hybrid ->
        execute_hybrid_step(step, context)
        
      :fallback ->
        execute_fallback_step(step, context)
    end
  end
  
  defp determine_step_execution_mode(step, context) do
    cond do
      # Step explicitly requests traditional execution
      step["execution_mode"] == "traditional" ->
        :traditional
        
      # Step has DSPy optimization available
      has_dspy_optimization?(step) and context.execution_mode == :dspy_optimized ->
        :dspy_optimized
        
      # Hybrid mode with intelligent routing
      context.execution_mode == :hybrid ->
        determine_hybrid_routing(step, context)
        
      # DSPy unavailable, fallback to traditional
      context.execution_mode == :dspy_optimized and not dspy_available?() ->
        :fallback
        
      true ->
        :traditional
    end
  end
  
  defp determine_hybrid_routing(step, context) do
    # Intelligent routing based on step characteristics
    optimization_available = has_dspy_optimization?(step)
    performance_benefit = estimate_performance_benefit(step, context)
    
    cond do
      optimization_available and performance_benefit > 0.2 ->
        :dspy_optimized
        
      optimization_available and performance_benefit > 0.1 ->
        # A/B test between traditional and optimized
        if should_ab_test?(step, context) do
          :hybrid
        else
          :dspy_optimized
        end
        
      true ->
        :traditional
    end
  end
  
  defp execute_hybrid_step(step, context) do
    # Execute both traditional and DSPy versions
    traditional_result = execute_traditional_step(step, context)
    dspy_result = execute_dspy_step(step, context)
    
    # Compare results and choose best
    best_result = select_best_result(traditional_result, dspy_result, step, context)
    
    # Record A/B test results
    record_ab_test_result(step, traditional_result, dspy_result, best_result, context)
    
    best_result
  end
end

3. Optimization Cache System

Multi-Level Caching

defmodule Pipeline.DSPy.Cache do
  @moduledoc """
  Multi-level caching system for DSPy optimizations.
  """
  
  defstruct [
    :memory_cache,
    :disk_cache,
    :distributed_cache,
    :cache_config
  ]
  
  def new(config \\ %{}) do
    %__MODULE__{
      memory_cache: :ets.new(:dspy_memory_cache, [:set, :public]),
      disk_cache: initialize_disk_cache(config),
      distributed_cache: initialize_distributed_cache(config),
      cache_config: config
    }
  end
  
  def get_optimization(cache, signature_hash, optimization_params) do
    cache_key = build_cache_key(signature_hash, optimization_params)
    
    # Try memory cache first
    case :ets.lookup(cache.memory_cache, cache_key) do
      [{^cache_key, cached_result}] ->
        {:ok, cached_result}
        
      [] ->
        # Try disk cache
        case get_from_disk_cache(cache.disk_cache, cache_key) do
          {:ok, cached_result} ->
            # Store in memory cache for faster access
            :ets.insert(cache.memory_cache, {cache_key, cached_result})
            {:ok, cached_result}
            
          :not_found ->
            # Try distributed cache
            get_from_distributed_cache(cache.distributed_cache, cache_key)
        end
    end
  end
  
  def store_optimization(cache, signature_hash, optimization_params, result) do
    cache_key = build_cache_key(signature_hash, optimization_params)
    
    # Store in all cache levels
    :ets.insert(cache.memory_cache, {cache_key, result})
    store_in_disk_cache(cache.disk_cache, cache_key, result)
    store_in_distributed_cache(cache.distributed_cache, cache_key, result)
    
    :ok
  end
  
  def invalidate_optimization(cache, signature_hash) do
    # Find all cache keys for this signature
    pattern = build_cache_pattern(signature_hash)
    
    # Invalidate from all cache levels
    :ets.match_delete(cache.memory_cache, pattern)
    invalidate_from_disk_cache(cache.disk_cache, pattern)
    invalidate_from_distributed_cache(cache.distributed_cache, pattern)
    
    :ok
  end
end

4. Fallback and Error Handling

Graceful Degradation System

defmodule Pipeline.HybridFallback do
  @moduledoc """
  Handles graceful degradation when DSPy optimization fails.
  """
  
  def execute_with_fallback(step, context, primary_mode) do
    try do
      case primary_mode do
        :dspy_optimized ->
          execute_dspy_with_fallback(step, context)
          
        :hybrid ->
          execute_hybrid_with_fallback(step, context)
      end
    rescue
      error ->
        handle_execution_error(step, context, error, primary_mode)
    end
  end
  
  defp execute_dspy_with_fallback(step, context) do
    case Pipeline.DSPy.StepExecutor.execute(step, context) do
      {:ok, result} ->
        {:ok, result}
        
      {:error, :dspy_unavailable} ->
        Logger.warning("DSPy unavailable for step #{step["name"]}, falling back to traditional")
        execute_traditional_fallback(step, context)
        
      {:error, :optimization_failed} ->
        Logger.warning("DSPy optimization failed for step #{step["name"]}, falling back to traditional")
        execute_traditional_fallback(step, context)
        
      {:error, :timeout} ->
        Logger.warning("DSPy optimization timeout for step #{step["name"]}, falling back to traditional")
        execute_traditional_fallback(step, context)
        
      {:error, reason} ->
        # Other errors should be propagated
        {:error, reason}
    end
  end
  
  defp execute_traditional_fallback(step, context) do
    # Remove DSPy-specific configuration
    traditional_step = prepare_traditional_step(step)
    
    # Execute using traditional pipeline
    Pipeline.Executor.execute_step(traditional_step, context.traditional_context)
  end
  
  defp prepare_traditional_step(step) do
    step
    |> Map.drop(["dspy_config", "signature"])
    |> ensure_traditional_compatibility()
  end
  
  def handle_execution_error(step, context, error, primary_mode) do
    # Log error
    Logger.error("Execution error in #{primary_mode} mode for step #{step["name"]}: #{inspect(error)}")
    
    # Record error metrics
    Pipeline.DSPy.Metrics.record_execution_error(step, error, primary_mode)
    
    # Attempt fallback if enabled
    if context.fallback_enabled do
      Logger.info("Attempting fallback execution for step #{step["name"]}")
      execute_traditional_fallback(step, context)
    else
      {:error, "Execution failed: #{inspect(error)}"}
    end
  end
end

5. Performance Monitoring and Metrics

Hybrid Performance Monitor

defmodule Pipeline.HybridPerformanceMonitor do
  @moduledoc """
  Monitors performance across traditional and DSPy execution modes.
  """
  
  def monitor_execution(step, context, execution_mode) do
    start_time = System.monotonic_time(:millisecond)
    
    # Start monitoring
    monitor_ref = start_monitoring(step, execution_mode)
    
    try do
      # Execute step
      result = execute_monitored_step(step, context, execution_mode)
      
      # Record successful execution
      end_time = System.monotonic_time(:millisecond)
      record_execution_success(step, execution_mode, end_time - start_time, result)
      
      result
    rescue
      error ->
        # Record failed execution
        end_time = System.monotonic_time(:millisecond)
        record_execution_failure(step, execution_mode, end_time - start_time, error)
        
        reraise error, __STACKTRACE__
    after
      # Stop monitoring
      stop_monitoring(monitor_ref)
    end
  end
  
  def compare_execution_modes(step, context) do
    # Execute with both modes
    traditional_metrics = execute_and_measure(step, context, :traditional)
    dspy_metrics = execute_and_measure(step, context, :dspy_optimized)
    
    # Compare metrics
    comparison = %{
      traditional: traditional_metrics,
      dspy_optimized: dspy_metrics,
      performance_difference: calculate_performance_difference(traditional_metrics, dspy_metrics),
      recommendation: generate_mode_recommendation(traditional_metrics, dspy_metrics)
    }
    
    # Store comparison results
    store_mode_comparison(step, comparison)
    
    comparison
  end
  
  defp calculate_performance_difference(traditional, dspy) do
    %{
      execution_time_diff: dspy.execution_time - traditional.execution_time,
      cost_diff: dspy.cost - traditional.cost,
      quality_diff: dspy.quality_score - traditional.quality_score,
      success_rate_diff: dspy.success_rate - traditional.success_rate
    }
  end
  
  defp generate_mode_recommendation(traditional, dspy) do
    cond do
      # DSPy is significantly better
      dspy.quality_score > traditional.quality_score + 0.1 and
      dspy.cost < traditional.cost * 1.2 ->
        {:recommend_dspy, "DSPy provides better quality at reasonable cost"}
        
      # Traditional is more cost-effective
      traditional.cost < dspy.cost * 0.8 and
      traditional.quality_score > dspy.quality_score - 0.05 ->
        {:recommend_traditional, "Traditional execution is more cost-effective"}
        
      # Performance is similar
      abs(dspy.quality_score - traditional.quality_score) < 0.05 ->
        {:recommend_hybrid, "Performance is similar, use hybrid mode for A/B testing"}
        
      true ->
        {:recommend_evaluation, "Need more data to make recommendation"}
    end
  end
end

6. Configuration Management

Unified Configuration System

defmodule Pipeline.HybridConfig do
  @moduledoc """
  Unified configuration system supporting both traditional and DSPy modes.
  """
  
  def parse_hybrid_config(yaml_config) do
    workflow = yaml_config["workflow"]
    
    base_config = %{
      name: workflow["name"],
      description: workflow["description"],
      steps: workflow["steps"],
      traditional_config: extract_traditional_config(workflow),
      dspy_config: extract_dspy_config(workflow),
      hybrid_config: extract_hybrid_config(workflow)
    }
    
    validate_hybrid_config(base_config)
  end
  
  defp extract_dspy_config(workflow) do
    case workflow["dspy_config"] do
      nil ->
        %{optimization_enabled: false}
        
      dspy_config ->
        %{
          optimization_enabled: dspy_config["optimization_enabled"] || false,
          evaluation_mode: dspy_config["evaluation_mode"] || "bootstrap_few_shot",
          training_data_path: dspy_config["training_data_path"],
          cache_enabled: dspy_config["cache_enabled"] || true,
          fallback_strategy: dspy_config["fallback_strategy"] || "traditional",
          optimization_frequency: dspy_config["optimization_frequency"] || "weekly"
        }
    end
  end
  
  defp extract_hybrid_config(workflow) do
    case workflow["hybrid_config"] do
      nil ->
        %{mode: :traditional}
        
      hybrid_config ->
        %{
          mode: String.to_atom(hybrid_config["mode"] || "traditional"),
          intelligent_routing: hybrid_config["intelligent_routing"] || false,
          ab_testing_enabled: hybrid_config["ab_testing_enabled"] || false,
          performance_threshold: hybrid_config["performance_threshold"] || 0.1,
          fallback_enabled: hybrid_config["fallback_enabled"] || true
        }
    end
  end
  
  def validate_hybrid_config(config) do
    with :ok <- validate_traditional_compatibility(config),
         :ok <- validate_dspy_compatibility(config),
         :ok <- validate_hybrid_compatibility(config) do
      {:ok, config}
    else
      {:error, reason} ->
        {:error, "Invalid hybrid configuration: #{reason}"}
    end
  end
end

7. Migration and Compatibility

Migration Helper

defmodule Pipeline.HybridMigration do
  @moduledoc """
  Helps migrate existing pipelines to hybrid execution.
  """
  
  def migrate_to_hybrid(traditional_pipeline_path, migration_config) do
    # Load traditional pipeline
    {:ok, traditional_config} = Pipeline.Config.load_workflow(traditional_pipeline_path)
    
    # Generate DSPy configuration
    dspy_config = generate_dspy_config(traditional_config, migration_config)
    
    # Create hybrid configuration
    hybrid_config = merge_configurations(traditional_config, dspy_config)
    
    # Validate hybrid configuration
    case Pipeline.HybridConfig.validate_hybrid_config(hybrid_config) do
      {:ok, validated_config} ->
        # Save hybrid pipeline
        hybrid_path = generate_hybrid_path(traditional_pipeline_path)
        save_hybrid_pipeline(hybrid_path, validated_config)
        
        {:ok, hybrid_path}
        
      {:error, reason} ->
        {:error, "Migration failed: #{reason}"}
    end
  end
  
  defp generate_dspy_config(traditional_config, migration_config) do
    steps = traditional_config["workflow"]["steps"]
    
    # Analyze steps to determine DSPy candidates
    dspy_candidates = identify_dspy_candidates(steps)
    
    # Generate DSPy signatures for candidates
    signatures = generate_signatures_for_candidates(dspy_candidates)
    
    %{
      "dspy_config" => %{
        "optimization_enabled" => migration_config.enable_optimization,
        "evaluation_mode" => migration_config.evaluation_mode || "bootstrap_few_shot",
        "signatures" => signatures,
        "fallback_strategy" => "traditional"
      }
    }
  end
  
  defp identify_dspy_candidates(steps) do
    # Identify steps that would benefit from DSPy optimization
    Enum.filter(steps, fn step ->
      step_type = step["type"]
      
      step_type in ["claude", "gemini", "claude_smart", "claude_extract"] and
      has_complex_prompt?(step) and
      suitable_for_optimization?(step)
    end)
  end
end

Example Hybrid Pipeline Configuration

workflow:
  name: hybrid_code_analyzer
  description: "Code analysis with hybrid execution"
  
  # Traditional configuration (backward compatible)
  steps:
    - name: analyze_structure
      type: claude
      prompt:
        - type: static
          content: "Analyze this code structure"
      
    - name: generate_docs
      type: claude_smart
      prompt:
        - type: static
          content: "Generate documentation"
  
  # DSPy configuration
  dspy_config:
    optimization_enabled: true
    evaluation_mode: "bootstrap_few_shot"
    cache_enabled: true
    fallback_strategy: "traditional"
    
  # Hybrid execution configuration
  hybrid_config:
    mode: "hybrid"
    intelligent_routing: true
    ab_testing_enabled: true
    performance_threshold: 0.1
    fallback_enabled: true

Benefits of Hybrid Architecture

1. Gradual Migration

  • Existing pipelines work unchanged
  • Step-by-step optimization adoption
  • Risk-free experimentation

2. Intelligent Optimization

  • Automatic routing based on performance
  • A/B testing for continuous improvement
  • Fallback ensures reliability

3. Comprehensive Monitoring

  • Performance comparison across modes
  • Cost-benefit analysis
  • Optimization recommendations

4. Production Ready

  • Graceful degradation
  • Error handling and recovery
  • Caching for performance

This hybrid architecture provides a robust foundation for integrating DSPy optimization while maintaining the reliability and usability that makes pipeline_ex effective for real-world software development tasks.