← Back to Pipeline ex

JULY 1 ARCH DOCS 02 HYBRID ARCHITECTURE

Documentation for JULY_1_ARCH_DOCS_02_HYBRID_ARCHITECTURE from the Pipeline ex repository.

JULY_1_ARCH_DOCS_02: The Hybrid DSPy-Elixir Architecture

Architecture Overview

ElexirionDSP implements a hybrid architecture that combines the best of two worlds:

  1. DSPy (Python): Dynamic prompt optimization and learning
  2. pipeline_ex (Elixir): Robust, concurrent execution with OTP supervision

This separation allows us to leverage DSPy’s advanced optimization capabilities while maintaining production-grade reliability through Elixir’s battle-tested concurrency model.

Core Components

The Three-Layer Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Layer 3: Optimization                   │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              DSPy Optimizer                         │    │
│  │  • Prompt optimization                              │    │
│  │  • Performance metrics                              │    │
│  │  • Learning algorithms                              │    │
│  │  • A/B testing                                      │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────┐
│                  Layer 2: Orchestration                    │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              Agent Framework                        │    │
│  │  • PipelineAgent.handle_request/1                   │    │
│  │  • Web API                                          │    │
│  │  • Chat Interface                                   │    │
│  │  • Job Management                                   │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────┐
│                   Layer 1: Execution                       │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              pipeline_ex Runtime                    │    │
│  │  • Pipeline.Executor                               │    │
│  │  • Claude/Gemini Providers                         │    │
│  │  • OTP Supervision                                 │    │
│  │  • Fault Tolerance                                 │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

Layer 1: Execution Runtime (Elixir)

Current Implementation

The execution layer is already robust and production-ready:

# Pipeline execution with OTP supervision
{:ok, pid} = Pipeline.Executor.start_link()
{:ok, result} = Pipeline.Executor.execute(config)

# Multi-provider support
providers = [
  {:claude, Pipeline.Providers.ClaudeProvider},
  {:gemini, Pipeline.Providers.GeminiProvider}
]

# Fault tolerance with emergent fallbacks
case execute_pipeline(config) do
  {:ok, result} -> result
  {:error, reason} -> 
    trigger_emergent_fallback(reason)
end

Key Capabilities

1. Concurrent Pipeline Execution

# Supervisor manages multiple pipeline processes
defmodule Pipeline.Supervisor do
  use DynamicSupervisor
  
  def start_pipeline(config) do
    DynamicSupervisor.start_child(__MODULE__, {
      Pipeline.Worker, config
    })
  end
end

2. Multi-Provider Orchestration

# Single pipeline using multiple AI providers
steps:
  - name: analyze_code
    type: claude_smart
    preset: analysis
    
  - name: generate_plan  
    type: gemini_structured
    format: json
    
  - name: implement_changes
    type: claude_robust
    retry_count: 3

3. Emergent Fallback Systems

defp create_emergent_fallback(request) do
  # When primary pipeline fails, generate new approach
  pipeline_yaml = case failure_pattern(request) do
    :max_turns_exceeded -> create_simple_pipeline(request)
    :tool_restriction_violated -> create_no_tools_pipeline(request) 
    :provider_error -> create_alternative_provider_pipeline(request)
  end
end

Execution Flow

User Request
     │
     ▼
┌─────────────────┐
│ Pipeline Config │ ──► Load YAML configuration
└─────────────────┘
     │
     ▼
┌─────────────────┐
│ Template Engine │ ──► Replace variables
└─────────────────┘
     │
     ▼
┌─────────────────┐
│ Step Executor   │ ──► Execute each step with supervision
└─────────────────┘
     │
     ▼
┌─────────────────┐
│ Provider Router │ ──► Route to Claude/Gemini/etc
└─────────────────┘
     │
     ▼
┌─────────────────┐
│ Result Compiler │ ──► Aggregate and format results
└─────────────────┘

Layer 2: Agent Framework (To Be Built)

Agent Wrapper Design

defmodule PipelineAgent do
  use GenServer
  
  # Public API
  def handle_request(request) do
    GenServer.call(__MODULE__, {:handle_request, request})
  end
  
  # Internal Implementation
  def handle_call({:handle_request, request}, _from, state) do
    # 1. Route request to appropriate pipeline
    pipeline_path = determine_pipeline(request)
    
    # 2. Execute pipeline asynchronously  
    task = Task.async(fn ->
      Pipeline.Executor.execute_file(pipeline_path, %{
        "user_request" => request
      })
    end)
    
    # 3. Monitor execution and handle results
    case Task.await(task, 300_000) do # 5 minute timeout
      {:ok, result} -> format_success_response(result)
      {:error, reason} -> handle_agent_failure(reason, request)
    end
  end
  
  defp determine_pipeline(request) do
    cond do
      String.contains?(request, "analyze") -> "pipelines/analysis.yaml"
      String.contains?(request, "refactor") -> "pipelines/refactor.yaml"
      String.contains?(request, "generate") -> "pipelines/generate.yaml"
      true -> "pipelines/general.yaml"
    end
  end
end

Web API Layer

# Phoenix controller for HTTP interface
defmodule PipelineWeb.AgentController do
  use PipelineWeb, :controller
  
  def chat(conn, %{"message" => user_request}) do
    case PipelineAgent.handle_request(user_request) do
      {:ok, result} ->
        json(conn, %{
          status: "success",
          response: result.formatted_response,
          execution_time: result.duration_ms,
          pipeline_used: result.pipeline_path
        })
        
      {:error, reason} ->
        json(conn, %{
          status: "error", 
          error: reason,
          fallback_used: true
        })
    end
  end
  
  def pipeline_status(conn, %{"job_id" => job_id}) do
    # Real-time status of long-running pipelines
    status = PipelineAgent.get_job_status(job_id)
    json(conn, status)
  end
end

Chat Interface

// Simple frontend for agent interaction
class PipelineChat {
  async sendMessage(message) {
    const response = await fetch('/api/agent/chat', {
      method: 'POST',
      headers: {'Content-Type': 'application/json'},
      body: JSON.stringify({message})
    });
    
    const result = await response.json();
    this.displayResponse(result);
  }
  
  displayResponse(result) {
    if (result.status === 'success') {
      this.showSuccess(result.response);
      this.showMetrics(result.execution_time, result.pipeline_used);
    } else {
      this.showError(result.error);
    }
  }
}

Layer 3: DSPy Optimization (Future)

Python-Elixir Bridge

# DSPy optimizer that treats Elixir as a custom LLM
class ElixirGenesisPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        # DSPy controls this prompt, Elixir executes it
        self.generate = dspy.Predict(
            'high_level_goal -> pipeline_result'
        )
    
    def forward(self, goal):
        # Generate optimized prompt
        optimized_prompt = self.generate(high_level_goal=goal).prediction
        
        # Execute via Elixir
        result = subprocess.run([
            'mix', 'pipeline.agent.execute', optimized_prompt
        ], capture_output=True, text=True)
        
        return dspy.Prediction(pipeline_result=result.stdout)

Optimization Loop

def optimize_pipeline_prompts():
    # Training data from successful executions
    training_examples = load_training_data()
    
    # Metric function using another LLM
    def quality_metric(example, prediction):
        rater = dspy.Predict('generated_result, gold_standard -> quality_score')
        score = rater(
            generated_result=prediction.pipeline_result,
            gold_standard=example.expected_output
        ).quality_score
        return int(score) >= 8
    
    # Optimize the pipeline
    optimizer = dspy.teleprompt.BootstrapFewShot(
        metric=quality_metric,
        max_bootstrapped_demos=5
    )
    
    optimized_pipeline = optimizer.compile(
        ElixirGenesisPipeline(),
        trainset=training_examples
    )
    
    # Save optimized prompts back to Elixir YAML files
    save_optimized_config(optimized_pipeline)

Communication Patterns

1. Request Flow (User → System)

User Input
    │
    ▼
Agent Framework ──► Route to Pipeline
    │
    ▼  
Pipeline Executor ──► Load YAML Config
    │
    ▼
Provider Router ──► Call Claude/Gemini
    │
    ▼
Result Compiler ──► Format Response
    │
    ▼
Back to User

2. Optimization Flow (DSPy → Elixir)

DSPy Optimizer
    │
    ▼
Generate Candidate Prompt
    │
    ▼
Call Elixir via subprocess
    │
    ▼
Evaluate Result Quality
    │
    ▼
Update Prompt Parameters
    │
    ▼
Repeat until Convergence

3. Monitoring Flow (System → Observability)

Pipeline Execution
    │
    ▼
Telemetry Events ──► Metrics Collection
    │
    ▼
Performance Database ──► Dashboard Updates
    │
    ▼
Alert Conditions ──► Notification System

Fault Tolerance Strategy

1. Component Isolation

Each layer can fail independently without affecting others:

# Layer 1: Pipeline execution failure
{:error, "Claude API timeout"} -> trigger_gemini_fallback()

# Layer 2: Agent framework failure  
{:error, "Agent crash"} -> restart_agent_supervisor()

# Layer 3: Optimization failure
{:error, "DSPy optimization failed"} -> use_last_known_good_prompts()

2. Graceful Degradation

defp handle_system_degradation(failure_type) do
  case failure_type do
    :optimization_layer_down -> 
      # Use static prompts, disable learning
      switch_to_static_mode()
      
    :provider_unavailable ->
      # Switch to alternative providers
      activate_backup_providers()
      
    :agent_framework_overloaded ->
      # Direct pipeline execution
      bypass_agent_layer()
  end
end

3. Self-Recovery

defmodule Pipeline.HealthMonitor do
  use GenServer
  
  def handle_info(:health_check, state) do
    case system_health_status() do
      :healthy -> 
        {:noreply, state}
        
      {:degraded, issues} ->
        trigger_recovery_procedures(issues)
        {:noreply, update_state(state, issues)}
        
      :critical ->
        activate_emergency_mode()
        {:noreply, emergency_state()}
    end
  end
end

Benefits of the Hybrid Architecture

1. Best of Both Worlds

  • DSPy: Advanced optimization, learning, research capabilities
  • Elixir: Production reliability, concurrency, fault tolerance

2. Independent Evolution

  • Can upgrade DSPy optimization without touching production execution
  • Can enhance Elixir runtime without affecting optimization logic
  • Each layer can be tested and deployed independently

3. Scalability Options

  • Horizontal: Multiple Elixir nodes running pipelines
  • Vertical: More powerful DSPy optimization servers
  • Functional: Separate optimization and execution workloads

4. Technology Flexibility

  • Can swap DSPy for other optimization frameworks
  • Can replace Elixir with other runtimes (though we won’t!)
  • Can add new AI providers without changing architecture

This hybrid architecture positions us to build the most robust and capable AI engineering platform possible, combining cutting-edge research with production-grade engineering.