← Back to Analysis

14 python bridge architecture

Documentation for 14_python_bridge_architecture from the Pipeline ex repository.

Python Bridge Architecture

Overview

This document addresses the critical gap identified in our DSPy implementation plan: the complexity of integrating Python-based DSPy optimization with the Elixir pipeline_ex system. The Python bridge is essential for accessing DSPy’s optimization engines while maintaining the performance and reliability of the Elixir system.

Problem Statement

The original DSPy analysis highlighted that Python integration would be complex, but our current implementation plans underestimate this complexity. Specific challenges include:

  1. Process Management: Managing Python processes from Elixir
  2. Data Serialization: Efficient data exchange between languages
  3. Error Propagation: Handling Python errors in Elixir context
  4. Performance Optimization: Minimizing cross-language call overhead
  5. Resource Management: Memory and process lifecycle management

Architecture Design

1. Python Process Management

Process Architecture

defmodule Pipeline.DSPy.PythonBridge do
  use GenServer
  
  defstruct [
    :python_port,
    :python_pid,
    :process_status,
    :request_queue,
    :response_cache,
    :health_check_timer
  ]
  
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def init(opts) do
    python_cmd = build_python_command(opts)
    
    case start_python_process(python_cmd) do
      {:ok, port, pid} ->
        state = %__MODULE__{
          python_port: port,
          python_pid: pid,
          process_status: :healthy,
          request_queue: :queue.new(),
          response_cache: %{},
          health_check_timer: schedule_health_check()
        }
        
        {:ok, state}
      
      {:error, reason} ->
        {:stop, reason}
    end
  end
end

Process Lifecycle Management

defmodule Pipeline.DSPy.ProcessManager do
  @moduledoc """
  Manages Python process lifecycle with health monitoring and recovery.
  """
  
  def start_python_process(opts \\ []) do
    python_script = Path.join([:code.priv_dir(:pipeline), "python", "dspy_bridge.py"])
    
    port_opts = [
      :binary,
      :exit_status,
      {:packet, 4},
      {:args, [python_script]},
      {:env, build_python_env(opts)}
    ]
    
    port = Port.open({:spawn_executable, python_executable()}, port_opts)
    
    # Wait for initialization confirmation
    case wait_for_initialization(port) do
      :ok ->
        {:ok, port, get_port_pid(port)}
      
      {:error, reason} ->
        Port.close(port)
        {:error, reason}
    end
  end
  
  def monitor_process_health(port) do
    case send_health_check(port) do
      {:ok, :healthy} ->
        :ok
      
      {:error, reason} ->
        Logger.error("Python process health check failed: #{reason}")
        restart_process(port)
    end
  end
  
  defp restart_process(failed_port) do
    Logger.info("Restarting Python process")
    
    Port.close(failed_port)
    
    case start_python_process() do
      {:ok, new_port, new_pid} ->
        notify_process_restart(new_port, new_pid)
        {:ok, new_port, new_pid}
      
      {:error, reason} ->
        Logger.error("Failed to restart Python process: #{reason}")
        {:error, reason}
    end
  end
end

2. Data Serialization Protocol

Message Format

defmodule Pipeline.DSPy.MessageProtocol do
  @moduledoc """
  Defines the message protocol for Elixir-Python communication.
  """
  
  defstruct [
    :message_id,
    :request_type,
    :payload,
    :timestamp,
    :metadata
  ]
  
  @message_types [
    :optimize_signature,
    :evaluate_pipeline,
    :train_model,
    :health_check,
    :shutdown
  ]
  
  def encode_message(message) do
    message
    |> Map.from_struct()
    |> Jason.encode!()
  end
  
  def decode_message(encoded_message) do
    case Jason.decode(encoded_message) do
      {:ok, decoded} ->
        {:ok, struct(__MODULE__, decoded)}
      
      {:error, reason} ->
        {:error, "Message decode failed: #{reason}"}
    end
  end
  
  def create_optimization_request(signature, training_data, config) do
    %__MODULE__{
      message_id: generate_message_id(),
      request_type: :optimize_signature,
      payload: %{
        signature: signature,
        training_data: training_data,
        config: config
      },
      timestamp: DateTime.utc_now(),
      metadata: %{
        source: "pipeline_ex",
        version: "1.0.0"
      }
    }
  end
end

Efficient Data Transfer

defmodule Pipeline.DSPy.DataTransfer do
  @moduledoc """
  Optimized data transfer between Elixir and Python.
  """
  
  def transfer_large_dataset(data, port) do
    # Compress data before transfer
    compressed_data = compress_data(data)
    
    # Send data in chunks to avoid memory issues
    chunk_size = 1024 * 1024  # 1MB chunks
    
    compressed_data
    |> Stream.chunk_every(chunk_size)
    |> Stream.with_index()
    |> Enum.map(fn {chunk, index} ->
      send_data_chunk(port, chunk, index)
    end)
    |> handle_transfer_results()
  end
  
  defp compress_data(data) do
    data
    |> Jason.encode!()
    |> :zlib.compress()
  end
  
  defp send_data_chunk(port, chunk, index) do
    message = %{
      type: :data_chunk,
      chunk_index: index,
      data: Base.encode64(chunk),
      is_final: false
    }
    
    send_message(port, message)
  end
end

3. Error Handling and Propagation

Error Classification

defmodule Pipeline.DSPy.ErrorHandler do
  @moduledoc """
  Handles and classifies errors from Python bridge.
  """
  
  defmodule PythonError do
    defstruct [
      :error_type,
      :message,
      :traceback,
      :context,
      :recovery_strategy
    ]
  end
  
  @error_types [
    :dspy_optimization_error,
    :python_runtime_error,
    :serialization_error,
    :timeout_error,
    :process_crashed,
    :import_error
  ]
  
  def handle_python_error(error_data) do
    error = classify_error(error_data)
    
    case error.recovery_strategy do
      :retry ->
        {:retry, error}
      
      :fallback ->
        {:fallback, error}
      
      :restart_process ->
        {:restart_process, error}
      
      :fatal ->
        {:fatal, error}
    end
  end
  
  defp classify_error(error_data) do
    case error_data["error_type"] do
      "DSPyOptimizationError" ->
        %PythonError{
          error_type: :dspy_optimization_error,
          message: error_data["message"],
          traceback: error_data["traceback"],
          context: error_data["context"],
          recovery_strategy: :fallback
        }
      
      "ImportError" ->
        %PythonError{
          error_type: :import_error,
          message: error_data["message"],
          traceback: error_data["traceback"],
          context: error_data["context"],
          recovery_strategy: :fatal
        }
      
      _ ->
        %PythonError{
          error_type: :python_runtime_error,
          message: error_data["message"],
          traceback: error_data["traceback"],
          context: error_data["context"],
          recovery_strategy: :restart_process
        }
    end
  end
end

Recovery Mechanisms

defmodule Pipeline.DSPy.RecoveryManager do
  @moduledoc """
  Manages recovery strategies for Python bridge failures.
  """
  
  def execute_recovery_strategy(strategy, error, context) do
    case strategy do
      :retry ->
        retry_operation(context)
      
      :fallback ->
        fallback_to_traditional_execution(context)
      
      :restart_process ->
        restart_python_process(context)
      
      :fatal ->
        handle_fatal_error(error, context)
    end
  end
  
  defp retry_operation(context) do
    case context.retry_count do
      count when count < 3 ->
        Logger.warning("Retrying Python operation, attempt #{count + 1}")
        new_context = %{context | retry_count: count + 1}
        execute_python_operation(new_context)
      
      _ ->
        Logger.error("Max retries exceeded, falling back to traditional execution")
        fallback_to_traditional_execution(context)
    end
  end
  
  defp fallback_to_traditional_execution(context) do
    Logger.info("Falling back to traditional execution")
    
    # Execute step using traditional pipeline execution
    Pipeline.Executor.execute_step(context.step, context.pipeline_context)
  end
end

4. Performance Optimization

Caching Strategy

defmodule Pipeline.DSPy.OptimizationCache do
  @moduledoc """
  Caches optimization results to minimize Python calls.
  """
  
  use GenServer
  
  defstruct [
    :cache_table,
    :cache_size,
    :max_cache_size,
    :cache_ttl,
    :cleanup_timer
  ]
  
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def get_optimization(signature_hash) do
    GenServer.call(__MODULE__, {:get_optimization, signature_hash})
  end
  
  def store_optimization(signature_hash, optimization_result) do
    GenServer.cast(__MODULE__, {:store_optimization, signature_hash, optimization_result})
  end
  
  def init(opts) do
    cache_table = :ets.new(:dspy_optimization_cache, [:set, :private])
    
    state = %__MODULE__{
      cache_table: cache_table,
      cache_size: 0,
      max_cache_size: Keyword.get(opts, :max_cache_size, 1000),
      cache_ttl: Keyword.get(opts, :cache_ttl, 3600),  # 1 hour
      cleanup_timer: schedule_cleanup()
    }
    
    {:ok, state}
  end
  
  def handle_call({:get_optimization, signature_hash}, _from, state) do
    case :ets.lookup(state.cache_table, signature_hash) do
      [{^signature_hash, optimization_result, timestamp}] ->
        if cache_entry_valid?(timestamp, state.cache_ttl) do
          {:reply, {:ok, optimization_result}, state}
        else
          :ets.delete(state.cache_table, signature_hash)
          {:reply, :not_found, %{state | cache_size: state.cache_size - 1}}
        end
      
      [] ->
        {:reply, :not_found, state}
    end
  end
end

Connection Pooling

defmodule Pipeline.DSPy.ConnectionPool do
  @moduledoc """
  Manages a pool of Python process connections for better performance.
  """
  
  use GenServer
  
  defstruct [
    :pool_size,
    :available_connections,
    :busy_connections,
    :connection_queue,
    :max_queue_size
  ]
  
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def get_connection(timeout \\ 5000) do
    GenServer.call(__MODULE__, :get_connection, timeout)
  end
  
  def return_connection(connection) do
    GenServer.cast(__MODULE__, {:return_connection, connection})
  end
  
  def init(opts) do
    pool_size = Keyword.get(opts, :pool_size, 3)
    
    # Start initial pool of Python processes
    connections = start_initial_connections(pool_size)
    
    state = %__MODULE__{
      pool_size: pool_size,
      available_connections: connections,
      busy_connections: %{},
      connection_queue: :queue.new(),
      max_queue_size: Keyword.get(opts, :max_queue_size, 10)
    }
    
    {:ok, state}
  end
  
  def handle_call(:get_connection, from, state) do
    case state.available_connections do
      [connection | rest] ->
        new_state = %{
          state |
          available_connections: rest,
          busy_connections: Map.put(state.busy_connections, connection.id, connection)
        }
        
        {:reply, {:ok, connection}, new_state}
      
      [] ->
        if :queue.len(state.connection_queue) < state.max_queue_size do
          new_queue = :queue.in(from, state.connection_queue)
          new_state = %{state | connection_queue: new_queue}
          {:noreply, new_state}
        else
          {:reply, {:error, :pool_exhausted}, state}
        end
    end
  end
end

5. Python Bridge Implementation

Python Side Bridge

# dspy_bridge.py
import sys
import json
import traceback
import dspy
from typing import Dict, Any, List
import zlib
import base64

class ElixirDSPyBridge:
    def __init__(self):
        self.dspy_models = {}
        self.optimizers = {}
        self.signatures = {}
        
    def handle_message(self, message_data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle incoming messages from Elixir."""
        try:
            message_type = message_data.get('request_type')
            payload = message_data.get('payload', {})
            
            if message_type == 'optimize_signature':
                return self.optimize_signature(payload)
            elif message_type == 'evaluate_pipeline':
                return self.evaluate_pipeline(payload)
            elif message_type == 'train_model':
                return self.train_model(payload)
            elif message_type == 'health_check':
                return {'status': 'healthy', 'message': 'Python bridge is operational'}
            else:
                return {'error': f'Unknown message type: {message_type}'}
                
        except Exception as e:
            return {
                'error': 'PythonRuntimeError',
                'message': str(e),
                'traceback': traceback.format_exc()
            }
    
    def optimize_signature(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Optimize a DSPy signature with the given training data."""
        try:
            signature_config = payload['signature']
            training_data = payload['training_data']
            config = payload.get('config', {})
            
            # Create DSPy signature
            signature = self.create_dspy_signature(signature_config)
            
            # Prepare training data
            training_examples = self.prepare_training_data(training_data)
            
            # Choose optimization strategy
            optimizer = self.create_optimizer(config)
            
            # Optimize signature
            optimized_signature = optimizer.compile(signature, trainset=training_examples)
            
            return {
                'optimized_signature': self.serialize_signature(optimized_signature),
                'optimization_metrics': self.get_optimization_metrics(optimizer),
                'status': 'success'
            }
            
        except Exception as e:
            return {
                'error': 'DSPyOptimizationError',
                'message': str(e),
                'traceback': traceback.format_exc()
            }
    
    def create_dspy_signature(self, signature_config: Dict[str, Any]) -> dspy.Signature:
        """Create a DSPy signature from configuration."""
        input_fields = signature_config['input_fields']
        output_fields = signature_config['output_fields']
        
        # Build signature string
        signature_str = self.build_signature_string(input_fields, output_fields)
        
        # Create and return signature
        return dspy.Signature(signature_str)
    
    def create_optimizer(self, config: Dict[str, Any]) -> dspy.teleprompt.Optimizer:
        """Create appropriate DSPy optimizer based on configuration."""
        optimizer_type = config.get('optimizer', 'bootstrap_few_shot')
        
        if optimizer_type == 'bootstrap_few_shot':
            return dspy.BootstrapFewShot(
                max_bootstrapped_demos=config.get('max_demos', 4),
                max_labeled_demos=config.get('max_labeled_demos', 16)
            )
        elif optimizer_type == 'copro':
            return dspy.COPRO()
        elif optimizer_type == 'mipro':
            return dspy.MIPRO()
        else:
            raise ValueError(f"Unknown optimizer type: {optimizer_type}")

def main():
    """Main message loop for Python bridge."""
    bridge = ElixirDSPyBridge()
    
    while True:
        try:
            # Read message from Elixir
            message_length = sys.stdin.buffer.read(4)
            if not message_length:
                break
                
            length = int.from_bytes(message_length, byteorder='big')
            message_data = sys.stdin.buffer.read(length)
            
            # Decode message
            message_json = json.loads(message_data.decode('utf-8'))
            
            # Handle message
            response = bridge.handle_message(message_json)
            
            # Send response back to Elixir
            response_json = json.dumps(response).encode('utf-8')
            response_length = len(response_json).to_bytes(4, byteorder='big')
            
            sys.stdout.buffer.write(response_length)
            sys.stdout.buffer.write(response_json)
            sys.stdout.buffer.flush()
            
        except Exception as e:
            error_response = {
                'error': 'PythonBridgeError',
                'message': str(e),
                'traceback': traceback.format_exc()
            }
            
            error_json = json.dumps(error_response).encode('utf-8')
            error_length = len(error_json).to_bytes(4, byteorder='big')
            
            sys.stdout.buffer.write(error_length)
            sys.stdout.buffer.write(error_json)
            sys.stdout.buffer.flush()

if __name__ == '__main__':
    main()

Integration with Existing System

1. Enhanced Provider Integration

defmodule Pipeline.Providers.DSPyOptimizedProvider do
  @moduledoc """
  DSPy-optimized provider that uses the Python bridge for optimization.
  """
  
  @behaviour Pipeline.Providers.AIProvider
  
  def query(prompt, options) do
    case options["dspy_optimization"] do
      true ->
        query_with_dspy_optimization(prompt, options)
      
      _ ->
        query_traditional(prompt, options)
    end
  end
  
  defp query_with_dspy_optimization(prompt, options) do
    signature = options["dspy_signature"]
    
    case Pipeline.DSPy.PythonBridge.optimize_signature(signature) do
      {:ok, optimized_signature} ->
        execute_optimized_query(prompt, optimized_signature, options)
      
      {:error, reason} ->
        Logger.warning("DSPy optimization failed: #{reason}, falling back")
        query_traditional(prompt, options)
    end
  end
end

2. Configuration Integration

# Enhanced configuration with Python bridge settings
workflow:
  name: dspy_enhanced_pipeline
  
  dspy_config:
    python_bridge:
      pool_size: 3
      cache_ttl: 3600
      max_retries: 3
      timeout: 30000
    
    optimization:
      enabled: true
      strategy: "bootstrap_few_shot"
      max_demos: 4
      
  steps:
    - name: analyze_code
      type: dspy_claude
      dspy_signature:
        input_fields: [...]
        output_fields: [...]

Testing and Validation

1. Unit Tests

defmodule Pipeline.DSPy.PythonBridgeTest do
  use ExUnit.Case
  
  test "python process starts and responds to health check" do
    {:ok, bridge} = Pipeline.DSPy.PythonBridge.start_link()
    
    assert {:ok, :healthy} = Pipeline.DSPy.PythonBridge.health_check(bridge)
  end
  
  test "optimization request returns valid response" do
    signature = create_test_signature()
    training_data = create_test_training_data()
    
    {:ok, result} = Pipeline.DSPy.PythonBridge.optimize_signature(signature, training_data)
    
    assert result["status"] == "success"
    assert result["optimized_signature"] != nil
  end
end

2. Integration Tests

defmodule Pipeline.DSPy.IntegrationTest do
  use ExUnit.Case
  
  test "full pipeline execution with DSPy optimization" do
    config = create_dspy_pipeline_config()
    
    {:ok, result} = Pipeline.Enhanced.Executor.execute_pipeline(config)
    
    assert result["dspy_step"]["dspy_optimization_applied"] == true
    assert result["dspy_step"]["success"] == true
  end
end

Performance Monitoring

1. Metrics Collection

defmodule Pipeline.DSPy.Metrics do
  def record_python_bridge_metrics(operation, duration, success) do
    :telemetry.execute(
      [:pipeline, :dspy, :python_bridge],
      %{duration: duration, success: success ? 1 : 0},
      %{operation: operation}
    )
  end
  
  def record_optimization_metrics(signature_hash, metrics) do
    :telemetry.execute(
      [:pipeline, :dspy, :optimization],
      %{
        accuracy: metrics.accuracy,
        cost: metrics.cost,
        latency: metrics.latency
      },
      %{signature_hash: signature_hash}
    )
  end
end

Deployment Considerations

1. Environment Setup

  • Python environment with DSPy and dependencies
  • Elixir application with Python bridge modules
  • Process monitoring and restart capabilities
  • Health check endpoints for monitoring

2. Resource Management

  • Memory limits for Python processes
  • CPU allocation for optimization tasks
  • Network timeouts for external API calls
  • Disk space for cache and training data

3. Security

  • Input validation for Python bridge messages
  • Process isolation and sandboxing
  • Resource limits to prevent abuse
  • Audit logging for optimization requests

This Python bridge architecture provides a robust foundation for DSPy integration while maintaining the performance and reliability requirements of the pipeline_ex system.