Real-Time Optimization Framework
Overview
This document addresses a critical gap in our DSPy implementation plan: real-time optimization constraints. The original analysis noted that optimization can introduce latency, but our current plans do not adequately address real-time performance requirements for production pipeline execution.
Problem Statement
DSPy optimization is computationally expensive and can introduce significant latency. In production environments, users expect immediate or near-immediate responses. The challenge is to provide the benefits of DSPy optimization without compromising user experience through excessive wait times.
Key Challenges
- Optimization Latency: DSPy optimization can take seconds to minutes
- User Experience: Users expect immediate feedback
- Resource Constraints: Optimization requires significant computational resources
- Cache Management: Optimized results need intelligent caching strategies
- Fallback Mechanisms: System must gracefully handle optimization failures
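At a high level, the framework resolves these tensions by keeping optimization off the request path entirely: execution consults a cache, falls back to traditional execution when no optimized signature is available, and schedules optimization in the background. A minimal sketch of that contract follows (module and function names mirror those introduced in the sections below):

case Pipeline.DSPy.HierarchicalCache.get_optimization(signature_hash) do
  {:ok, optimized} ->
    # An optimized signature is already cached; use it immediately
    execute_with_optimized_signature(step, context, optimized)

  :not_found ->
    # Serve this request traditionally and optimize in the background for next time
    Pipeline.DSPy.AsyncOptimizationManager.schedule_optimization(signature, training_data)
    execute_traditional_step(step, context)
end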
Architecture Design
1. Async Optimization Engine
Background Optimization Manager
defmodule Pipeline.DSPy.AsyncOptimizationManager do
@moduledoc """
Manages background optimization tasks without blocking pipeline execution.
"""
use GenServer
defstruct [
:optimization_queue,
:active_optimizations,
:optimization_workers,
:optimization_cache,
:optimization_metrics
]
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def schedule_optimization(signature, training_data, priority \\ :normal) do
GenServer.cast(__MODULE__, {:schedule_optimization, signature, training_data, priority})
end
def get_optimization_status(signature_hash) do
GenServer.call(__MODULE__, {:get_optimization_status, signature_hash})
end
def init(opts) do
worker_count = Keyword.get(opts, :worker_count, 2)
state = %__MODULE__{
optimization_queue: PriorityQueue.new(),
active_optimizations: %{},
optimization_workers: start_optimization_workers(worker_count),
optimization_cache: Pipeline.DSPy.OptimizationCache,
optimization_metrics: %{}
}
# Start queue processor
schedule_queue_processing()
{:ok, state}
end
def handle_cast({:schedule_optimization, signature, training_data, priority}, state) do
signature_hash = generate_signature_hash(signature)
# Check if optimization is already in progress or cached
case check_optimization_status(signature_hash, state) do
{:cached, _result} ->
# Already optimized, no need to schedule
{:noreply, state}
{:in_progress, _worker_id} ->
# Already being optimized, no need to schedule
{:noreply, state}
:not_found ->
# Schedule new optimization
optimization_task = %{
signature: signature,
training_data: training_data,
signature_hash: signature_hash,
priority: priority,
scheduled_at: DateTime.utc_now(),
attempts: 0
}
new_queue = PriorityQueue.put(state.optimization_queue, optimization_task, priority)
new_state = %{state | optimization_queue: new_queue}
{:noreply, new_state}
end
end
def handle_info(:process_queue, state) do
case assign_optimization_to_worker(state) do
{:ok, new_state} ->
schedule_queue_processing()
{:noreply, new_state}
{:no_workers_available, new_state} ->
schedule_queue_processing(500) # Shorter delay when workers are busy
{:noreply, new_state}
{:empty_queue, new_state} ->
schedule_queue_processing()
{:noreply, new_state}
end
end
  def handle_call({:get_optimization_status, signature_hash}, _from, state) do
    {:reply, check_optimization_status(signature_hash, state), state}
  end

  # Polls the queue on an interval; a shorter delay is used when all workers
  # are busy so freed capacity is picked up quickly
  defp schedule_queue_processing(delay \\ 1_000) do
    Process.send_after(self(), :process_queue, delay)
  end
end
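Callers interact with the manager through a fire-and-forget cast plus a status poll; a usage sketch (the signature and training data shapes are illustrative):

# Enqueue an optimization at high priority; returns immediately
Pipeline.DSPy.AsyncOptimizationManager.schedule_optimization(signature, training_data, :high)

# Later, poll its status by signature hash
case Pipeline.DSPy.AsyncOptimizationManager.get_optimization_status(signature_hash) do
  {:cached, result} -> {:ok, result}
  {:in_progress, worker_id} -> {:pending, worker_id}
  :not_found -> :unscheduled
end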
Optimization Worker
defmodule Pipeline.DSPy.OptimizationWorker do
@moduledoc """
Worker process that handles individual optimization tasks.
"""
use GenServer
defstruct [
:worker_id,
:status,
:current_task,
:python_bridge,
:optimization_timeout
]
def start_link(worker_id, opts \\ []) do
GenServer.start_link(__MODULE__, {worker_id, opts}, name: via_tuple(worker_id))
end
def optimize(worker_id, optimization_task) do
GenServer.call(via_tuple(worker_id), {:optimize, optimization_task}, :infinity)
end
def get_status(worker_id) do
GenServer.call(via_tuple(worker_id), :get_status)
end
def init({worker_id, opts}) do
state = %__MODULE__{
worker_id: worker_id,
status: :idle,
current_task: nil,
python_bridge: nil,
optimization_timeout: Keyword.get(opts, :timeout, 300_000) # 5 minutes
}
{:ok, state}
end
def handle_call({:optimize, optimization_task}, _from, state) do
case state.status do
:idle ->
new_state = %{state | status: :optimizing, current_task: optimization_task}
      # Start optimization in a background task so this call returns quickly
      {:ok, task_pid} = start_optimization(optimization_task, state.optimization_timeout)
      {:reply, {:ok, task_pid}, new_state}
_ ->
{:reply, {:error, :worker_busy}, state}
end
end
  def handle_info({:optimization_complete, _signature_hash, _result}, state) do
    # The background task reported back; free the worker for the next task
    {:noreply, %{state | status: :idle, current_task: nil}}
  end

  def handle_info({:optimization_timeout, task_pid}, state) do
    # Kill the task if it is still running past the configured timeout
    if Process.alive?(task_pid) do
      Process.exit(task_pid, :kill)
      {:noreply, %{state | status: :idle, current_task: nil}}
    else
      {:noreply, state}
    end
  end

  defp start_optimization(optimization_task, timeout) do
    parent = self()

    {:ok, task_pid} =
      Task.start(fn ->
        result = perform_optimization(optimization_task)
        send(parent, {:optimization_complete, optimization_task.signature_hash, result})
      end)

    # Enforce the timeout from the worker side
    Process.send_after(self(), {:optimization_timeout, task_pid}, timeout)
    {:ok, task_pid}
  end
defp perform_optimization(optimization_task) do
try do
# Use Python bridge to perform optimization
case Pipeline.DSPy.PythonBridge.optimize_signature(
optimization_task.signature,
optimization_task.training_data
) do
{:ok, optimized_signature} ->
# Store in cache
Pipeline.DSPy.OptimizationCache.store_optimization(
optimization_task.signature_hash,
optimized_signature
)
# Record metrics
Pipeline.DSPy.Metrics.record_optimization_success(
optimization_task.signature_hash,
optimized_signature.metrics
)
{:ok, optimized_signature}
{:error, reason} ->
Pipeline.DSPy.Metrics.record_optimization_failure(
optimization_task.signature_hash,
reason
)
{:error, reason}
end
catch
:exit, {:timeout, _} ->
{:error, :optimization_timeout}
      kind, error ->
        {:error, {kind, error}}
end
end
  # Assumes a Registry started in the supervision tree, e.g. via
  # Registry.start_link(keys: :unique, name: Pipeline.DSPy.WorkerRegistry)
  defp via_tuple(worker_id) do
    {:via, Registry, {Pipeline.DSPy.WorkerRegistry, worker_id}}
  end
end
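The manager's start_optimization_workers/1 is left undefined above. One plausible shape, assuming the Pipeline.DSPy.WorkerRegistry used by via_tuple/1 is started alongside the workers, is sketched below; in a real deployment both would live under a supervision tree rather than being started ad hoc:

# Hypothetical sketch of the worker pool setup referenced by the manager
defp start_optimization_workers(worker_count) do
  {:ok, _} = Registry.start_link(keys: :unique, name: Pipeline.DSPy.WorkerRegistry)

  for worker_id <- 1..worker_count do
    {:ok, _pid} = Pipeline.DSPy.OptimizationWorker.start_link(worker_id)
    worker_id
  end
end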
2. Intelligent Threshold Management
Optimization Trigger System
defmodule Pipeline.DSPy.OptimizationTrigger do
@moduledoc """
Determines when to trigger optimization based on various factors.
"""
defstruct [
:execution_count,
:error_rate,
:performance_metrics,
:last_optimization,
:optimization_thresholds
]
def should_optimize?(signature_hash, execution_context) do
case get_optimization_status(signature_hash) do
{:recently_optimized, _} ->
false
{:never_optimized, _} ->
should_optimize_new_signature?(execution_context)
{:optimization_available, last_optimization} ->
should_reoptimize?(signature_hash, last_optimization, execution_context)
end
end
defp should_optimize_new_signature?(execution_context) do
# Optimize new signatures after a minimum number of executions
minimum_executions = get_minimum_executions_threshold()
execution_context.execution_count >= minimum_executions
end
defp should_reoptimize?(signature_hash, last_optimization, execution_context) do
# Consider reoptimization based on multiple factors
time_since_optimization = DateTime.diff(DateTime.utc_now(), last_optimization.timestamp)
# Reoptimize if:
# 1. Significant time has passed
# 2. Error rate has increased
# 3. Performance has degraded
# 4. New training data is available
time_threshold_exceeded?(time_since_optimization) or
error_rate_threshold_exceeded?(signature_hash, execution_context) or
performance_degradation_detected?(signature_hash, execution_context) or
new_training_data_available?(signature_hash)
end
defp time_threshold_exceeded?(time_since_optimization) do
# Reoptimize after 24 hours by default
reoptimization_interval = get_reoptimization_interval()
time_since_optimization > reoptimization_interval
end
  defp error_rate_threshold_exceeded?(signature_hash, _execution_context) do
    recent_error_rate = calculate_recent_error_rate(signature_hash)
    baseline_error_rate = get_baseline_error_rate(signature_hash)

    # A 50% increase over baseline triggers reoptimization
    recent_error_rate > baseline_error_rate * 1.5
  end

  defp performance_degradation_detected?(signature_hash, _execution_context) do
    recent_performance = calculate_recent_performance(signature_hash)
    baseline_performance = get_baseline_performance(signature_hash)

    # A 20% drop below baseline triggers reoptimization
    recent_performance < baseline_performance * 0.8
  end
end
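A natural integration point for the trigger is a post-execution hook: after each step runs, the trigger decides whether background optimization is worth enqueueing. A hedged sketch (the execution_context fields are assumptions):

# After a step executes, consult the trigger; schedule work only when it says so
execution_context = %{execution_count: 12, last_error_rate: 0.04}

if Pipeline.DSPy.OptimizationTrigger.should_optimize?(signature_hash, execution_context) do
  Pipeline.DSPy.AsyncOptimizationManager.schedule_optimization(signature, training_data)
end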
Adaptive Thresholds
defmodule Pipeline.DSPy.AdaptiveThresholds do
@moduledoc """
Manages adaptive thresholds that adjust based on system performance and usage patterns.
"""
use GenServer
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def get_threshold(signature_hash, threshold_type) do
GenServer.call(__MODULE__, {:get_threshold, signature_hash, threshold_type})
end
def update_threshold(signature_hash, threshold_type, new_value) do
GenServer.cast(__MODULE__, {:update_threshold, signature_hash, threshold_type, new_value})
end
def init(opts) do
# Initialize with default thresholds
state = %{
global_thresholds: %{
minimum_executions: 5,
reoptimization_interval: 24 * 60 * 60, # 24 hours
error_rate_multiplier: 1.5,
performance_degradation_threshold: 0.8
},
signature_specific_thresholds: %{},
threshold_adjustment_history: %{}
}
# Start threshold adjustment process
schedule_threshold_adjustment()
{:ok, state}
  end

  def handle_call({:get_threshold, signature_hash, threshold_type}, _from, state) do
    # Signature-specific overrides take precedence over global defaults
    threshold =
      state.signature_specific_thresholds
      |> Map.get(signature_hash, %{})
      |> Map.get(threshold_type, Map.get(state.global_thresholds, threshold_type))

    {:reply, threshold, state}
  end

  def handle_cast({:update_threshold, signature_hash, threshold_type, new_value}, state) do
    signature_thresholds =
      state.signature_specific_thresholds
      |> Map.get(signature_hash, %{})
      |> Map.put(threshold_type, new_value)

    new_state = %{
      state
      | signature_specific_thresholds:
          Map.put(state.signature_specific_thresholds, signature_hash, signature_thresholds)
    }

    {:noreply, new_state}
  end
def handle_info(:adjust_thresholds, state) do
# Analyze system performance and adjust thresholds
new_state = analyze_and_adjust_thresholds(state)
schedule_threshold_adjustment()
{:noreply, new_state}
end
defp analyze_and_adjust_thresholds(state) do
# Analyze recent optimization effectiveness
optimization_effectiveness = analyze_optimization_effectiveness()
# Adjust thresholds based on effectiveness
new_thresholds = adjust_thresholds_based_on_effectiveness(
state.global_thresholds,
optimization_effectiveness
)
%{state | global_thresholds: new_thresholds}
end
defp adjust_thresholds_based_on_effectiveness(current_thresholds, effectiveness) do
case effectiveness do
:high ->
# Optimizations are very effective, be more aggressive
%{
current_thresholds |
minimum_executions: max(current_thresholds.minimum_executions - 1, 3),
reoptimization_interval: max(current_thresholds.reoptimization_interval - 3600, 12 * 60 * 60)
}
:medium ->
# Current thresholds are working well, keep them
current_thresholds
:low ->
# Optimizations are not very effective, be more conservative
%{
current_thresholds |
minimum_executions: min(current_thresholds.minimum_executions + 1, 10),
reoptimization_interval: min(current_thresholds.reoptimization_interval + 3600, 48 * 60 * 60)
}
end
end
  # Re-evaluate thresholds hourly by default
  defp schedule_threshold_adjustment(delay \\ 60 * 60 * 1000) do
    Process.send_after(self(), :adjust_thresholds, delay)
  end
end
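analyze_optimization_effectiveness/0 is deliberately left abstract above. One simple definition, assuming recent optimization records carry baseline and optimized latencies, classifies effectiveness by average relative improvement; the 10%/30% cut points are illustrative:

# Illustrative classification of optimization effectiveness
defp classify_effectiveness(recent_optimizations) do
  improvements =
    Enum.map(recent_optimizations, fn opt ->
      (opt.baseline_latency - opt.optimized_latency) / opt.baseline_latency
    end)

  avg = Enum.sum(improvements) / max(length(improvements), 1)

  cond do
    avg >= 0.30 -> :high
    avg >= 0.10 -> :medium
    true -> :low
  end
end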
3. Multi-Level Caching Strategy
Hierarchical Cache System
defmodule Pipeline.DSPy.HierarchicalCache do
@moduledoc """
Multi-level caching system for optimization results.
"""
  use GenServer

  defstruct [
:l1_cache, # In-memory, fastest access
:l2_cache, # Local disk, medium speed
:l3_cache, # Distributed cache, slowest but largest
:cache_stats
]
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def get_optimization(signature_hash) do
GenServer.call(__MODULE__, {:get_optimization, signature_hash})
end
def store_optimization(signature_hash, optimization_result, ttl \\ nil) do
GenServer.cast(__MODULE__, {:store_optimization, signature_hash, optimization_result, ttl})
end
def init(opts) do
state = %__MODULE__{
l1_cache: initialize_l1_cache(opts),
l2_cache: initialize_l2_cache(opts),
l3_cache: initialize_l3_cache(opts),
cache_stats: %{hits: 0, misses: 0, evictions: 0}
}
{:ok, state}
end
def handle_call({:get_optimization, signature_hash}, _from, state) do
case get_from_cache_hierarchy(signature_hash, state) do
{:ok, optimization_result, cache_level} ->
# Promote to higher cache levels if found in lower levels
new_state = promote_to_higher_caches(signature_hash, optimization_result, cache_level, state)
# Update cache stats
stats = %{state.cache_stats | hits: state.cache_stats.hits + 1}
final_state = %{new_state | cache_stats: stats}
{:reply, {:ok, optimization_result}, final_state}
:not_found ->
# Update cache stats
stats = %{state.cache_stats | misses: state.cache_stats.misses + 1}
new_state = %{state | cache_stats: stats}
{:reply, :not_found, new_state}
end
  end

  def handle_cast({:store_optimization, signature_hash, optimization_result, _ttl}, state) do
    # Write-through: store at every level so the next lookup hits L1
    # (TTL handling is omitted in this sketch)
    new_l1 = store_in_l1_cache(signature_hash, optimization_result, state.l1_cache)
    new_l2 = store_in_l2_cache(signature_hash, optimization_result, state.l2_cache)
    new_l3 = store_in_l3_cache(signature_hash, optimization_result, state.l3_cache)

    {:noreply, %{state | l1_cache: new_l1, l2_cache: new_l2, l3_cache: new_l3}}
  end
defp get_from_cache_hierarchy(signature_hash, state) do
# Try L1 cache first (fastest)
case get_from_l1_cache(signature_hash, state.l1_cache) do
{:ok, result} ->
{:ok, result, :l1}
:not_found ->
# Try L2 cache
case get_from_l2_cache(signature_hash, state.l2_cache) do
{:ok, result} ->
{:ok, result, :l2}
:not_found ->
# Try L3 cache
case get_from_l3_cache(signature_hash, state.l3_cache) do
{:ok, result} ->
{:ok, result, :l3}
:not_found ->
:not_found
end
end
end
end
defp promote_to_higher_caches(signature_hash, optimization_result, found_level, state) do
case found_level do
:l3 ->
# Promote to L2 and L1
new_l2_cache = store_in_l2_cache(signature_hash, optimization_result, state.l2_cache)
new_l1_cache = store_in_l1_cache(signature_hash, optimization_result, state.l1_cache)
%{state | l1_cache: new_l1_cache, l2_cache: new_l2_cache}
:l2 ->
# Promote to L1
new_l1_cache = store_in_l1_cache(signature_hash, optimization_result, state.l1_cache)
%{state | l1_cache: new_l1_cache}
:l1 ->
# Already in highest cache level
state
end
end
end
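The per-level helpers are undefined above. For L1, an ETS table is the natural in-memory backing; a minimal sketch (the table name and entry layout are assumptions), with L2 and L3 following the same contract backed by local disk and a distributed store respectively:

# Minimal ETS-backed L1 cache; TTL and eviction are omitted in this sketch
defp initialize_l1_cache(_opts) do
  :ets.new(:dspy_l1_cache, [:set, :protected, read_concurrency: true])
end

defp get_from_l1_cache(signature_hash, table) do
  case :ets.lookup(table, signature_hash) do
    [{^signature_hash, result}] -> {:ok, result}
    [] -> :not_found
  end
end

defp store_in_l1_cache(signature_hash, result, table) do
  :ets.insert(table, {signature_hash, result})
  table
end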
Predictive Cache Warming
defmodule Pipeline.DSPy.PredictiveCache do
@moduledoc """
Predictively warms caches based on usage patterns.
"""
use GenServer
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def record_signature_usage(signature_hash, context) do
GenServer.cast(__MODULE__, {:record_usage, signature_hash, context})
end
def init(opts) do
state = %{
usage_patterns: %{},
prediction_models: %{},
warming_queue: :queue.new()
}
# Start prediction analysis
schedule_prediction_analysis()
{:ok, state}
  end

  def handle_cast({:record_usage, signature_hash, context}, state) do
    # Prepend this usage event to the signature's history (newest first)
    pattern =
      state.usage_patterns
      |> Map.get(signature_hash, %{usage_history: []})
      |> Map.update!(:usage_history, &[%{at: DateTime.utc_now(), context: context} | &1])

    {:noreply, %{state | usage_patterns: Map.put(state.usage_patterns, signature_hash, pattern)}}
  end
def handle_info(:analyze_predictions, state) do
# Analyze usage patterns and predict future needs
predictions = analyze_usage_patterns(state.usage_patterns)
# Queue signatures for warming
new_warming_queue = queue_signatures_for_warming(predictions, state.warming_queue)
# Start warming process
process_warming_queue(new_warming_queue)
new_state = %{state | warming_queue: new_warming_queue}
schedule_prediction_analysis()
{:noreply, new_state}
end
defp analyze_usage_patterns(usage_patterns) do
# Use simple heuristics to predict future usage
# Could be enhanced with machine learning models
Enum.reduce(usage_patterns, [], fn {signature_hash, pattern}, acc ->
case predict_future_usage(pattern) do
{:likely, confidence} when confidence > 0.7 ->
[{signature_hash, confidence} | acc]
_ ->
acc
end
end)
end
defp predict_future_usage(pattern) do
# Simple prediction based on usage frequency and recency
recent_usage = count_recent_usage(pattern.usage_history)
usage_frequency = calculate_usage_frequency(pattern.usage_history)
confidence = min(recent_usage * 0.3 + usage_frequency * 0.7, 1.0)
if confidence > 0.5 do
{:likely, confidence}
else
{:unlikely, confidence}
end
end
  # Re-run the prediction analysis every 5 minutes by default
  defp schedule_prediction_analysis(delay \\ 5 * 60 * 1000) do
    Process.send_after(self(), :analyze_predictions, delay)
  end
end
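The two history helpers can be simple sliding-window counts. A sketch, assuming usage_history is a newest-first list of %{at: DateTime} entries as recorded in handle_cast/2 above; the one-hour window is an assumption:

# Events within the last hour
defp count_recent_usage(usage_history) do
  cutoff = DateTime.add(DateTime.utc_now(), -3600, :second)
  Enum.count(usage_history, fn event -> DateTime.compare(event.at, cutoff) == :gt end)
end

# Events per hour over the whole recorded history
defp calculate_usage_frequency([]), do: 0.0

defp calculate_usage_frequency(usage_history) do
  oldest = List.last(usage_history)
  span_hours = max(DateTime.diff(DateTime.utc_now(), oldest.at) / 3600, 1.0)
  length(usage_history) / span_hours
end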
4. Fallback and Timeout Mechanisms
Execution Flow with Fallbacks
defmodule Pipeline.DSPy.ExecutionFlow do
@moduledoc """
Manages execution flow with intelligent fallback mechanisms.
"""
def execute_with_optimization(step, context, options \\ []) do
timeout = Keyword.get(options, :timeout, 5000)
fallback_strategy = Keyword.get(options, :fallback_strategy, :traditional)
case attempt_optimized_execution(step, context, timeout) do
{:ok, result} ->
{:ok, result}
{:error, :timeout} ->
handle_timeout_fallback(step, context, fallback_strategy)
{:error, :optimization_not_available} ->
handle_no_optimization_fallback(step, context, fallback_strategy)
{:error, reason} ->
handle_error_fallback(step, context, fallback_strategy, reason)
end
end
defp attempt_optimized_execution(step, context, timeout) do
signature_hash = generate_signature_hash(step["dspy_signature"])
# Try to get cached optimization first
case Pipeline.DSPy.HierarchicalCache.get_optimization(signature_hash) do
{:ok, optimization_result} ->
execute_with_optimized_signature(step, context, optimization_result)
:not_found ->
# Check if optimization is in progress
case Pipeline.DSPy.AsyncOptimizationManager.get_optimization_status(signature_hash) do
{:in_progress, _} ->
# Wait for a limited time for optimization to complete
wait_for_optimization_with_timeout(signature_hash, timeout)
:not_found ->
# Schedule optimization for future use
schedule_background_optimization(step, context)
{:error, :optimization_not_available}
end
end
end
defp wait_for_optimization_with_timeout(signature_hash, timeout) do
start_time = System.monotonic_time(:millisecond)
case wait_for_optimization(signature_hash, timeout) do
{:ok, optimization_result} ->
elapsed = System.monotonic_time(:millisecond) - start_time
Logger.info("Optimization completed in #{elapsed}ms")
{:ok, optimization_result}
:timeout ->
Logger.warning("Optimization timeout after #{timeout}ms")
{:error, :timeout}
end
end
defp handle_timeout_fallback(step, context, fallback_strategy) do
Logger.info("Falling back to traditional execution due to optimization timeout")
case fallback_strategy do
:traditional ->
execute_traditional_step(step, context)
:cached_best_effort ->
execute_with_best_available_optimization(step, context)
:error ->
{:error, :optimization_timeout}
end
end
defp handle_no_optimization_fallback(step, context, fallback_strategy) do
Logger.info("No optimization available, using fallback strategy: #{fallback_strategy}")
case fallback_strategy do
:traditional ->
execute_traditional_step(step, context)
:schedule_and_execute ->
# Schedule optimization for future use and execute traditionally
schedule_background_optimization(step, context)
execute_traditional_step(step, context)
:wait_and_retry ->
# Wait briefly and retry once
:timer.sleep(100)
attempt_optimized_execution(step, context, 1000)
end
end
defp execute_traditional_step(step, context) do
# Remove DSPy-specific options and execute with traditional provider
traditional_step = Map.drop(step, ["dspy_signature", "dspy_config"])
Pipeline.Executor.execute_step(traditional_step, context)
end
end
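wait_for_optimization/2 above is left undefined; the simplest approach polls the cache until the deadline passes. A sketch (the 50 ms poll interval is an assumption; a subscription-based notification would avoid polling entirely):

# Poll the hierarchical cache until the result appears or the deadline passes
defp wait_for_optimization(signature_hash, timeout) do
  deadline = System.monotonic_time(:millisecond) + timeout
  poll_for_optimization(signature_hash, deadline)
end

defp poll_for_optimization(signature_hash, deadline) do
  case Pipeline.DSPy.HierarchicalCache.get_optimization(signature_hash) do
    {:ok, result} ->
      {:ok, result}

    :not_found ->
      if System.monotonic_time(:millisecond) >= deadline do
        :timeout
      else
        Process.sleep(50)
        poll_for_optimization(signature_hash, deadline)
      end
  end
end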
Timeout Management
defmodule Pipeline.DSPy.TimeoutManager do
@moduledoc """
Manages timeouts for various DSPy operations.
"""
  use GenServer

  defstruct [
:operation_timeouts,
:adaptive_timeouts,
:timeout_history
  ]

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    state = %__MODULE__{operation_timeouts: %{}, adaptive_timeouts: %{}, timeout_history: %{}}
    {:ok, state}
  end
def get_timeout(operation_type, signature_hash \\ nil) do
case signature_hash do
nil ->
get_default_timeout(operation_type)
hash ->
get_adaptive_timeout(operation_type, hash)
end
end
def record_operation_time(operation_type, signature_hash, duration, success) do
# Record operation time for adaptive timeout calculation
GenServer.cast(__MODULE__, {:record_operation_time, operation_type, signature_hash, duration, success})
  end

  def handle_cast({:record_operation_time, operation_type, signature_hash, duration, success}, state) do
    # Keep a per-{operation, signature} history for adaptive timeout calculation
    key = {operation_type, signature_hash}
    entry = %{duration: duration, success: success, at: DateTime.utc_now()}
    new_history = Map.update(state.timeout_history, key, [entry], &[entry | &1])

    {:noreply, %{state | timeout_history: new_history}}
  end
defp get_default_timeout(operation_type) do
case operation_type do
:optimization -> 300_000 # 5 minutes
:execution -> 30_000 # 30 seconds
:cache_lookup -> 1_000 # 1 second
:python_bridge -> 60_000 # 1 minute
end
end
defp get_adaptive_timeout(operation_type, signature_hash) do
# Calculate adaptive timeout based on historical performance
historical_times = get_historical_operation_times(operation_type, signature_hash)
case historical_times do
[] ->
get_default_timeout(operation_type)
times ->
        # 95th percentile of historical times plus a 20% buffer,
        # capped at 2x the default timeout
        percentile_95 = calculate_percentile(times, 0.95)
        min(percentile_95 * 1.2, get_default_timeout(operation_type) * 2)
end
end
end
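calculate_percentile/2 can be a straightforward nearest-rank computation over the sorted samples:

# Nearest-rank percentile over raw duration samples
defp calculate_percentile(times, percentile) when percentile > 0 and percentile <= 1 do
  sorted = Enum.sort(times)
  rank = max(ceil(percentile * length(sorted)), 1)
  Enum.at(sorted, rank - 1)
end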
5. Performance Monitoring and Metrics
Real-Time Performance Monitoring
defmodule Pipeline.DSPy.PerformanceMonitor do
@moduledoc """
Monitors real-time performance of DSPy operations.
"""
use GenServer
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def record_execution_metrics(signature_hash, metrics) do
GenServer.cast(__MODULE__, {:record_execution_metrics, signature_hash, metrics})
end
def get_performance_summary(signature_hash) do
GenServer.call(__MODULE__, {:get_performance_summary, signature_hash})
end
def init(opts) do
state = %{
execution_metrics: %{},
performance_alerts: %{},
monitoring_config: load_monitoring_config(opts)
}
# Start performance analysis
schedule_performance_analysis()
{:ok, state}
end
def handle_cast({:record_execution_metrics, signature_hash, metrics}, state) do
# Update metrics for signature
updated_metrics = update_signature_metrics(state.execution_metrics, signature_hash, metrics)
# Check for performance alerts
new_alerts = check_performance_alerts(signature_hash, metrics, state.performance_alerts)
new_state = %{
state |
execution_metrics: updated_metrics,
performance_alerts: new_alerts
}
{:noreply, new_state}
end
def handle_info(:analyze_performance, state) do
# Analyze overall system performance
performance_analysis = analyze_system_performance(state.execution_metrics)
# Generate performance report
generate_performance_report(performance_analysis)
# Update optimization recommendations
update_optimization_recommendations(performance_analysis)
schedule_performance_analysis()
{:noreply, state}
end
  defp check_performance_alerts(signature_hash, metrics, current_alerts) do
    # Rebinding inside an `if` block does not escape its scope in Elixir,
    # so the checks are expressed as a pipeline
    alerts =
      []
      |> maybe_add_alert(
        # Slow optimization: more than 1 minute
        metrics.optimization_time > 60_000,
        {:slow_optimization, signature_hash, metrics.optimization_time}
      )
      |> maybe_add_alert(
        # High error rate: above 10%
        metrics.error_rate > 0.1,
        {:high_error_rate, signature_hash, metrics.error_rate}
      )
      |> maybe_add_alert(
        # High cache miss rate: above 50%
        metrics.cache_miss_rate > 0.5,
        {:high_cache_miss_rate, signature_hash, metrics.cache_miss_rate}
      )

    # Merge with current alerts
    merge_alerts(current_alerts, alerts)
  end

  defp maybe_add_alert(alerts, true, alert), do: [alert | alerts]
  defp maybe_add_alert(alerts, false, _alert), do: alerts

  # Re-analyze system performance every minute by default
  defp schedule_performance_analysis(delay \\ 60_000) do
    Process.send_after(self(), :analyze_performance, delay)
  end
end
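merge_alerts/2 is left open above; a minimal version keys each alert by type and signature so repeated firings overwrite rather than accumulate (the keying scheme is an assumption):

# New firings replace previous alerts for the same {type, signature} pair
defp merge_alerts(current_alerts, new_alerts) do
  Enum.reduce(new_alerts, current_alerts, fn {type, signature_hash, value}, acc ->
    Map.put(acc, {type, signature_hash}, %{value: value, at: DateTime.utc_now()})
  end)
end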
Integration with Existing System
1. Enhanced Executor Integration
defmodule Pipeline.Enhanced.Executor do
def execute_step(step, context) do
case step["type"] do
"dspy_" <> _ ->
# Use real-time optimization framework
Pipeline.DSPy.ExecutionFlow.execute_with_optimization(step, context)
_ ->
# Use traditional execution
execute_traditional_step(step, context)
end
end
end
2. Configuration Integration
workflow:
  name: real_time_optimized_pipeline

  dspy_config:
    real_time_optimization:
      enabled: true
      max_wait_time: 5000  # 5 seconds
      fallback_strategy: "traditional"
      cache_warming: true
      adaptive_thresholds: true

    optimization_triggers:
      minimum_executions: 5
      error_rate_threshold: 0.15
      performance_degradation_threshold: 0.2
      reoptimization_interval: 86400  # 24 hours

  steps:
    - name: analyze_with_optimization
      type: dspy_claude
      real_time_config:
        max_optimization_wait: 3000
        fallback_strategy: "traditional"
        cache_priority: "high"
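On the Elixir side, a step's real_time_config maps naturally onto the keyword options accepted by execute_with_optimization/3. A hedged sketch of that translation (key names follow the YAML above; the defaults are illustrative):

# Translate a step's real_time_config into execute_with_optimization options
defp real_time_options(step) do
  config = Map.get(step, "real_time_config", %{})

  [
    timeout: Map.get(config, "max_optimization_wait", 5000),
    fallback_strategy: String.to_existing_atom(Map.get(config, "fallback_strategy", "traditional"))
  ]
end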
Testing and Validation
1. Performance Tests
defmodule Pipeline.DSPy.PerformanceTest do
use ExUnit.Case
test "optimization does not block execution beyond timeout" do
step = create_dspy_step()
context = create_test_context()
    {time_us, result} = :timer.tc(fn ->
      Pipeline.DSPy.ExecutionFlow.execute_with_optimization(step, context, timeout: 5000)
    end)

    # :timer.tc reports microseconds; allow the 5s timeout plus a 1s buffer
    assert time_us < 6_000_000
assert {:ok, _} = result
end
test "fallback to traditional execution works correctly" do
step = create_dspy_step()
context = create_test_context()
# Simulate optimization timeout
{:ok, result} = Pipeline.DSPy.ExecutionFlow.execute_with_optimization(
step,
context,
timeout: 1,
fallback_strategy: :traditional
)
assert result["success"] == true
assert result["execution_mode"] == "traditional"
end
end
2. Load Tests
defmodule Pipeline.DSPy.LoadTest do
use ExUnit.Case
test "system handles concurrent optimization requests" do
steps = create_multiple_dspy_steps(10)
tasks = Enum.map(steps, fn step ->
Task.async(fn ->
Pipeline.DSPy.ExecutionFlow.execute_with_optimization(step, create_test_context())
end)
end)
results = Task.await_many(tasks, 30_000)
# All should complete successfully
Enum.each(results, fn result ->
assert {:ok, _} = result
end)
end
end
This real-time optimization framework delivers the benefits of DSPy optimization without compromising user experience: optimization runs off the request path, results are cached and promoted across levels, and every execution has a bounded-latency fallback, so pipeline execution stays responsive even when optimization is slow or unavailable.