← Back to Analysis

15 real time optimization framework

Documentation for 15_real_time_optimization_framework from the Pipeline ex repository.

Real-Time Optimization Framework

Overview

This document addresses the critical gap in our DSPy implementation plan regarding real-time optimization constraints. The original analysis emphasized that optimization might introduce latency, but our current plans don’t adequately address real-time performance requirements for production pipeline execution.

Problem Statement

DSPy optimization is computationally expensive and can introduce significant latency. In production environments, users expect immediate or near-immediate responses. The challenge is to provide the benefits of DSPy optimization without compromising user experience through excessive wait times.

Key Challenges

  1. Optimization Latency: DSPy optimization can take seconds to minutes
  2. User Experience: Users expect immediate feedback
  3. Resource Constraints: Optimization requires significant computational resources
  4. Cache Management: Optimized results need intelligent caching strategies
  5. Fallback Mechanisms: System must gracefully handle optimization failures

Architecture Design

1. Async Optimization Engine

Background Optimization Manager

defmodule Pipeline.DSPy.AsyncOptimizationManager do
  @moduledoc """
  Manages background optimization tasks without blocking pipeline execution.
  """
  
  use GenServer
  
  defstruct [
    :optimization_queue,
    :active_optimizations,
    :optimization_workers,
    :optimization_cache,
    :optimization_metrics
  ]
  
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def schedule_optimization(signature, training_data, priority \\ :normal) do
    GenServer.cast(__MODULE__, {:schedule_optimization, signature, training_data, priority})
  end
  
  def get_optimization_status(signature_hash) do
    GenServer.call(__MODULE__, {:get_optimization_status, signature_hash})
  end
  
  def init(opts) do
    worker_count = Keyword.get(opts, :worker_count, 2)
    
    state = %__MODULE__{
      optimization_queue: PriorityQueue.new(),
      active_optimizations: %{},
      optimization_workers: start_optimization_workers(worker_count),
      optimization_cache: Pipeline.DSPy.OptimizationCache,
      optimization_metrics: %{}
    }
    
    # Start queue processor
    schedule_queue_processing()
    
    {:ok, state}
  end
  
  def handle_cast({:schedule_optimization, signature, training_data, priority}, state) do
    signature_hash = generate_signature_hash(signature)
    
    # Check if optimization is already in progress or cached
    case check_optimization_status(signature_hash, state) do
      {:cached, _result} ->
        # Already optimized, no need to schedule
        {:noreply, state}
      
      {:in_progress, _worker_id} ->
        # Already being optimized, no need to schedule
        {:noreply, state}
      
      :not_found ->
        # Schedule new optimization
        optimization_task = %{
          signature: signature,
          training_data: training_data,
          signature_hash: signature_hash,
          priority: priority,
          scheduled_at: DateTime.utc_now(),
          attempts: 0
        }
        
        new_queue = PriorityQueue.put(state.optimization_queue, optimization_task, priority)
        new_state = %{state | optimization_queue: new_queue}
        
        {:noreply, new_state}
    end
  end
  
  def handle_info(:process_queue, state) do
    case assign_optimization_to_worker(state) do
      {:ok, new_state} ->
        schedule_queue_processing()
        {:noreply, new_state}
      
      {:no_workers_available, new_state} ->
        schedule_queue_processing(500)  # Shorter delay when workers are busy
        {:noreply, new_state}
      
      {:empty_queue, new_state} ->
        schedule_queue_processing()
        {:noreply, new_state}
    end
  end
end

Optimization Worker

defmodule Pipeline.DSPy.OptimizationWorker do
  @moduledoc """
  Worker process that handles individual optimization tasks.
  """
  
  use GenServer
  
  defstruct [
    :worker_id,
    :status,
    :current_task,
    :python_bridge,
    :optimization_timeout
  ]
  
  def start_link(worker_id, opts \\ []) do
    GenServer.start_link(__MODULE__, {worker_id, opts}, name: via_tuple(worker_id))
  end
  
  def optimize(worker_id, optimization_task) do
    GenServer.call(via_tuple(worker_id), {:optimize, optimization_task}, :infinity)
  end
  
  def get_status(worker_id) do
    GenServer.call(via_tuple(worker_id), :get_status)
  end
  
  def init({worker_id, opts}) do
    state = %__MODULE__{
      worker_id: worker_id,
      status: :idle,
      current_task: nil,
      python_bridge: nil,
      optimization_timeout: Keyword.get(opts, :timeout, 300_000)  # 5 minutes
    }
    
    {:ok, state}
  end
  
  def handle_call({:optimize, optimization_task}, _from, state) do
    case state.status do
      :idle ->
        new_state = %{state | status: :optimizing, current_task: optimization_task}
        
        # Start optimization in background
        optimization_ref = start_optimization(optimization_task, state.optimization_timeout)
        
        {:reply, {:ok, optimization_ref}, new_state}
      
      _ ->
        {:reply, {:error, :worker_busy}, state}
    end
  end
  
  defp start_optimization(optimization_task, timeout) do
    parent = self()
    
    Task.start(fn ->
      result = perform_optimization(optimization_task)
      send(parent, {:optimization_complete, result})
    end)
  end
  
  defp perform_optimization(optimization_task) do
    try do
      # Use Python bridge to perform optimization
      case Pipeline.DSPy.PythonBridge.optimize_signature(
        optimization_task.signature,
        optimization_task.training_data
      ) do
        {:ok, optimized_signature} ->
          # Store in cache
          Pipeline.DSPy.OptimizationCache.store_optimization(
            optimization_task.signature_hash,
            optimized_signature
          )
          
          # Record metrics
          Pipeline.DSPy.Metrics.record_optimization_success(
            optimization_task.signature_hash,
            optimized_signature.metrics
          )
          
          {:ok, optimized_signature}
        
        {:error, reason} ->
          Pipeline.DSPy.Metrics.record_optimization_failure(
            optimization_task.signature_hash,
            reason
          )
          
          {:error, reason}
      end
    catch
      :exit, {:timeout, _} ->
        {:error, :optimization_timeout}
      
      error ->
        {:error, error}
    end
  end
end

2. Intelligent Threshold Management

Optimization Trigger System

defmodule Pipeline.DSPy.OptimizationTrigger do
  @moduledoc """
  Determines when to trigger optimization based on various factors.
  """
  
  defstruct [
    :execution_count,
    :error_rate,
    :performance_metrics,
    :last_optimization,
    :optimization_thresholds
  ]
  
  def should_optimize?(signature_hash, execution_context) do
    case get_optimization_status(signature_hash) do
      {:recently_optimized, _} ->
        false
      
      {:never_optimized, _} ->
        should_optimize_new_signature?(execution_context)
      
      {:optimization_available, last_optimization} ->
        should_reoptimize?(signature_hash, last_optimization, execution_context)
    end
  end
  
  defp should_optimize_new_signature?(execution_context) do
    # Optimize new signatures after a minimum number of executions
    minimum_executions = get_minimum_executions_threshold()
    
    execution_context.execution_count >= minimum_executions
  end
  
  defp should_reoptimize?(signature_hash, last_optimization, execution_context) do
    # Consider reoptimization based on multiple factors
    time_since_optimization = DateTime.diff(DateTime.utc_now(), last_optimization.timestamp)
    
    # Reoptimize if:
    # 1. Significant time has passed
    # 2. Error rate has increased
    # 3. Performance has degraded
    # 4. New training data is available
    
    time_threshold_exceeded?(time_since_optimization) or
    error_rate_threshold_exceeded?(signature_hash, execution_context) or
    performance_degradation_detected?(signature_hash, execution_context) or
    new_training_data_available?(signature_hash)
  end
  
  defp time_threshold_exceeded?(time_since_optimization) do
    # Reoptimize after 24 hours by default
    reoptimization_interval = get_reoptimization_interval()
    time_since_optimization > reoptimization_interval
  end
  
  defp error_rate_threshold_exceeded?(signature_hash, execution_context) do
    recent_error_rate = calculate_recent_error_rate(signature_hash)
    baseline_error_rate = get_baseline_error_rate(signature_hash)
    
    recent_error_rate > baseline_error_rate * 1.5  # 50% increase triggers reoptimization
  end
  
  defp performance_degradation_detected?(signature_hash, execution_context) do
    recent_performance = calculate_recent_performance(signature_hash)
    baseline_performance = get_baseline_performance(signature_hash)
    
    recent_performance < baseline_performance * 0.8  # 20% degradation triggers reoptimization
  end
end

Adaptive Thresholds

defmodule Pipeline.DSPy.AdaptiveThresholds do
  @moduledoc """
  Manages adaptive thresholds that adjust based on system performance and usage patterns.
  """
  
  use GenServer
  
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def get_threshold(signature_hash, threshold_type) do
    GenServer.call(__MODULE__, {:get_threshold, signature_hash, threshold_type})
  end
  
  def update_threshold(signature_hash, threshold_type, new_value) do
    GenServer.cast(__MODULE__, {:update_threshold, signature_hash, threshold_type, new_value})
  end
  
  def init(opts) do
    # Initialize with default thresholds
    state = %{
      global_thresholds: %{
        minimum_executions: 5,
        reoptimization_interval: 24 * 60 * 60,  # 24 hours
        error_rate_multiplier: 1.5,
        performance_degradation_threshold: 0.8
      },
      signature_specific_thresholds: %{},
      threshold_adjustment_history: %{}
    }
    
    # Start threshold adjustment process
    schedule_threshold_adjustment()
    
    {:ok, state}
  end
  
  def handle_info(:adjust_thresholds, state) do
    # Analyze system performance and adjust thresholds
    new_state = analyze_and_adjust_thresholds(state)
    
    schedule_threshold_adjustment()
    
    {:noreply, new_state}
  end
  
  defp analyze_and_adjust_thresholds(state) do
    # Analyze recent optimization effectiveness
    optimization_effectiveness = analyze_optimization_effectiveness()
    
    # Adjust thresholds based on effectiveness
    new_thresholds = adjust_thresholds_based_on_effectiveness(
      state.global_thresholds,
      optimization_effectiveness
    )
    
    %{state | global_thresholds: new_thresholds}
  end
  
  defp adjust_thresholds_based_on_effectiveness(current_thresholds, effectiveness) do
    case effectiveness do
      :high ->
        # Optimizations are very effective, be more aggressive
        %{
          current_thresholds |
          minimum_executions: max(current_thresholds.minimum_executions - 1, 3),
          reoptimization_interval: max(current_thresholds.reoptimization_interval - 3600, 12 * 60 * 60)
        }
      
      :medium ->
        # Current thresholds are working well, keep them
        current_thresholds
      
      :low ->
        # Optimizations are not very effective, be more conservative
        %{
          current_thresholds |
          minimum_executions: min(current_thresholds.minimum_executions + 1, 10),
          reoptimization_interval: min(current_thresholds.reoptimization_interval + 3600, 48 * 60 * 60)
        }
    end
  end
end

3. Multi-Level Caching Strategy

Hierarchical Cache System

defmodule Pipeline.DSPy.HierarchicalCache do
  @moduledoc """
  Multi-level caching system for optimization results.
  """
  
  defstruct [
    :l1_cache,    # In-memory, fastest access
    :l2_cache,    # Local disk, medium speed
    :l3_cache,    # Distributed cache, slowest but largest
    :cache_stats
  ]
  
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def get_optimization(signature_hash) do
    GenServer.call(__MODULE__, {:get_optimization, signature_hash})
  end
  
  def store_optimization(signature_hash, optimization_result, ttl \\ nil) do
    GenServer.cast(__MODULE__, {:store_optimization, signature_hash, optimization_result, ttl})
  end
  
  def init(opts) do
    state = %__MODULE__{
      l1_cache: initialize_l1_cache(opts),
      l2_cache: initialize_l2_cache(opts),
      l3_cache: initialize_l3_cache(opts),
      cache_stats: %{hits: 0, misses: 0, evictions: 0}
    }
    
    {:ok, state}
  end
  
  def handle_call({:get_optimization, signature_hash}, _from, state) do
    case get_from_cache_hierarchy(signature_hash, state) do
      {:ok, optimization_result, cache_level} ->
        # Promote to higher cache levels if found in lower levels
        new_state = promote_to_higher_caches(signature_hash, optimization_result, cache_level, state)
        
        # Update cache stats
        stats = %{state.cache_stats | hits: state.cache_stats.hits + 1}
        final_state = %{new_state | cache_stats: stats}
        
        {:reply, {:ok, optimization_result}, final_state}
      
      :not_found ->
        # Update cache stats
        stats = %{state.cache_stats | misses: state.cache_stats.misses + 1}
        new_state = %{state | cache_stats: stats}
        
        {:reply, :not_found, new_state}
    end
  end
  
  defp get_from_cache_hierarchy(signature_hash, state) do
    # Try L1 cache first (fastest)
    case get_from_l1_cache(signature_hash, state.l1_cache) do
      {:ok, result} ->
        {:ok, result, :l1}
      
      :not_found ->
        # Try L2 cache
        case get_from_l2_cache(signature_hash, state.l2_cache) do
          {:ok, result} ->
            {:ok, result, :l2}
          
          :not_found ->
            # Try L3 cache
            case get_from_l3_cache(signature_hash, state.l3_cache) do
              {:ok, result} ->
                {:ok, result, :l3}
              
              :not_found ->
                :not_found
            end
        end
    end
  end
  
  defp promote_to_higher_caches(signature_hash, optimization_result, found_level, state) do
    case found_level do
      :l3 ->
        # Promote to L2 and L1
        new_l2_cache = store_in_l2_cache(signature_hash, optimization_result, state.l2_cache)
        new_l1_cache = store_in_l1_cache(signature_hash, optimization_result, state.l1_cache)
        
        %{state | l1_cache: new_l1_cache, l2_cache: new_l2_cache}
      
      :l2 ->
        # Promote to L1
        new_l1_cache = store_in_l1_cache(signature_hash, optimization_result, state.l1_cache)
        
        %{state | l1_cache: new_l1_cache}
      
      :l1 ->
        # Already in highest cache level
        state
    end
  end
end

Predictive Cache Warming

defmodule Pipeline.DSPy.PredictiveCache do
  @moduledoc """
  Predictively warms caches based on usage patterns.
  """
  
  use GenServer
  
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def record_signature_usage(signature_hash, context) do
    GenServer.cast(__MODULE__, {:record_usage, signature_hash, context})
  end
  
  def init(opts) do
    state = %{
      usage_patterns: %{},
      prediction_models: %{},
      warming_queue: :queue.new()
    }
    
    # Start prediction analysis
    schedule_prediction_analysis()
    
    {:ok, state}
  end
  
  def handle_info(:analyze_predictions, state) do
    # Analyze usage patterns and predict future needs
    predictions = analyze_usage_patterns(state.usage_patterns)
    
    # Queue signatures for warming
    new_warming_queue = queue_signatures_for_warming(predictions, state.warming_queue)
    
    # Start warming process
    process_warming_queue(new_warming_queue)
    
    new_state = %{state | warming_queue: new_warming_queue}
    
    schedule_prediction_analysis()
    
    {:noreply, new_state}
  end
  
  defp analyze_usage_patterns(usage_patterns) do
    # Use simple heuristics to predict future usage
    # Could be enhanced with machine learning models
    
    Enum.reduce(usage_patterns, [], fn {signature_hash, pattern}, acc ->
      case predict_future_usage(pattern) do
        {:likely, confidence} when confidence > 0.7 ->
          [{signature_hash, confidence} | acc]
        
        _ ->
          acc
      end
    end)
  end
  
  defp predict_future_usage(pattern) do
    # Simple prediction based on usage frequency and recency
    recent_usage = count_recent_usage(pattern.usage_history)
    usage_frequency = calculate_usage_frequency(pattern.usage_history)
    
    confidence = min(recent_usage * 0.3 + usage_frequency * 0.7, 1.0)
    
    if confidence > 0.5 do
      {:likely, confidence}
    else
      {:unlikely, confidence}
    end
  end
end

4. Fallback and Timeout Mechanisms

Execution Flow with Fallbacks

defmodule Pipeline.DSPy.ExecutionFlow do
  @moduledoc """
  Manages execution flow with intelligent fallback mechanisms.
  """
  
  def execute_with_optimization(step, context, options \\ []) do
    timeout = Keyword.get(options, :timeout, 5000)
    fallback_strategy = Keyword.get(options, :fallback_strategy, :traditional)
    
    case attempt_optimized_execution(step, context, timeout) do
      {:ok, result} ->
        {:ok, result}
      
      {:error, :timeout} ->
        handle_timeout_fallback(step, context, fallback_strategy)
      
      {:error, :optimization_not_available} ->
        handle_no_optimization_fallback(step, context, fallback_strategy)
      
      {:error, reason} ->
        handle_error_fallback(step, context, fallback_strategy, reason)
    end
  end
  
  defp attempt_optimized_execution(step, context, timeout) do
    signature_hash = generate_signature_hash(step["dspy_signature"])
    
    # Try to get cached optimization first
    case Pipeline.DSPy.HierarchicalCache.get_optimization(signature_hash) do
      {:ok, optimization_result} ->
        execute_with_optimized_signature(step, context, optimization_result)
      
      :not_found ->
        # Check if optimization is in progress
        case Pipeline.DSPy.AsyncOptimizationManager.get_optimization_status(signature_hash) do
          {:in_progress, _} ->
            # Wait for a limited time for optimization to complete
            wait_for_optimization_with_timeout(signature_hash, timeout)
          
          :not_found ->
            # Schedule optimization for future use
            schedule_background_optimization(step, context)
            {:error, :optimization_not_available}
        end
    end
  end
  
  defp wait_for_optimization_with_timeout(signature_hash, timeout) do
    start_time = System.monotonic_time(:millisecond)
    
    case wait_for_optimization(signature_hash, timeout) do
      {:ok, optimization_result} ->
        elapsed = System.monotonic_time(:millisecond) - start_time
        Logger.info("Optimization completed in #{elapsed}ms")
        {:ok, optimization_result}
      
      :timeout ->
        Logger.warning("Optimization timeout after #{timeout}ms")
        {:error, :timeout}
    end
  end
  
  defp handle_timeout_fallback(step, context, fallback_strategy) do
    Logger.info("Falling back to traditional execution due to optimization timeout")
    
    case fallback_strategy do
      :traditional ->
        execute_traditional_step(step, context)
      
      :cached_best_effort ->
        execute_with_best_available_optimization(step, context)
      
      :error ->
        {:error, :optimization_timeout}
    end
  end
  
  defp handle_no_optimization_fallback(step, context, fallback_strategy) do
    Logger.info("No optimization available, using fallback strategy: #{fallback_strategy}")
    
    case fallback_strategy do
      :traditional ->
        execute_traditional_step(step, context)
      
      :schedule_and_execute ->
        # Schedule optimization for future use and execute traditionally
        schedule_background_optimization(step, context)
        execute_traditional_step(step, context)
      
      :wait_and_retry ->
        # Wait briefly and retry once
        :timer.sleep(100)
        attempt_optimized_execution(step, context, 1000)
    end
  end
  
  defp execute_traditional_step(step, context) do
    # Remove DSPy-specific options and execute with traditional provider
    traditional_step = Map.drop(step, ["dspy_signature", "dspy_config"])
    
    Pipeline.Executor.execute_step(traditional_step, context)
  end
end

Timeout Management

defmodule Pipeline.DSPy.TimeoutManager do
  @moduledoc """
  Manages timeouts for various DSPy operations.
  """
  
  defstruct [
    :operation_timeouts,
    :adaptive_timeouts,
    :timeout_history
  ]
  
  def get_timeout(operation_type, signature_hash \\ nil) do
    case signature_hash do
      nil ->
        get_default_timeout(operation_type)
      
      hash ->
        get_adaptive_timeout(operation_type, hash)
    end
  end
  
  def record_operation_time(operation_type, signature_hash, duration, success) do
    # Record operation time for adaptive timeout calculation
    GenServer.cast(__MODULE__, {:record_operation_time, operation_type, signature_hash, duration, success})
  end
  
  defp get_default_timeout(operation_type) do
    case operation_type do
      :optimization -> 300_000  # 5 minutes
      :execution -> 30_000      # 30 seconds
      :cache_lookup -> 1_000    # 1 second
      :python_bridge -> 60_000  # 1 minute
    end
  end
  
  defp get_adaptive_timeout(operation_type, signature_hash) do
    # Calculate adaptive timeout based on historical performance
    historical_times = get_historical_operation_times(operation_type, signature_hash)
    
    case historical_times do
      [] ->
        get_default_timeout(operation_type)
      
      times ->
        # Use 95th percentile of historical times
        percentile_95 = calculate_percentile(times, 0.95)
        
        # Cap at 2x default timeout
        min(percentile_95 * 1.2, get_default_timeout(operation_type) * 2)
    end
  end
end

5. Performance Monitoring and Metrics

Real-Time Performance Monitoring

defmodule Pipeline.DSPy.PerformanceMonitor do
  @moduledoc """
  Monitors real-time performance of DSPy operations.
  """
  
  use GenServer
  
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def record_execution_metrics(signature_hash, metrics) do
    GenServer.cast(__MODULE__, {:record_execution_metrics, signature_hash, metrics})
  end
  
  def get_performance_summary(signature_hash) do
    GenServer.call(__MODULE__, {:get_performance_summary, signature_hash})
  end
  
  def init(opts) do
    state = %{
      execution_metrics: %{},
      performance_alerts: %{},
      monitoring_config: load_monitoring_config(opts)
    }
    
    # Start performance analysis
    schedule_performance_analysis()
    
    {:ok, state}
  end
  
  def handle_cast({:record_execution_metrics, signature_hash, metrics}, state) do
    # Update metrics for signature
    updated_metrics = update_signature_metrics(state.execution_metrics, signature_hash, metrics)
    
    # Check for performance alerts
    new_alerts = check_performance_alerts(signature_hash, metrics, state.performance_alerts)
    
    new_state = %{
      state |
      execution_metrics: updated_metrics,
      performance_alerts: new_alerts
    }
    
    {:noreply, new_state}
  end
  
  def handle_info(:analyze_performance, state) do
    # Analyze overall system performance
    performance_analysis = analyze_system_performance(state.execution_metrics)
    
    # Generate performance report
    generate_performance_report(performance_analysis)
    
    # Update optimization recommendations
    update_optimization_recommendations(performance_analysis)
    
    schedule_performance_analysis()
    
    {:noreply, state}
  end
  
  defp check_performance_alerts(signature_hash, metrics, current_alerts) do
    alerts = []
    
    # Check for slow optimization
    if metrics.optimization_time > 60_000 do  # 1 minute
      alerts = [{:slow_optimization, signature_hash, metrics.optimization_time} | alerts]
    end
    
    # Check for high error rate
    if metrics.error_rate > 0.1 do  # 10% error rate
      alerts = [{:high_error_rate, signature_hash, metrics.error_rate} | alerts]
    end
    
    # Check for cache miss rate
    if metrics.cache_miss_rate > 0.5 do  # 50% cache miss rate
      alerts = [{:high_cache_miss_rate, signature_hash, metrics.cache_miss_rate} | alerts]
    end
    
    # Merge with current alerts
    merge_alerts(current_alerts, alerts)
  end
end

Integration with Existing System

1. Enhanced Executor Integration

defmodule Pipeline.Enhanced.Executor do
  def execute_step(step, context) do
    case step["type"] do
      "dspy_" <> _ ->
        # Use real-time optimization framework
        Pipeline.DSPy.ExecutionFlow.execute_with_optimization(step, context)
      
      _ ->
        # Use traditional execution
        execute_traditional_step(step, context)
    end
  end
end

2. Configuration Integration

workflow:
  name: real_time_optimized_pipeline
  
  dspy_config:
    real_time_optimization:
      enabled: true
      max_wait_time: 5000  # 5 seconds
      fallback_strategy: "traditional"
      cache_warming: true
      adaptive_thresholds: true
    
    optimization_triggers:
      minimum_executions: 5
      error_rate_threshold: 0.15
      performance_degradation_threshold: 0.2
      reoptimization_interval: 86400  # 24 hours
  
  steps:
    - name: analyze_with_optimization
      type: dspy_claude
      real_time_config:
        max_optimization_wait: 3000
        fallback_strategy: "traditional"
        cache_priority: "high"

Testing and Validation

1. Performance Tests

defmodule Pipeline.DSPy.PerformanceTest do
  use ExUnit.Case
  
  test "optimization does not block execution beyond timeout" do
    step = create_dspy_step()
    context = create_test_context()
    
    {time, result} = :timer.tc(fn ->
      Pipeline.DSPy.ExecutionFlow.execute_with_optimization(step, context, timeout: 5000)
    end)
    
    # Should complete within timeout plus small buffer
    assert time < 6000  # 6 seconds
    assert {:ok, _} = result
  end
  
  test "fallback to traditional execution works correctly" do
    step = create_dspy_step()
    context = create_test_context()
    
    # Simulate optimization timeout
    {:ok, result} = Pipeline.DSPy.ExecutionFlow.execute_with_optimization(
      step, 
      context, 
      timeout: 1, 
      fallback_strategy: :traditional
    )
    
    assert result["success"] == true
    assert result["execution_mode"] == "traditional"
  end
end

2. Load Tests

defmodule Pipeline.DSPy.LoadTest do
  use ExUnit.Case
  
  test "system handles concurrent optimization requests" do
    steps = create_multiple_dspy_steps(10)
    
    tasks = Enum.map(steps, fn step ->
      Task.async(fn ->
        Pipeline.DSPy.ExecutionFlow.execute_with_optimization(step, create_test_context())
      end)
    end)
    
    results = Task.await_many(tasks, 30_000)
    
    # All should complete successfully
    Enum.each(results, fn result ->
      assert {:ok, _} = result
    end)
  end
end

This real-time optimization framework provides a comprehensive solution for managing DSPy optimization without compromising user experience, ensuring that the system remains responsive while still providing the benefits of intelligent optimization.