JULY_1_ARCH_DOCS_02: The Hybrid DSPy-Elixir Architecture
Architecture Overview
ElexirionDSP implements a hybrid architecture that combines the best of two worlds:
- DSPy (Python): Dynamic prompt optimization and learning
- pipeline_ex (Elixir): Robust, concurrent execution with OTP supervision
This separation allows us to leverage DSPy’s advanced optimization capabilities while maintaining production-grade reliability through Elixir’s battle-tested concurrency model.
Core Components
The Three-Layer Architecture
┌─────────────────────────────────────────────────────────────┐
│ Layer 3: Optimization │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ DSPy Optimizer │ │
│ │ • Prompt optimization │ │
│ │ • Performance metrics │ │
│ │ • Learning algorithms │ │
│ │ • A/B testing │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 2: Orchestration │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Agent Framework │ │
│ │ • PipelineAgent.handle_request/1 │ │
│ │ • Web API │ │
│ │ • Chat Interface │ │
│ │ • Job Management │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 1: Execution │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ pipeline_ex Runtime │ │
│ │ • Pipeline.Executor │ │
│ │ • Claude/Gemini Providers │ │
│ │ • OTP Supervision │ │
│ │ • Fault Tolerance │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Layer 1: Execution Runtime (Elixir)
Current Implementation
The execution layer is already robust and production-ready:
# Pipeline execution with OTP supervision
{:ok, pid} = Pipeline.Executor.start_link()
{:ok, result} = Pipeline.Executor.execute(config)
# Multi-provider support
providers = [
{:claude, Pipeline.Providers.ClaudeProvider},
{:gemini, Pipeline.Providers.GeminiProvider}
]
# Fault tolerance with emergent fallbacks
case execute_pipeline(config) do
{:ok, result} -> result
{:error, reason} ->
trigger_emergent_fallback(reason)
end
Key Capabilities
1. Concurrent Pipeline Execution
# Supervisor manages multiple pipeline processes
defmodule Pipeline.Supervisor do
use DynamicSupervisor
def start_pipeline(config) do
DynamicSupervisor.start_child(__MODULE__, {
Pipeline.Worker, config
})
end
end
2. Multi-Provider Orchestration
# Single pipeline using multiple AI providers
steps:
- name: analyze_code
type: claude_smart
preset: analysis
- name: generate_plan
type: gemini_structured
format: json
- name: implement_changes
type: claude_robust
retry_count: 3
3. Emergent Fallback Systems
defp create_emergent_fallback(request) do
# When primary pipeline fails, generate new approach
pipeline_yaml = case failure_pattern(request) do
:max_turns_exceeded -> create_simple_pipeline(request)
:tool_restriction_violated -> create_no_tools_pipeline(request)
:provider_error -> create_alternative_provider_pipeline(request)
end
end
Execution Flow
User Request
│
▼
┌─────────────────┐
│ Pipeline Config │ ──► Load YAML configuration
└─────────────────┘
│
▼
┌─────────────────┐
│ Template Engine │ ──► Replace variables
└─────────────────┘
│
▼
┌─────────────────┐
│ Step Executor │ ──► Execute each step with supervision
└─────────────────┘
│
▼
┌─────────────────┐
│ Provider Router │ ──► Route to Claude/Gemini/etc
└─────────────────┘
│
▼
┌─────────────────┐
│ Result Compiler │ ──► Aggregate and format results
└─────────────────┘
Layer 2: Agent Framework (To Be Built)
Agent Wrapper Design
defmodule PipelineAgent do
use GenServer
# Public API
def handle_request(request) do
GenServer.call(__MODULE__, {:handle_request, request})
end
# Internal Implementation
def handle_call({:handle_request, request}, _from, state) do
# 1. Route request to appropriate pipeline
pipeline_path = determine_pipeline(request)
# 2. Execute pipeline asynchronously
task = Task.async(fn ->
Pipeline.Executor.execute_file(pipeline_path, %{
"user_request" => request
})
end)
# 3. Monitor execution and handle results
case Task.await(task, 300_000) do # 5 minute timeout
{:ok, result} -> format_success_response(result)
{:error, reason} -> handle_agent_failure(reason, request)
end
end
defp determine_pipeline(request) do
cond do
String.contains?(request, "analyze") -> "pipelines/analysis.yaml"
String.contains?(request, "refactor") -> "pipelines/refactor.yaml"
String.contains?(request, "generate") -> "pipelines/generate.yaml"
true -> "pipelines/general.yaml"
end
end
end
Web API Layer
# Phoenix controller for HTTP interface
defmodule PipelineWeb.AgentController do
use PipelineWeb, :controller
def chat(conn, %{"message" => user_request}) do
case PipelineAgent.handle_request(user_request) do
{:ok, result} ->
json(conn, %{
status: "success",
response: result.formatted_response,
execution_time: result.duration_ms,
pipeline_used: result.pipeline_path
})
{:error, reason} ->
json(conn, %{
status: "error",
error: reason,
fallback_used: true
})
end
end
def pipeline_status(conn, %{"job_id" => job_id}) do
# Real-time status of long-running pipelines
status = PipelineAgent.get_job_status(job_id)
json(conn, status)
end
end
Chat Interface
// Simple frontend for agent interaction
class PipelineChat {
async sendMessage(message) {
const response = await fetch('/api/agent/chat', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({message})
});
const result = await response.json();
this.displayResponse(result);
}
displayResponse(result) {
if (result.status === 'success') {
this.showSuccess(result.response);
this.showMetrics(result.execution_time, result.pipeline_used);
} else {
this.showError(result.error);
}
}
}
Layer 3: DSPy Optimization (Future)
Python-Elixir Bridge
# DSPy optimizer that treats Elixir as a custom LLM
class ElixirGenesisPipeline(dspy.Module):
def __init__(self):
super().__init__()
# DSPy controls this prompt, Elixir executes it
self.generate = dspy.Predict(
'high_level_goal -> pipeline_result'
)
def forward(self, goal):
# Generate optimized prompt
optimized_prompt = self.generate(high_level_goal=goal).prediction
# Execute via Elixir
result = subprocess.run([
'mix', 'pipeline.agent.execute', optimized_prompt
], capture_output=True, text=True)
return dspy.Prediction(pipeline_result=result.stdout)
Optimization Loop
def optimize_pipeline_prompts():
# Training data from successful executions
training_examples = load_training_data()
# Metric function using another LLM
def quality_metric(example, prediction):
rater = dspy.Predict('generated_result, gold_standard -> quality_score')
score = rater(
generated_result=prediction.pipeline_result,
gold_standard=example.expected_output
).quality_score
return int(score) >= 8
# Optimize the pipeline
optimizer = dspy.teleprompt.BootstrapFewShot(
metric=quality_metric,
max_bootstrapped_demos=5
)
optimized_pipeline = optimizer.compile(
ElixirGenesisPipeline(),
trainset=training_examples
)
# Save optimized prompts back to Elixir YAML files
save_optimized_config(optimized_pipeline)
Communication Patterns
1. Request Flow (User → System)
User Input
│
▼
Agent Framework ──► Route to Pipeline
│
▼
Pipeline Executor ──► Load YAML Config
│
▼
Provider Router ──► Call Claude/Gemini
│
▼
Result Compiler ──► Format Response
│
▼
Back to User
2. Optimization Flow (DSPy → Elixir)
DSPy Optimizer
│
▼
Generate Candidate Prompt
│
▼
Call Elixir via subprocess
│
▼
Evaluate Result Quality
│
▼
Update Prompt Parameters
│
▼
Repeat until Convergence
3. Monitoring Flow (System → Observability)
Pipeline Execution
│
▼
Telemetry Events ──► Metrics Collection
│
▼
Performance Database ──► Dashboard Updates
│
▼
Alert Conditions ──► Notification System
Fault Tolerance Strategy
1. Component Isolation
Each layer can fail independently without affecting others:
# Layer 1: Pipeline execution failure
{:error, "Claude API timeout"} -> trigger_gemini_fallback()
# Layer 2: Agent framework failure
{:error, "Agent crash"} -> restart_agent_supervisor()
# Layer 3: Optimization failure
{:error, "DSPy optimization failed"} -> use_last_known_good_prompts()
2. Graceful Degradation
defp handle_system_degradation(failure_type) do
case failure_type do
:optimization_layer_down ->
# Use static prompts, disable learning
switch_to_static_mode()
:provider_unavailable ->
# Switch to alternative providers
activate_backup_providers()
:agent_framework_overloaded ->
# Direct pipeline execution
bypass_agent_layer()
end
end
3. Self-Recovery
defmodule Pipeline.HealthMonitor do
use GenServer
def handle_info(:health_check, state) do
case system_health_status() do
:healthy ->
{:noreply, state}
{:degraded, issues} ->
trigger_recovery_procedures(issues)
{:noreply, update_state(state, issues)}
:critical ->
activate_emergency_mode()
{:noreply, emergency_state()}
end
end
end
Benefits of the Hybrid Architecture
1. Best of Both Worlds
- DSPy: Advanced optimization, learning, research capabilities
- Elixir: Production reliability, concurrency, fault tolerance
2. Independent Evolution
- Can upgrade DSPy optimization without touching production execution
- Can enhance Elixir runtime without affecting optimization logic
- Each layer can be tested and deployed independently
3. Scalability Options
- Horizontal: Multiple Elixir nodes running pipelines
- Vertical: More powerful DSPy optimization servers
- Functional: Separate optimization and execution workloads
4. Technology Flexibility
- Can swap DSPy for other optimization frameworks
- Can replace Elixir with other runtimes (though we won’t!)
- Can add new AI providers without changing architecture
This hybrid architecture positions us to build the most robust and capable AI engineering platform possible, combining cutting-edge research with production-grade engineering.