Python Bridge Architecture
Overview
This document addresses the critical gap identified in our DSPy implementation plan: the complexity of integrating Python-based DSPy optimization with the Elixir pipeline_ex system. The Python bridge is essential for accessing DSPy’s optimization engines while maintaining the performance and reliability of the Elixir system.
Problem Statement
The original DSPy analysis highlighted that Python integration would be complex, but our current implementation plans underestimate this complexity. Specific challenges include:
- Process Management: Managing Python processes from Elixir
- Data Serialization: Efficient data exchange between languages
- Error Propagation: Handling Python errors in Elixir context
- Performance Optimization: Minimizing cross-language call overhead
- Resource Management: Memory and process lifecycle management
Architecture Design
1. Python Process Management
Process Architecture
defmodule Pipeline.DSPy.PythonBridge do
  use GenServer

  defstruct [
    :python_port,
    :python_pid,
    :process_status,
    :request_queue,
    :response_cache,
    :health_check_timer
  ]

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(opts) do
    python_cmd = build_python_command(opts)

    case start_python_process(python_cmd) do
      {:ok, port, pid} ->
        state = %__MODULE__{
          python_port: port,
          python_pid: pid,
          process_status: :healthy,
          request_queue: :queue.new(),
          response_cache: %{},
          health_check_timer: schedule_health_check()
        }

        {:ok, state}

      {:error, reason} ->
        {:stop, reason}
    end
  end
end
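The module above shows only process startup; the request path it implies is sketched below. These clauses are assumptions rather than part of the original design: the default arguments allow the one- and two-argument optimize_signature calls used later in this document, and the 30_000 ms timeout is an illustrative value.

# Sketch only: these clauses would live in Pipeline.DSPy.PythonBridge alongside init/1 above.
def optimize_signature(signature, training_data \\ [], config \\ %{}) do
  request =
    Pipeline.DSPy.MessageProtocol.create_optimization_request(signature, training_data, config)

  GenServer.call(__MODULE__, {:request, request}, 30_000)
end

def handle_call({:request, request}, _from, state) do
  port = state.python_port
  encoded = Pipeline.DSPy.MessageProtocol.encode_message(request)

  # The {:packet, 4} port option adds the length header that dspy_bridge.py expects
  Port.command(port, encoded)

  receive do
    {^port, {:data, data}} ->
      {:reply, Pipeline.DSPy.MessageProtocol.decode_message(data), state}
  after
    30_000 ->
      {:reply, {:error, :timeout}, state}
  end
end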
Process Lifecycle Management
defmodule Pipeline.DSPy.ProcessManager do
  @moduledoc """
  Manages Python process lifecycle with health monitoring and recovery.
  """

  require Logger

  def start_python_process(opts \\ []) do
    python_script = Path.join([:code.priv_dir(:pipeline), "python", "dspy_bridge.py"])

    port_opts = [
      :binary,
      :exit_status,
      {:packet, 4},
      {:args, [python_script]},
      {:env, build_python_env(opts)}
    ]

    port = Port.open({:spawn_executable, python_executable()}, port_opts)

    # Wait for initialization confirmation
    case wait_for_initialization(port) do
      :ok ->
        {:ok, port, get_port_pid(port)}

      {:error, reason} ->
        Port.close(port)
        {:error, reason}
    end
  end

  def monitor_process_health(port) do
    case send_health_check(port) do
      {:ok, :healthy} ->
        :ok

      {:error, reason} ->
        Logger.error("Python process health check failed: #{inspect(reason)}")
        restart_process(port)
    end
  end

  defp restart_process(failed_port) do
    Logger.info("Restarting Python process")
    Port.close(failed_port)

    case start_python_process() do
      {:ok, new_port, new_pid} ->
        notify_process_restart(new_port, new_pid)
        {:ok, new_port, new_pid}

      {:error, reason} ->
        Logger.error("Failed to restart Python process: #{inspect(reason)}")
        {:error, reason}
    end
  end
end
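Several helpers referenced above (python_executable/0, build_python_env/1, get_port_pid/1, wait_for_initialization/1) are not defined in this document. A minimal sketch follows; the ready handshake assumes the Python script sends a small JSON frame on startup, which the dspy_bridge.py shown later would need to be extended to do.

# Hypothetical helpers for Pipeline.DSPy.ProcessManager; the bodies are assumptions.
defp python_executable do
  # Allow an override (e.g. a virtualenv interpreter) via application config
  Application.get_env(:pipeline, :python_executable, System.find_executable("python3"))
end

defp build_python_env(opts) do
  # Port env entries must be charlists
  opts
  |> Keyword.get(:env, [])
  |> Enum.map(fn {k, v} -> {String.to_charlist(k), String.to_charlist(v)} end)
end

defp get_port_pid(port) do
  {:os_pid, os_pid} = Port.info(port, :os_pid)
  os_pid
end

defp wait_for_initialization(port, timeout \\ 10_000) do
  # Assumes the Python script emits a {"status": "ready"} frame once imports succeed
  receive do
    {^port, {:data, data}} ->
      case Jason.decode(data) do
        {:ok, %{"status" => "ready"}} -> :ok
        other -> {:error, {:unexpected_init_message, other}}
      end

    {^port, {:exit_status, status}} ->
      {:error, {:python_exited, status}}
  after
    timeout -> {:error, :initialization_timeout}
  end
end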
2. Data Serialization Protocol
Message Format
defmodule Pipeline.DSPy.MessageProtocol do
  @moduledoc """
  Defines the message protocol for Elixir-Python communication.
  """

  defstruct [
    :message_id,
    :request_type,
    :payload,
    :timestamp,
    :metadata
  ]

  @message_types [
    :optimize_signature,
    :evaluate_pipeline,
    :train_model,
    :health_check,
    :shutdown
  ]

  def encode_message(message) do
    message
    |> Map.from_struct()
    |> Jason.encode!()
  end

  def decode_message(encoded_message) do
    case Jason.decode(encoded_message) do
      {:ok, decoded} ->
        # Jason returns string keys; map them onto the known struct fields
        fields =
          for {key, value} <- decoded, into: %{} do
            {String.to_existing_atom(key), value}
          end

        {:ok, struct(__MODULE__, fields)}

      {:error, reason} ->
        {:error, "Message decode failed: #{inspect(reason)}"}
    end
  end

  def create_optimization_request(signature, training_data, config) do
    %__MODULE__{
      message_id: generate_message_id(),
      request_type: :optimize_signature,
      payload: %{
        signature: signature,
        training_data: training_data,
        config: config
      },
      timestamp: DateTime.utc_now(),
      metadata: %{
        source: "pipeline_ex",
        version: "1.0.0"
      }
    }
  end
end
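A quick usage example of the protocol, assuming generate_message_id/0 (not defined above) returns a unique string:

request =
  Pipeline.DSPy.MessageProtocol.create_optimization_request(
    %{"input_fields" => ["code"], "output_fields" => ["analysis"]},
    [%{"code" => "def add(a, b), do: a + b", "analysis" => "adds two numbers"}],
    %{"optimizer" => "bootstrap_few_shot"}
  )

encoded = Pipeline.DSPy.MessageProtocol.encode_message(request)
{:ok, decoded} = Pipeline.DSPy.MessageProtocol.decode_message(encoded)
# decoded.request_type comes back as the string "optimize_signature" unless
# decode_message also normalizes values to atoms, a detail the protocol leaves open.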
Efficient Data Transfer
defmodule Pipeline.DSPy.DataTransfer do
  @moduledoc """
  Optimized data transfer between Elixir and Python.
  """

  @chunk_size 1024 * 1024  # 1MB chunks

  def transfer_large_dataset(data, port) do
    # Compress data before transfer
    compressed_data = compress_data(data)

    # Send data in chunks to avoid memory issues; a binary is not Enumerable,
    # so it is split explicitly rather than with Stream.chunk_every/2
    chunks = chunk_binary(compressed_data, @chunk_size)
    last_index = length(chunks) - 1

    chunks
    |> Enum.with_index()
    |> Enum.map(fn {chunk, index} ->
      send_data_chunk(port, chunk, index, index == last_index)
    end)
    |> handle_transfer_results()
  end

  defp compress_data(data) do
    data
    |> Jason.encode!()
    |> :zlib.compress()
  end

  defp chunk_binary(binary, size) when byte_size(binary) <= size, do: [binary]

  defp chunk_binary(binary, size) do
    <<chunk::binary-size(size), rest::binary>> = binary
    [chunk | chunk_binary(rest, size)]
  end

  defp send_data_chunk(port, chunk, index, final?) do
    message = %{
      type: :data_chunk,
      chunk_index: index,
      data: Base.encode64(chunk),
      is_final: final?
    }

    send_message(port, message)
  end
end
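send_message/2 and handle_transfer_results/1 are referenced but not shown; hypothetical implementations for the same module might look like this:

# Assumed helpers for Pipeline.DSPy.DataTransfer; not part of the original design.
defp send_message(port, message) do
  # Port.command/2 raises if the port is closed, so wrap it in a tagged result
  try do
    true = Port.command(port, Jason.encode!(message))
    :ok
  rescue
    e -> {:error, e}
  end
end

defp handle_transfer_results(results) do
  case Enum.filter(results, &match?({:error, _}, &1)) do
    [] -> :ok
    errors -> {:error, {:chunk_transfer_failed, errors}}
  end
end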
3. Error Handling and Propagation
Error Classification
defmodule Pipeline.DSPy.ErrorHandler do
  @moduledoc """
  Handles and classifies errors from Python bridge.
  """

  defmodule PythonError do
    defstruct [
      :error_type,
      :message,
      :traceback,
      :context,
      :recovery_strategy
    ]
  end

  @error_types [
    :dspy_optimization_error,
    :python_runtime_error,
    :serialization_error,
    :timeout_error,
    :process_crashed,
    :import_error
  ]

  def handle_python_error(error_data) do
    error = classify_error(error_data)

    case error.recovery_strategy do
      :retry ->
        {:retry, error}

      :fallback ->
        {:fallback, error}

      :restart_process ->
        {:restart_process, error}

      :fatal ->
        {:fatal, error}
    end
  end

  defp classify_error(error_data) do
    case error_data["error_type"] do
      "DSPyOptimizationError" ->
        %PythonError{
          error_type: :dspy_optimization_error,
          message: error_data["message"],
          traceback: error_data["traceback"],
          context: error_data["context"],
          recovery_strategy: :fallback
        }

      "ImportError" ->
        %PythonError{
          error_type: :import_error,
          message: error_data["message"],
          traceback: error_data["traceback"],
          context: error_data["context"],
          recovery_strategy: :fatal
        }

      _ ->
        %PythonError{
          error_type: :python_runtime_error,
          message: error_data["message"],
          traceback: error_data["traceback"],
          context: error_data["context"],
          recovery_strategy: :restart_process
        }
    end
  end
end
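For example, an optimization failure reported by the Python side classifies to the fallback strategy. Note that dspy_bridge.py later in this document reports the exception class under the "error" key, while classify_error/1 reads "error_type"; one side would need to change so the keys agree. The payload below uses "error_type" to match the classifier:

error_data = %{
  "error_type" => "DSPyOptimizationError",
  "message" => "no training examples supplied",
  "traceback" => "Traceback (most recent call last): ...",
  "context" => %{"step" => "analyze_code"}
}

{:fallback, error} = Pipeline.DSPy.ErrorHandler.handle_python_error(error_data)
# error.recovery_strategy == :fallback, so RecoveryManager falls back to
# traditional execution instead of restarting the Python process.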
Recovery Mechanisms
defmodule Pipeline.DSPy.RecoveryManager do
  @moduledoc """
  Manages recovery strategies for Python bridge failures.
  """

  require Logger

  def execute_recovery_strategy(strategy, error, context) do
    case strategy do
      :retry ->
        retry_operation(context)

      :fallback ->
        fallback_to_traditional_execution(context)

      :restart_process ->
        restart_python_process(context)

      :fatal ->
        handle_fatal_error(error, context)
    end
  end

  defp retry_operation(context) do
    case context.retry_count do
      count when count < 3 ->
        Logger.warning("Retrying Python operation, attempt #{count + 1}")
        new_context = %{context | retry_count: count + 1}
        execute_python_operation(new_context)

      _ ->
        Logger.error("Max retries exceeded, falling back to traditional execution")
        fallback_to_traditional_execution(context)
    end
  end

  defp fallback_to_traditional_execution(context) do
    Logger.info("Falling back to traditional execution")

    # Execute step using traditional pipeline execution
    Pipeline.Executor.execute_step(context.step, context.pipeline_context)
  end
end
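restart_python_process/1 and handle_fatal_error/2 are referenced but not defined; the bodies below are assumptions about how they could delegate to ProcessManager and surface a terminal error:

# Assumed implementations of the remaining strategies for Pipeline.DSPy.RecoveryManager.
defp restart_python_process(context) do
  case Pipeline.DSPy.ProcessManager.start_python_process() do
    {:ok, _port, _pid} ->
      # Re-run the failed operation once the bridge is back up
      execute_python_operation(%{context | retry_count: 0})

    {:error, reason} ->
      Logger.error("Python process restart failed: #{inspect(reason)}")
      fallback_to_traditional_execution(context)
  end
end

defp handle_fatal_error(error, _context) do
  Logger.error("Fatal Python bridge error: #{error.message}")
  {:error, {:dspy_bridge_fatal, error}}
end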
4. Performance Optimization
Caching Strategy
defmodule Pipeline.DSPy.OptimizationCache do
  @moduledoc """
  Caches optimization results to minimize Python calls.
  """

  use GenServer

  defstruct [
    :cache_table,
    :cache_size,
    :max_cache_size,
    :cache_ttl,
    :cleanup_timer
  ]

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def get_optimization(signature_hash) do
    GenServer.call(__MODULE__, {:get_optimization, signature_hash})
  end

  def store_optimization(signature_hash, optimization_result) do
    GenServer.cast(__MODULE__, {:store_optimization, signature_hash, optimization_result})
  end

  def init(opts) do
    cache_table = :ets.new(:dspy_optimization_cache, [:set, :private])

    state = %__MODULE__{
      cache_table: cache_table,
      cache_size: 0,
      max_cache_size: Keyword.get(opts, :max_cache_size, 1000),
      cache_ttl: Keyword.get(opts, :cache_ttl, 3600), # 1 hour
      cleanup_timer: schedule_cleanup()
    }

    {:ok, state}
  end

  def handle_call({:get_optimization, signature_hash}, _from, state) do
    case :ets.lookup(state.cache_table, signature_hash) do
      [{^signature_hash, optimization_result, timestamp}] ->
        if cache_entry_valid?(timestamp, state.cache_ttl) do
          {:reply, {:ok, optimization_result}, state}
        else
          :ets.delete(state.cache_table, signature_hash)
          {:reply, :not_found, %{state | cache_size: state.cache_size - 1}}
        end

      [] ->
        {:reply, :not_found, state}
    end
  end
end
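The cast handler and the timer/TTL helpers referenced above are not shown. A sketch, assuming entries are stored as {hash, result, inserted_at_seconds} tuples (matching the lookup pattern in handle_call/3) and that inserts are simply refused once the cache is full:

# Assumed callbacks and helpers for Pipeline.DSPy.OptimizationCache.
def handle_cast({:store_optimization, signature_hash, optimization_result}, state) do
  if state.cache_size < state.max_cache_size do
    :ets.insert(state.cache_table, {signature_hash, optimization_result, System.system_time(:second)})
    {:noreply, %{state | cache_size: state.cache_size + 1}}
  else
    {:noreply, state}
  end
end

def handle_info(:cleanup_expired, state) do
  # Remove entries older than the TTL, then reschedule the sweep
  cutoff = System.system_time(:second) - state.cache_ttl
  expired = :ets.select_delete(state.cache_table, [{{:_, :_, :"$1"}, [{:<, :"$1", cutoff}], [true]}])

  {:noreply, %{state | cache_size: state.cache_size - expired, cleanup_timer: schedule_cleanup()}}
end

defp cache_entry_valid?(timestamp, ttl) do
  System.system_time(:second) - timestamp <= ttl
end

defp schedule_cleanup(interval \\ 60_000) do
  Process.send_after(self(), :cleanup_expired, interval)
end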
Connection Pooling
defmodule Pipeline.DSPy.ConnectionPool do
  @moduledoc """
  Manages a pool of Python process connections for better performance.
  """

  use GenServer

  defstruct [
    :pool_size,
    :available_connections,
    :busy_connections,
    :connection_queue,
    :max_queue_size
  ]

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def get_connection(timeout \\ 5000) do
    GenServer.call(__MODULE__, :get_connection, timeout)
  end

  def return_connection(connection) do
    GenServer.cast(__MODULE__, {:return_connection, connection})
  end

  def init(opts) do
    pool_size = Keyword.get(opts, :pool_size, 3)

    # Start initial pool of Python processes
    connections = start_initial_connections(pool_size)

    state = %__MODULE__{
      pool_size: pool_size,
      available_connections: connections,
      busy_connections: %{},
      connection_queue: :queue.new(),
      max_queue_size: Keyword.get(opts, :max_queue_size, 10)
    }

    {:ok, state}
  end

  def handle_call(:get_connection, from, state) do
    case state.available_connections do
      [connection | rest] ->
        new_state = %{
          state
          | available_connections: rest,
            busy_connections: Map.put(state.busy_connections, connection.id, connection)
        }

        {:reply, {:ok, connection}, new_state}

      [] ->
        if :queue.len(state.connection_queue) < state.max_queue_size do
          new_queue = :queue.in(from, state.connection_queue)
          new_state = %{state | connection_queue: new_queue}
          {:noreply, new_state}
        else
          {:reply, {:error, :pool_exhausted}, state}
        end
    end
  end
end
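The return path needs a matching handle_cast/2 clause. The sketch below assumes each connection is a map or struct with an :id field, as implied by the Map.put call above, and hands a returned connection directly to the next queued caller if one is waiting:

# Assumed clause for Pipeline.DSPy.ConnectionPool.
def handle_cast({:return_connection, connection}, state) do
  busy = Map.delete(state.busy_connections, connection.id)

  case :queue.out(state.connection_queue) do
    {{:value, waiting_caller}, rest_queue} ->
      # Hand the connection straight to the next queued caller
      GenServer.reply(waiting_caller, {:ok, connection})

      {:noreply,
       %{state | busy_connections: Map.put(busy, connection.id, connection), connection_queue: rest_queue}}

    {:empty, _} ->
      {:noreply,
       %{state | busy_connections: busy, available_connections: [connection | state.available_connections]}}
  end
end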
5. Python Bridge Implementation
Python Side Bridge
# dspy_bridge.py
import sys
import json
import traceback
import dspy
from typing import Dict, Any, List
import zlib
import base64


class ElixirDSPyBridge:
    def __init__(self):
        self.dspy_models = {}
        self.optimizers = {}
        self.signatures = {}

    def handle_message(self, message_data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle incoming messages from Elixir."""
        try:
            message_type = message_data.get('request_type')
            payload = message_data.get('payload', {})

            if message_type == 'optimize_signature':
                return self.optimize_signature(payload)
            elif message_type == 'evaluate_pipeline':
                return self.evaluate_pipeline(payload)
            elif message_type == 'train_model':
                return self.train_model(payload)
            elif message_type == 'health_check':
                return {'status': 'healthy', 'message': 'Python bridge is operational'}
            else:
                return {'error': f'Unknown message type: {message_type}'}
        except Exception as e:
            return {
                'error': 'PythonRuntimeError',
                'message': str(e),
                'traceback': traceback.format_exc()
            }

    def optimize_signature(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Optimize a DSPy signature with the given training data."""
        try:
            signature_config = payload['signature']
            training_data = payload['training_data']
            config = payload.get('config', {})

            # Create DSPy signature
            signature = self.create_dspy_signature(signature_config)

            # Prepare training data
            training_examples = self.prepare_training_data(training_data)

            # Choose optimization strategy
            optimizer = self.create_optimizer(config)

            # Optimize signature
            optimized_signature = optimizer.compile(signature, trainset=training_examples)

            return {
                'optimized_signature': self.serialize_signature(optimized_signature),
                'optimization_metrics': self.get_optimization_metrics(optimizer),
                'status': 'success'
            }
        except Exception as e:
            return {
                'error': 'DSPyOptimizationError',
                'message': str(e),
                'traceback': traceback.format_exc()
            }

    def create_dspy_signature(self, signature_config: Dict[str, Any]) -> dspy.Signature:
        """Create a DSPy signature from configuration."""
        input_fields = signature_config['input_fields']
        output_fields = signature_config['output_fields']

        # Build signature string
        signature_str = self.build_signature_string(input_fields, output_fields)

        # Create and return signature
        return dspy.Signature(signature_str)

    def create_optimizer(self, config: Dict[str, Any]):
        """Create appropriate DSPy optimizer based on configuration."""
        optimizer_type = config.get('optimizer', 'bootstrap_few_shot')

        if optimizer_type == 'bootstrap_few_shot':
            return dspy.BootstrapFewShot(
                max_bootstrapped_demos=config.get('max_demos', 4),
                max_labeled_demos=config.get('max_labeled_demos', 16)
            )
        elif optimizer_type == 'copro':
            return dspy.COPRO()
        elif optimizer_type == 'mipro':
            return dspy.MIPRO()
        else:
            raise ValueError(f"Unknown optimizer type: {optimizer_type}")


def main():
    """Main message loop for Python bridge."""
    bridge = ElixirDSPyBridge()

    while True:
        try:
            # Read message from Elixir
            message_length = sys.stdin.buffer.read(4)
            if not message_length:
                break

            length = int.from_bytes(message_length, byteorder='big')
            message_data = sys.stdin.buffer.read(length)

            # Decode message
            message_json = json.loads(message_data.decode('utf-8'))

            # Handle message
            response = bridge.handle_message(message_json)

            # Send response back to Elixir
            response_json = json.dumps(response).encode('utf-8')
            response_length = len(response_json).to_bytes(4, byteorder='big')

            sys.stdout.buffer.write(response_length)
            sys.stdout.buffer.write(response_json)
            sys.stdout.buffer.flush()
        except Exception as e:
            error_response = {
                'error': 'PythonBridgeError',
                'message': str(e),
                'traceback': traceback.format_exc()
            }
            error_json = json.dumps(error_response).encode('utf-8')
            error_length = len(error_json).to_bytes(4, byteorder='big')

            sys.stdout.buffer.write(error_length)
            sys.stdout.buffer.write(error_json)
            sys.stdout.buffer.flush()


if __name__ == '__main__':
    main()
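The 4-byte big-endian length header that this script reads and writes corresponds to the {:packet, 4} option passed to Port.open/2 in ProcessManager, so the Elixir side never handles the prefix explicitly. A minimal round trip, assuming port was opened that way:

# Assumes `port` was opened with {:packet, 4} as in Pipeline.DSPy.ProcessManager;
# the VM adds and strips the 4-byte length header that dspy_bridge.py expects.
message = Jason.encode!(%{request_type: "health_check", payload: %{}})
Port.command(port, message)

receive do
  {^port, {:data, data}} ->
    Jason.decode!(data)
    # => %{"status" => "healthy", "message" => "Python bridge is operational"}
after
  5_000 -> {:error, :timeout}
end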
Integration with Existing System
1. Enhanced Provider Integration
defmodule Pipeline.Providers.DSPyOptimizedProvider do
  @moduledoc """
  DSPy-optimized provider that uses the Python bridge for optimization.
  """

  @behaviour Pipeline.Providers.AIProvider

  require Logger

  def query(prompt, options) do
    case options["dspy_optimization"] do
      true ->
        query_with_dspy_optimization(prompt, options)

      _ ->
        query_traditional(prompt, options)
    end
  end

  defp query_with_dspy_optimization(prompt, options) do
    signature = options["dspy_signature"]

    case Pipeline.DSPy.PythonBridge.optimize_signature(signature) do
      {:ok, optimized_signature} ->
        execute_optimized_query(prompt, optimized_signature, options)

      {:error, reason} ->
        Logger.warning("DSPy optimization failed: #{inspect(reason)}, falling back")
        query_traditional(prompt, options)
    end
  end
end
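query_traditional/2 and execute_optimized_query/3 are referenced but not shown. The sketch below is one possible shape; Pipeline.Providers.ClaudeProvider and the "instructions" field on the serialized signature are assumptions, not defined elsewhere in this document:

# Assumed private helpers for Pipeline.Providers.DSPyOptimizedProvider.
defp query_traditional(prompt, options) do
  Pipeline.Providers.ClaudeProvider.query(prompt, Map.delete(options, "dspy_optimization"))
end

defp execute_optimized_query(prompt, optimized_signature, options) do
  # Assumes the serialized signature carries an "instructions" field; prepend it
  # to the user prompt before delegating to the underlying provider.
  optimized_prompt = "#{optimized_signature["instructions"]}\n\n#{prompt}"
  query_traditional(optimized_prompt, options)
end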
2. Configuration Integration
# Enhanced configuration with Python bridge settings
workflow:
  name: dspy_enhanced_pipeline

  dspy_config:
    python_bridge:
      pool_size: 3
      cache_ttl: 3600
      max_retries: 3
      timeout: 30000

    optimization:
      enabled: true
      strategy: "bootstrap_few_shot"
      max_demos: 4

  steps:
    - name: analyze_code
      type: dspy_claude
      dspy_signature:
        input_fields: [...]
        output_fields: [...]
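The python_bridge block maps naturally onto the options already accepted by ConnectionPool and OptimizationCache. A minimal sketch, assuming the parsed workflow YAML arrives as a nested map with string keys; the function name and where it lives are assumptions:

# Hypothetical glue: derive child specs for the bridge processes from the workflow config.
def bridge_child_specs(workflow_config) do
  bridge_config = get_in(workflow_config, ["workflow", "dspy_config", "python_bridge"]) || %{}

  [
    {Pipeline.DSPy.ConnectionPool, pool_size: Map.get(bridge_config, "pool_size", 3)},
    {Pipeline.DSPy.OptimizationCache, cache_ttl: Map.get(bridge_config, "cache_ttl", 3600)}
  ]
end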
Testing and Validation
1. Unit Tests
defmodule Pipeline.DSPy.PythonBridgeTest do
  use ExUnit.Case

  test "python process starts and responds to health check" do
    {:ok, bridge} = Pipeline.DSPy.PythonBridge.start_link()

    assert {:ok, :healthy} = Pipeline.DSPy.PythonBridge.health_check(bridge)
  end

  test "optimization request returns valid response" do
    signature = create_test_signature()
    training_data = create_test_training_data()

    {:ok, result} = Pipeline.DSPy.PythonBridge.optimize_signature(signature, training_data)

    assert result["status"] == "success"
    assert result["optimized_signature"] != nil
  end
end
2. Integration Tests
defmodule Pipeline.DSPy.IntegrationTest do
  use ExUnit.Case

  test "full pipeline execution with DSPy optimization" do
    config = create_dspy_pipeline_config()

    {:ok, result} = Pipeline.Enhanced.Executor.execute_pipeline(config)

    assert result["dspy_step"]["dspy_optimization_applied"] == true
    assert result["dspy_step"]["success"] == true
  end
end
Performance Monitoring
1. Metrics Collection
defmodule Pipeline.DSPy.Metrics do
  def record_python_bridge_metrics(operation, duration, success) do
    :telemetry.execute(
      [:pipeline, :dspy, :python_bridge],
      # Elixir has no ternary operator; encode the boolean as 0/1 explicitly
      %{duration: duration, success: if(success, do: 1, else: 0)},
      %{operation: operation}
    )
  end

  def record_optimization_metrics(signature_hash, metrics) do
    :telemetry.execute(
      [:pipeline, :dspy, :optimization],
      %{
        accuracy: metrics.accuracy,
        cost: metrics.cost,
        latency: metrics.latency
      },
      %{signature_hash: signature_hash}
    )
  end
end
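These events can be consumed with a standard telemetry handler; the logging handler below is only an illustration:

require Logger

:telemetry.attach(
  "dspy-python-bridge-logger",
  [:pipeline, :dspy, :python_bridge],
  fn _event, measurements, metadata, _config ->
    Logger.info(
      "DSPy bridge #{metadata.operation}: duration=#{measurements.duration}, success=#{measurements.success}"
    )
  end,
  nil
)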
Deployment Considerations
1. Environment Setup
- Python environment with DSPy and dependencies
- Elixir application with Python bridge modules
- Process monitoring and restart capabilities
- Health check endpoints for monitoring
2. Resource Management
- Memory limits for Python processes
- CPU allocation for optimization tasks
- Network timeouts for external API calls
- Disk space for cache and training data
3. Security
- Input validation for Python bridge messages (see the sketch after this list)
- Process isolation and sandboxing
- Resource limits to prevent abuse
- Audit logging for optimization requests
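A minimal sketch of the input-validation point, assuming a dedicated validator module (the module name is hypothetical); it accepts only the request types defined by MessageProtocol, checked as strings because that is how they arrive over the wire:

defmodule Pipeline.DSPy.MessageValidator do
  @allowed_types ~w(optimize_signature evaluate_pipeline train_model health_check shutdown)

  # Reject anything that is not a whitelisted request type with a map payload
  # before it ever reaches the Python process.
  def validate(%{"request_type" => type, "payload" => payload})
      when type in @allowed_types and is_map(payload) do
    :ok
  end

  def validate(message), do: {:error, {:invalid_bridge_message, message}}
end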
This Python bridge architecture provides a robust foundation for DSPy integration while maintaining the performance and reliability requirements of the pipeline_ex system.