Stage 1 Prompt 2: Python Bridge Communication Layer
OBJECTIVE
Implement a robust GenServer-based Python bridge that enables bidirectional communication between Elixir and Python DSPy processes. This bridge must handle process lifecycle management, request/response correlation, error handling, and provide a clean interface for executing DSPy operations from Elixir.
COMPLETE IMPLEMENTATION CONTEXT
PYTHON BRIDGE ARCHITECTURE OVERVIEW
From STAGE_1_FOUNDATION_IMPLEMENTATION.md, the Python bridge consists of:
lib/dspex/python_bridge/
├── bridge.ex # GenServer for Python communication
└── protocol.ex # Wire protocol
priv/python/
└── dspy_bridge.py # Python bridge script
Core Requirements:
- GenServer managing Python subprocess lifecycle
- Packet-based binary communication with length headers
- JSON protocol for structured data exchange
- Request/response correlation with unique IDs
- Error handling and process recovery
- Timeout management for long-running operations
COMPLETE GENSERVER IMPLEMENTATION REFERENCE
From STAGE_1_FOUNDATION_IMPLEMENTATION.md:
defmodule DSPex.PythonBridge.Bridge do
  @moduledoc """
  GenServer managing Python DSPy process communication.

  The bridge owns one Python subprocess, opened as an Erlang port with
  4-byte length-prefixed packets. Each request gets an integer id and the
  caller is parked in `requests` until the response carrying that id arrives,
  so several callers can have requests in flight at the same time.

  Error reasons returned by `call/3`:

    * a string - the error message reported by the Python handler
    * `:timeout` - no reply within the given timeout
    * `:bridge_not_running` - no bridge process is registered, or the bridge
      stopped while the request was pending
    * `:python_process_died` - the Python process exited while the request
      was pending
    * `:bridge_restarted` - `restart/1` ran while the request was pending
  """

  use GenServer
  require Logger

  alias DSPex.PythonBridge.Protocol

  @default_timeout 30_000
  @script "python/dspy_bridge.py"

  defstruct [:port, :python_path, :script_path, requests: %{}, request_id: 0]

  ## Client API

  @doc """
  Starts the bridge, registered under this module's name.

  Accepts `:python_executable` (a name or path resolved with
  `System.find_executable/1`). It defaults to the `:python_executable` entry
  of the `:dspex, :python_bridge` application config, then to `"python3"`.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Runs `command` with `args` in Python, waiting up to `timeout` milliseconds.

  Returns `{:ok, result}` or `{:error, reason}`; see the module doc for the
  possible reasons. Never exits on timeout or when the bridge is not running.
  """
  def call(command, args, timeout \\ @default_timeout) do
    GenServer.call(__MODULE__, {:call, command, args}, timeout)
  catch
    :exit, {:timeout, _} ->
      Logger.warning("Python bridge call timed out: #{command}")
      {:error, :timeout}

    :exit, {:noproc, _} ->
      {:error, :bridge_not_running}
  end

  @doc """
  Replaces the Python subprocess with a fresh one, keeping this process.

  Requests still pending on the old subprocess are answered with
  `{:error, :bridge_restarted}`. Returns `:ok`.
  """
  def restart(timeout \\ @default_timeout) do
    GenServer.call(__MODULE__, :restart, timeout)
  end

  ## Server callbacks

  @impl true
  def init(opts) do
    # Trap exits so terminate/2 runs on supervisor shutdown and closes the port.
    Process.flag(:trap_exit, true)

    python_cmd = Keyword.get(opts, :python_executable) || configured_python()
    script_path = Path.join(:code.priv_dir(:dspex), @script)

    case System.find_executable(python_cmd) do
      nil ->
        {:stop, "Python 3 not found"}

      python_path ->
        state = %__MODULE__{python_path: python_path, script_path: script_path}
        {:ok, %{state | port: open_port(python_path, script_path)}}
    end
  end

  @impl true
  def handle_call({:call, command, args}, from, state) do
    request_id = state.request_id + 1
    request = Protocol.encode_request(request_id, command, args)

    # Asynchronous write: the answer comes back later as a {:data, _} message,
    # so the caller is parked in `requests` instead of being replied to here.
    send(state.port, {self(), {:command, request}})

    requests = Map.put(state.requests, request_id, from)
    {:noreply, %{state | requests: requests, request_id: request_id}}
  rescue
    # Arguments that cannot be JSON-encoded fail only this caller instead of
    # crashing the bridge. `Elixir.` prefix: `Protocol` is aliased above.
    error in [Jason.EncodeError, Elixir.Protocol.UndefinedError] ->
      {:reply, {:error, Exception.message(error)}, state}
  end

  def handle_call(:restart, _from, state) do
    close_port(state.port)
    state = fail_pending(state, :bridge_restarted)
    {:reply, :ok, %{state | port: open_port(state.python_path, state.script_path)}}
  end

  @impl true
  def handle_info({port, {:data, data}}, %{port: port} = state) do
    case Protocol.decode_response(data) do
      {:ok, id, result} ->
        {:noreply, reply_to(state, id, {:ok, result}, "response")}

      {:error, id, error} ->
        {:noreply, reply_to(state, id, {:error, error}, "error")}

      {:error, reason} ->
        Logger.error("Failed to decode Python response: #{inspect(reason)}")
        {:noreply, state}
    end
  end

  def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
    Logger.error("Python process exited with status: #{status}")
    {:stop, :python_process_died, fail_pending(%{state | port: nil}, :python_process_died)}
  end

  # With exits trapped, an abnormal port shutdown arrives as a message; treat
  # it like the Python process dying.
  def handle_info({:EXIT, port, reason}, %{port: port} = state) do
    Logger.error("Python port closed: #{inspect(reason)}")
    {:stop, :python_process_died, fail_pending(%{state | port: nil}, :python_process_died)}
  end

  # Messages from a port replaced by restart/1, and EXIT signals that follow
  # an exit_status, land here and are ignored.
  def handle_info(message, state) do
    Logger.debug("Python bridge ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  @impl true
  def terminate(_reason, state) do
    fail_pending(state, :bridge_not_running)
    close_port(state.port)
  end

  ## Helpers

  # Reads the executable name from application config, defaulting to python3.
  defp configured_python do
    :dspex
    |> Application.get_env(:python_bridge, [])
    |> Keyword.get(:python_executable)
    |> Kernel.||("python3")
  end

  # Spawns the Python script with length-prefixed binary framing.
  defp open_port(python_path, script_path) do
    Port.open({:spawn_executable, python_path}, [
      {:args, [script_path]},
      {:packet, 4},
      :binary,
      :exit_status
    ])
  end

  # Closes the port if it is still open; Port.close/1 raises ArgumentError
  # when the port is already closed.
  defp close_port(port) when is_port(port) do
    Port.close(port)
    :ok
  rescue
    ArgumentError -> :ok
  end

  defp close_port(_), do: :ok

  # Delivers `reply` to the caller waiting on `id`, if any.
  defp reply_to(state, id, reply, kind) do
    case Map.pop(state.requests, id) do
      {nil, _requests} ->
        Logger.warning("Received #{kind} for unknown request: #{inspect(id)}")
        state

      {from, requests} ->
        GenServer.reply(from, reply)
        %{state | requests: requests}
    end
  end

  # Answers every pending caller with {:error, reason} and clears the table.
  defp fail_pending(state, reason) do
    Enum.each(state.requests, fn {_id, from} -> GenServer.reply(from, {:error, reason}) end)
    %{state | requests: %{}}
  end
end
WIRE PROTOCOL IMPLEMENTATION
From STAGE_1_FOUNDATION_IMPLEMENTATION.md:
defmodule DSPex.PythonBridge.Protocol do
  @moduledoc """
  Wire protocol for Python bridge communication.

  Payloads are JSON objects. The 4-byte length header is not handled here: on
  the Elixir side the port's `{:packet, 4}` option adds and strips it, so this
  module only encodes and decodes the JSON body.
  """

  @doc """
  Encodes a request as a JSON binary.

  `command` is passed through `to_string/1`, so atoms and strings are both
  accepted. Raises if `args` cannot be encoded as JSON.
  """
  def encode_request(id, command, args) do
    Jason.encode!(%{
      id: id,
      command: to_string(command),
      args: args,
      timestamp: DateTime.to_iso8601(DateTime.utc_now())
    })
  end

  @doc """
  Decodes a response payload.

  Returns:

    * `{:ok, id, result}` for `{"id", "success": true, "result"}`
    * `{:error, id, error}` for `{"id", "success": false, "error"}`
    * `{:error, {:invalid_response, decoded}}` for valid JSON of any other shape
    * `{:error, reason}` when the payload is not valid JSON
  """
  def decode_response(data) when is_binary(data) do
    case Jason.decode(data) do
      {:ok, %{"id" => id, "success" => true, "result" => result}} ->
        {:ok, id, result}

      {:ok, %{"id" => id, "success" => false, "error" => error}} ->
        {:error, id, error}

      # Previously a CaseClauseError that took down the bridge process.
      {:ok, other} ->
        {:error, {:invalid_response, other}}

      {:error, reason} ->
        {:error, reason}
    end
  end
end
COMPLETE PYTHON BRIDGE SCRIPT IMPLEMENTATION
From STAGE_1_FOUNDATION_IMPLEMENTATION.md:
#!/usr/bin/env python3
"""DSPy bridge process for DSPex.PythonBridge.Bridge.

Reads requests from stdin and writes responses to stdout. Every message is a
UTF-8 JSON object preceded by a 4-byte big-endian length header, matching the
Elixir port's {:packet, 4} option. Requests carry "id", "command" and "args";
each response echoes "id" with either "success": true and "result", or
"success": false and "error".
"""
import json
import struct
import sys
import time
import traceback

import dspy

# Handles to the real stdin/stdout byte streams used by the framed protocol.
# main() points sys.stdout at stderr so stray print() output (from DSPy or its
# dependencies) cannot corrupt the length-prefixed message stream.
_PROTOCOL_IN = sys.stdin.buffer
_PROTOCOL_OUT = sys.stdout.buffer


class DSPyBridge:
    """Registry of DSPy programs plus the handlers for bridge commands."""

    def __init__(self):
        self.programs = {}
        self.start_time = time.time()

    def handle_command(self, command, args):
        """Dispatch `command` to its handler.

        Raises ValueError when no handler exists for `command`.
        """
        handlers = {
            'create_program': self.create_program,
            'execute_program': self.execute_program,
            'list_programs': self.list_programs,
            'ping': self.ping,
            'get_stats': self.get_stats,
            'cleanup': self.cleanup,
        }
        if command not in handlers:
            raise ValueError(f"Unknown command: {command}")
        return handlers[command](args)

    def create_program(self, args):
        """Build and register a dspy.Predict program.

        Expects args['id'] and args['signature'], where the signature has
        'inputs' and 'outputs' lists of {'name': ...} entries. Every field is
        declared as str.
        """
        program_id = args['id']
        signature_def = args['signature']

        # DSPy signature classes collect their fields when the class is
        # created, so the fields must be in the class namespace up front;
        # setattr() on an existing signature class does not register them.
        annotations = {}
        namespace = {'__module__': __name__, '__annotations__': annotations}
        for field in signature_def['inputs']:
            annotations[field['name']] = str
            namespace[field['name']] = dspy.InputField()
        for field in signature_def['outputs']:
            annotations[field['name']] = str
            namespace[field['name']] = dspy.OutputField()

        # Call the signature metaclass directly so it processes the fields.
        signature_cls = type(dspy.Signature)(
            'DynamicSignature', (dspy.Signature,), namespace
        )

        self.programs[program_id] = dspy.Predict(signature_cls)
        return {"program_id": program_id, "status": "created"}

    def execute_program(self, args):
        """Run a registered program on args['inputs'] and return its outputs.

        Raises ValueError when args['program_id'] is not registered.
        """
        program_id = args['program_id']
        inputs = args['inputs']

        if program_id not in self.programs:
            raise ValueError(f"Program not found: {program_id}")

        result = self.programs[program_id](**inputs)
        return _prediction_to_dict(result)

    def list_programs(self, args):
        """Return the ids of all registered programs."""
        return {"programs": list(self.programs.keys())}

    def ping(self, args):
        """Liveness probe used by the Elixir health monitor."""
        return {"status": "ok", "timestamp": time.time()}

    def get_stats(self, args):
        """Report program count, memory usage and uptime in seconds."""
        return {
            "programs_count": len(self.programs),
            "memory_usage": self.get_memory_usage(),
            "uptime": time.time() - self.start_time,
        }

    def cleanup(self, args):
        """Forget every registered program."""
        self.programs.clear()
        return {"status": "cleaned"}

    def get_memory_usage(self):
        """Return rss/vms of this process, or None values without psutil."""
        try:
            import psutil  # optional dependency
        except ImportError:
            return {"rss": None, "vms": None}
        info = psutil.Process().memory_info()  # one snapshot for both values
        return {"rss": info.rss, "vms": info.vms}


def _prediction_to_dict(result):
    """Convert a program result into a plain dict for JSON encoding."""
    # dspy.Prediction/Example keep their fields in an internal store exposed
    # through toDict(); their __dict__ holds only underscore-prefixed
    # attributes, so filtering __dict__ would yield an empty dict.
    if hasattr(result, 'toDict'):
        return dict(result.toDict())
    if hasattr(result, '__dict__'):
        return {k: v for k, v in vars(result).items() if not k.startswith('_')}
    return {"result": str(result)}


def read_message():
    """Read one framed JSON message; return None on EOF or a truncated frame."""
    length_bytes = _PROTOCOL_IN.read(4)
    if len(length_bytes) < 4:
        return None
    (length,) = struct.unpack('>I', length_bytes)

    message_bytes = _PROTOCOL_IN.read(length)
    if len(message_bytes) < length:
        return None

    return json.loads(message_bytes.decode('utf-8'))


def write_message(message):
    """Serialize `message` as JSON and write it with a 4-byte length header."""
    message_bytes = json.dumps(message).encode('utf-8')
    _PROTOCOL_OUT.write(struct.pack('>I', len(message_bytes)) + message_bytes)
    _PROTOCOL_OUT.flush()


def main():
    """Serve requests until stdin is closed by the Elixir side."""
    # From here on print() goes to stderr; protocol output uses _PROTOCOL_OUT.
    sys.stdout = sys.stderr
    bridge = DSPyBridge()

    while True:
        try:
            message = read_message()
            if message is None:
                break

            request_id = message.get('id')
            command = message.get('command')
            args = message.get('args', {})

            try:
                result = bridge.handle_command(command, args)
                write_message({
                    'id': request_id,
                    'success': True,
                    'result': result
                })
            except Exception as e:
                # Answer the caller instead of leaving it waiting for a timeout.
                write_message({
                    'id': request_id,
                    'success': False,
                    'error': str(e)
                })
        except Exception as e:
            # Failures outside a command (e.g. undecodable JSON) have no
            # request to answer; log them and keep serving.
            sys.stderr.write(f"Bridge error: {e}\n")
            sys.stderr.write(traceback.format_exc())
            sys.stderr.flush()


if __name__ == '__main__':
    main()
ELIXIR PORT COMMUNICATION PATTERNS
From Elixir documentation and best practices:
Port Configuration:
port_options = [
  # Command-line arguments for the interpreter: the bridge script path.
  {:args, [python_script]},
  # Every message is prefixed with a 4-byte big-endian length header.
  {:packet, 4},
  # Deliver incoming data as binaries rather than charlists.
  :binary,
  # Send {port, {:exit_status, status}} when the Python process exits.
  :exit_status
]

port = Port.open({:spawn_executable, python_path}, port_options)
Packet Protocol Details:
- 4-byte big-endian length header
- JSON payload following header
- Bidirectional communication
- Length-prefixed for proper framing
Message Sending:
# Asynchronously hand encoded_request to the port; the {:packet, 4} option
# adds the length header. The sending pid in the tuple must be the process
# that owns (is connected to) the port.
send(port, {self(), {:command, encoded_request}})
Message Receiving:
# A complete frame from the Python process; with {:packet, 4} the length
# header has already been stripped from the payload.
def handle_info({_port, {:data, _payload}}, _state) do
  # Process incoming data
end

# The Python process terminated; the second element carries its exit code.
def handle_info({_port, {:exit_status, _exit_code}}, _state) do
  # Handle process termination
end
ERROR HANDLING AND RECOVERY PATTERNS
Process Lifecycle Management:
defmodule DSPex.PythonBridge.Supervisor do
  @moduledoc """
  Supervises the Python bridge and its health monitor.

  Children start in order: the bridge first, then the monitor that pings it.
  The `:rest_for_one` strategy restarts the monitor whenever the bridge is
  restarted, so the monitor's failure count starts fresh against the new
  bridge. A monitor crash alone does not touch the bridge.
  """
  use Supervisor

  @doc "Starts the supervisor registered under this module's name."
  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    children = [
      {DSPex.PythonBridge.Bridge, []},
      # Depends on the bridge above; must stay after it for :rest_for_one.
      {DSPex.PythonBridge.Monitor, []}
    ]

    Supervisor.init(children, strategy: :rest_for_one, max_restarts: 3, max_seconds: 60)
  end
end
Error Recovery Strategies:
- Automatic process restart on failure
- Request timeout handling
- Graceful degradation on communication errors
- Health check mechanisms
Timeout Management:
# Synchronous bridge request that turns a GenServer call timeout into an
# {:error, :timeout} return value instead of an exit.
def call(command, args, timeout \\ 30_000) do
  try do
    GenServer.call(__MODULE__, {:call, command, args}, timeout)
  catch
    :exit, {:timeout, _details} ->
      Logger.warning("Python bridge call timed out: #{command}")
      {:error, :timeout}
  end
end
INTEGRATION WITH APPLICATION SUPERVISION TREE
From STAGE_1_FOUNDATION_IMPLEMENTATION.md:
defmodule DSPex.Application do
  @moduledoc false
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Python bridge plus its health monitor; starting the bridge directly
      # would leave the monitor unsupervised and never started.
      DSPex.PythonBridge.Supervisor,
      # The application's own repo. AshPostgres.Repo is only the module that
      # repos `use`, not a child that can be started. Presumably DSPex.Repo is
      # defined with `use AshPostgres.Repo, otp_app: :dspex` and reads its
      # config from the :dspex app env - confirm.
      DSPex.Repo
    ]

    opts = [strategy: :one_for_one, name: DSPex.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
CONFIGURATION AND ENVIRONMENT SETUP
Application Configuration:
# config/config.exs
import Config

# This file is read when the project is built, so these System.get_env/2
# calls see the build machine's environment. Values that must come from the
# runtime environment of a release belong in config/runtime.exs.
python_executable = System.get_env("PYTHON_EXECUTABLE", "python3")
dspy_venv = System.get_env("DSPY_VENV")

config :dspex, :python_bridge,
  python_executable: python_executable,
  script_path: "python/dspy_bridge.py",
  default_timeout: 30_000,
  max_retries: 3

config :dspex, :python_environment,
  virtual_env: dspy_venv,
  required_packages: ["dspy-ai", "openai", "numpy"]
Runtime Environment Checks:
defmodule DSPex.PythonBridge.EnvironmentCheck do
  @moduledoc """
  Validate Python environment before starting bridge.

  Checks, in order: the Python executable can be found, `dspy` can be
  imported by it, and the bridge script exists and is readable.
  """

  # Logger.info/1 is a macro and needs the module to be required.
  require Logger

  @default_python "python3"
  @script "python/dspy_bridge.py"

  @doc """
  Runs all environment checks.

  Returns `{:ok, %{python_path: path, script_path: path}}`, or
  `{:error, message}` for the first check that fails.
  """
  def validate_environment do
    with {:ok, python_path} <- find_python_executable(),
         {:ok, _version} <- check_dspy_installation(python_path),
         {:ok, script_path} <- validate_bridge_script() do
      {:ok, %{python_path: python_path, script_path: script_path}}
    end
  end

  # Resolves the configured executable (default "python3") on the PATH.
  defp find_python_executable do
    python_cmd =
      :dspex
      |> Application.get_env(:python_bridge, [])
      |> Keyword.get(:python_executable)
      |> Kernel.||(@default_python)

    case System.find_executable(python_cmd) do
      nil -> {:error, "Python executable not found: #{python_cmd}"}
      path -> {:ok, path}
    end
  end

  # Imports dspy in a child interpreter; stderr is merged so an ImportError
  # traceback ends up in the error message.
  defp check_dspy_installation(python_path) do
    script = "import dspy; print(dspy.__version__)"

    case System.cmd(python_path, ["-c", script], stderr_to_stdout: true) do
      {output, 0} ->
        # Import-time warnings may precede the version; it is the last line.
        version = output |> String.trim() |> String.split("\n") |> List.last()
        Logger.info("DSPy version: #{version}")
        {:ok, version}

      {error, _status} ->
        {:error, "DSPy not installed or not accessible: #{error}"}
    end
  end

  defp validate_bridge_script do
    case :code.priv_dir(:dspex) do
      {:error, _} ->
        {:error, "Cannot locate the priv directory of the :dspex application"}

      priv_dir ->
        check_script_file(Path.join(priv_dir, @script))
    end
  end

  # The script must be a regular file the current user can read.
  defp check_script_file(script_path) do
    case File.stat(script_path) do
      {:ok, %File.Stat{type: :regular, access: access}} when access in [:read, :read_write] ->
        {:ok, script_path}

      {:ok, %File.Stat{type: :regular}} ->
        {:error, "Python bridge script is not readable: #{script_path}"}

      _ ->
        {:error, "Python bridge script not found: #{script_path}"}
    end
  end
end
HEALTH MONITORING AND METRICS
Bridge Health Monitoring:
defmodule DSPex.PythonBridge.Monitor do
  @moduledoc """
  Periodically pings the Python bridge and restarts it after repeated failures.

  Options:

    * `:interval` - milliseconds between checks (default 30_000)
    * `:max_failures` - consecutive failures before a restart (default 3)
  """
  use GenServer
  require Logger

  alias DSPex.PythonBridge.Bridge

  @default_interval 30_000
  @default_max_failures 3
  @ping_timeout 5_000

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    state = %{
      last_check: DateTime.utc_now(),
      failures: 0,
      interval: Keyword.get(opts, :interval, @default_interval),
      max_failures: Keyword.get(opts, :max_failures, @default_max_failures)
    }

    schedule_health_check(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_info(:health_check, state) do
    state =
      case perform_health_check() do
        :ok -> %{state | last_check: DateTime.utc_now(), failures: 0}
        {:error, reason} -> record_failure(state, reason)
      end

    schedule_health_check(state.interval)
    {:noreply, state}
  end

  # Counts a failure and restarts the bridge once the threshold is reached;
  # the count is reset afterwards so the next restart needs fresh failures.
  defp record_failure(state, reason) do
    failures = state.failures + 1
    # inspect/1: reasons may be atoms or tuples, not only strings.
    Logger.warning("Python bridge health check failed: #{inspect(reason)} (#{failures})")

    if failures >= state.max_failures do
      Logger.error("Python bridge unhealthy, restarting...")
      restart_bridge()
      %{state | failures: 0}
    else
      %{state | failures: failures}
    end
  end

  # Kills the registered bridge so its supervisor starts a new one (and with
  # it a new Python process). :kill cannot be trapped, so it works even if the
  # bridge traps exits or is stuck. Assumes the bridge is supervised.
  defp restart_bridge do
    case Process.whereis(Bridge) do
      nil -> :ok
      pid -> Process.exit(pid, :kill)
    end
  end

  defp perform_health_check do
    case Bridge.call(:ping, %{}, @ping_timeout) do
      {:ok, %{"status" => "ok"}} -> :ok
      {:ok, response} -> {:error, "unexpected response: #{inspect(response)}"}
      {:error, reason} -> {:error, reason}
    end
  catch
    # A call that exits (timeout, bridge not running) is a failed check, not
    # a reason to crash the monitor.
    :exit, reason -> {:error, {:exit, reason}}
  end

  defp schedule_health_check(interval) do
    Process.send_after(self(), :health_check, interval)
  end
end
ENHANCED PYTHON BRIDGE WITH HEALTH CHECKS
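Extended Python Script Features (methods added to `DSPyBridge`; they additionally require `import time` at the top of the script and `self.start_time = time.time()` in `__init__`, and `get_memory_usage` uses the optional `psutil` package, which is not in `required_packages`):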
def handle_command(self, command, args):
    """Route a bridge command to the matching handler method.

    Raises ValueError when no handler is registered for `command`.
    """
    dispatch = dict(
        create_program=self.create_program,
        execute_program=self.execute_program,
        list_programs=self.list_programs,
        ping=self.ping,
        get_stats=self.get_stats,
        cleanup=self.cleanup,
    )
    handler = dispatch.get(command)
    if handler is None:
        raise ValueError(f"Unknown command: {command}")
    return handler(args)
def ping(self, args):
    """Liveness probe: report status "ok" with the current Unix time."""
    now = time.time()
    return dict(status="ok", timestamp=now)
def get_stats(self, args):
    """Report program count, memory usage and uptime of the bridge.

    `uptime` is seconds since `self.start_time`. It is None when the instance
    has no `start_time`; the base DSPyBridge.__init__ only sets `programs`,
    and reading a missing attribute would otherwise raise AttributeError.
    """
    start_time = getattr(self, "start_time", None)
    return {
        "programs_count": len(self.programs),
        "memory_usage": self.get_memory_usage(),
        "uptime": None if start_time is None else time.time() - start_time,
    }
def cleanup(self, args):
    """Forget every registered program and acknowledge with status "cleaned"."""
    registry = self.programs
    registry.clear()
    return dict(status="cleaned")
def get_memory_usage(self):
    """Return the resident ("rss") and virtual ("vms") memory of this process.

    Uses psutil, which is optional (it is not in the configured
    required_packages); both values are None when it is not installed, so
    get_stats keeps working without it.
    """
    try:
        import psutil
    except ImportError:
        return {"rss": None, "vms": None}
    # One memory_info() call so both numbers come from the same snapshot.
    info = psutil.Process().memory_info()
    return {"rss": info.rss, "vms": info.vms}
COMPREHENSIVE TESTING PATTERNS
Bridge Communication Tests:
defmodule DSPex.PythonBridge.BridgeTest do
  # Not async: every test talks to the single, globally registered bridge.
  use ExUnit.Case, async: false

  alias DSPex.PythonBridge.Bridge

  setup do
    # The application may already be running the bridge under its name; only
    # start one when it is not, and let ExUnit stop it after the test.
    if Process.whereis(Bridge) == nil do
      start_supervised!(Bridge)
    end

    :ok
  end

  test "basic ping communication" do
    assert {:ok, response} = Bridge.call(:ping, %{})
    assert response["status"] == "ok"
    assert is_number(response["timestamp"])
  end

  # NOTE(review): executing a Predict program presumably needs a language
  # model configured for DSPy in the Python process - confirm test setup.
  test "program creation and execution" do
    assert {:ok, create_response} =
             Bridge.call(:create_program, %{
               id: "test_program",
               signature: %{
                 inputs: [%{name: "question", type: "str"}],
                 outputs: [%{name: "answer", type: "str"}]
               }
             })

    assert create_response["program_id"] == "test_program"
    assert create_response["status"] == "created"

    assert {:ok, exec_response} =
             Bridge.call(:execute_program, %{
               program_id: "test_program",
               inputs: %{question: "What is 2+2?"}
             })

    assert Map.has_key?(exec_response, "answer")
  end

  test "error handling for unknown program" do
    assert {:error, error_msg} =
             Bridge.call(:execute_program, %{
               program_id: "nonexistent",
               inputs: %{question: "test"}
             })

    assert error_msg =~ "Program not found"
  end

  test "timeout handling" do
    # A 1 ms timeout may or may not be met. Either outcome is fine, but the
    # call must not crash the test; the exit is caught in case call/3
    # propagates it instead of returning {:error, :timeout}.
    result =
      try do
        Bridge.call(:ping, %{}, 1)
      catch
        :exit, {:timeout, _} -> {:error, :timeout}
      end

    assert match?({:ok, _}, result) or result == {:error, :timeout}
  end

  test "concurrent requests" do
    results =
      1..10
      |> Enum.map(fn i -> Task.async(fn -> Bridge.call(:ping, %{request_id: i}) end) end)
      |> Task.await_many(5000)

    assert length(results) == 10
    # match?/2 makes an {:error, _} result fail the assertion instead of
    # raising FunctionClauseError inside the predicate.
    assert Enum.all?(results, &match?({:ok, %{"status" => "ok"}}, &1))
  end
end
PERFORMANCE OPTIMIZATION
Connection Pooling:
defmodule DSPex.PythonBridge.Pool do
  @moduledoc """
  Connection pool for Python bridge instances.

  Pooled bridges are started unnamed, so several can run at once. The process
  that checks a bridge out is monitored: if it exits without calling
  `checkin/1`, the bridge goes back to the pool automatically.
  """
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Returns `{:ok, bridge_pid}` or `{:error, :pool_exhausted}`."
  def checkout do
    GenServer.call(__MODULE__, :checkout)
  end

  @doc "Returns a checked-out bridge; pids that are not checked out are ignored."
  def checkin(bridge_pid) do
    GenServer.cast(__MODULE__, {:checkin, bridge_pid})
  end

  @impl true
  def init(opts) do
    pool_size = Keyword.get(opts, :pool_size, 3)

    # Bridge.start_link/1 registers a fixed name, so a second instance would
    # fail with :already_started; start pooled bridges unnamed instead. The
    # filter keeps pool_size 0 from iterating the descending range 1..0.
    bridges =
      for _ <- 1..pool_size, pool_size > 0 do
        {:ok, pid} = GenServer.start_link(DSPex.PythonBridge.Bridge, [])
        pid
      end

    # checked_out maps bridge pid => monitor ref on the caller holding it.
    {:ok, %{available: bridges, checked_out: %{}}}
  end

  @impl true
  def handle_call(:checkout, _from, %{available: []} = state) do
    {:reply, {:error, :pool_exhausted}, state}
  end

  def handle_call(:checkout, {caller, _tag}, %{available: [bridge | rest]} = state) do
    ref = Process.monitor(caller)
    checked_out = Map.put(state.checked_out, bridge, ref)
    {:reply, {:ok, bridge}, %{state | available: rest, checked_out: checked_out}}
  end

  @impl true
  def handle_cast({:checkin, bridge}, state) do
    case Map.pop(state.checked_out, bridge) do
      # Unknown or already returned: ignoring it prevents duplicates in :available.
      {nil, _checked_out} ->
        {:noreply, state}

      {ref, checked_out} ->
        Process.demonitor(ref, [:flush])
        {:noreply, %{state | available: [bridge | state.available], checked_out: checked_out}}
    end
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    # The holder died without checking in; reclaim its bridge.
    case Enum.find(state.checked_out, fn {_bridge, bridge_ref} -> bridge_ref == ref end) do
      nil ->
        {:noreply, state}

      {bridge, _ref} ->
        {:noreply,
         %{
           state
           | available: [bridge | state.available],
             checked_out: Map.delete(state.checked_out, bridge)
         }}
    end
  end

  def handle_info(_message, state), do: {:noreply, state}
end
IMPLEMENTATION TASK
Based on the complete context above, implement the Python bridge communication layer with the following specific requirements:
FILE STRUCTURE TO CREATE:
lib/dspex/python_bridge/
├── bridge.ex # Main GenServer implementation
├── protocol.ex # Wire protocol handling
├── monitor.ex # Health monitoring
├── environment_check.ex # Environment validation
└── supervisor.ex # Bridge supervision
priv/python/
└── dspy_bridge.py # Python bridge script
test/dspex/python_bridge/
├── bridge_test.exs # Bridge communication tests
├── protocol_test.exs # Protocol encoding/decoding tests
└── integration_test.exs # End-to-end integration tests
SPECIFIC IMPLEMENTATION REQUIREMENTS:
Bridge GenServer (`lib/dspex/python_bridge/bridge.ex`):
- Complete GenServer implementation with proper state management
- Python subprocess lifecycle management
- Request/response correlation with unique IDs
- Timeout handling and error recovery
- Graceful shutdown and cleanup
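Wire Protocol (`lib/dspex/python_bridge/protocol.ex`):
- JSON encoding/decoding for requests and responses
- Packet framing with 4-byte length headers (on the Elixir side the port's `{:packet, 4}` option adds and strips the header; the Python script must read and write the same big-endian header)
- Error message standardization
- Request ID management and correlation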
Health Monitoring (`lib/dspex/python_bridge/monitor.ex`):
- Periodic health checks with ping operations
- Failure tracking and automatic restart triggers
- Performance metrics collection
- Bridge availability monitoring
Environment Validation (`lib/dspex/python_bridge/environment_check.ex`):
- Python executable detection and validation
- DSPy package installation verification
- Script file existence and permissions check
- Environment configuration validation
Python Bridge Script (`priv/python/dspy_bridge.py`):
- Complete Python implementation with DSPy integration
- Command handler architecture
- Dynamic signature creation from Elixir definitions
- Program lifecycle management
- Error handling and logging
QUALITY REQUIREMENTS:
- Reliability: Handle process failures gracefully with automatic recovery
- Performance: Efficient request/response handling with minimal latency
- Monitoring: Comprehensive health checks and metrics collection
- Error Handling: Clear error messages and proper error propagation
- Testing: Complete test coverage for all communication scenarios
- Documentation: Detailed documentation for all public APIs
- Configuration: Flexible configuration for different environments
INTEGRATION POINTS:
- Must integrate with application supervision tree
- Should support configuration through application environment
- Must provide clean API for adapter layer consumption
- Should support metrics collection and monitoring
- Must handle concurrent requests efficiently
SUCCESS CRITERIA:
- Python subprocess starts reliably and maintains communication
- Request/response correlation works correctly under load
- Error handling provides meaningful feedback
- Health monitoring detects and recovers from failures
- Environment validation catches configuration issues early
- All test scenarios pass with high reliability
- Performance meets requirements for ML workloads
- Integration with supervision tree works correctly
This Python bridge forms the critical communication layer between Elixir and Python DSPy processes, enabling the entire DSPy-Ash integration to function reliably.