← Back to Analysis

19 dspex integration architecture

Documentation for 19_dspex_integration_architecture from the Pipeline ex repository.

DSPex Integration Architecture: Next-Generation Agent Intelligence

Overview

This document outlines the comprehensive architecture for integrating DSPex (Declarative Self-improving Elixir) with MABEAM agent framework into pipeline_ex, creating a revolutionary multi-agent intelligence platform that surpasses traditional DSPy approaches.

Vision Statement

Transform pipeline_ex from a simple pipeline executor into a comprehensive multi-agent intelligence platform where:

  • Agents are the execution units - Every step runs in fault-tolerant agents
  • Signatures are intelligent contracts - Python-like syntax with compile-time optimization
  • Variables are first-class citizens - Unified abstraction for all optimizable parameters
  • Optimization is distributed - Multi-agent coordination for superior results
  • Execution is resilient - OTP supervision for production reliability

Core Architecture Components

1. Multi-Agent Execution Engine

Agent-Based Step Execution

defmodule Pipeline.DSPex.AgentExecutor do
  @moduledoc """
  Agent-based execution engine replacing traditional step execution.
  """
  
  def execute_step(step, context) do
    # Determine agent type for step
    agent_type = determine_agent_type(step)
    
    # Get or create agent for execution
    case get_or_create_agent(agent_type, step) do
      {:ok, agent_id} ->
        # Execute step as agent action
        Mabeam.execute_action(agent_id, :execute_step, %{
          step: step,
          context: context
        })
      
      {:error, reason} ->
        {:error, "Agent creation failed: #{reason}"}
    end
  end
  
  defp determine_agent_type(step) do
    case step["type"] do
      "dspex_claude" -> :claude_agent
      "dspex_gemini" -> :gemini_agent
      "dspex_chain" -> :chain_agent
      "dspex_optimizer" -> :optimizer_agent
      _ -> :generic_agent
    end
  end
end

Specialized Agent Types

defmodule Pipeline.DSPex.Agents.ClaudeAgent do
  @moduledoc """
  Claude-specific agent with DSPex signature support.
  """
  
  use Mabeam.Agent
  use DSPex.SignatureAgent
  
  @impl true
  def init(agent, config) do
    # Initialize Claude-specific state
    state = %{
      model: config["model"] || "claude-3-sonnet",
      temperature: config["temperature"] || 0.7,
      max_tokens: config["max_tokens"] || 4000,
      signature_cache: %{},
      optimization_state: %{}
    }
    
    {:ok, %{agent | state: state}}
  end
  
  @impl true
  def handle_action(agent, :execute_step, %{step: step, context: context}) do
    # Extract signature from step
    signature = extract_signature(step)
    
    # Resolve variables in signature
    resolved_signature = resolve_signature_variables(signature, agent.state)
    
    # Execute with Claude
    case execute_with_claude(resolved_signature, context) do
      {:ok, result} ->
        # Update optimization state
        updated_agent = update_optimization_state(agent, signature, result)
        
        {:ok, updated_agent, result}
      
      {:error, reason} ->
        {:error, reason}
    end
  end
  
  defp execute_with_claude(signature, context) do
    # Use signature to build optimized prompt
    prompt = build_prompt_from_signature(signature, context)
    
    # Execute with Claude provider
    Pipeline.Providers.ClaudeProvider.query(prompt, signature.options)
  end
end

2. Python-Like Type System Integration

Enhanced Signature Module

defmodule Pipeline.DSPex.Signature do
  @moduledoc """
  Enhanced signature system with Python-like syntax and agent integration.
  """
  
  use ElixirML.Signature
  
  defmacro __using__(opts) do
    quote do
      import Pipeline.DSPex.Signature
      import ElixirML.Signature.TypeMacros
      
      Module.register_attribute(__MODULE__, :agent_config, accumulate: false)
      Module.register_attribute(__MODULE__, :variables, accumulate: true)
      
      @before_compile Pipeline.DSPex.Signature
    end
  end
  
  defmacro signature(signature_ast) do
    quote do
      @signature_ast unquote(signature_ast)
    end
  end
  
  defmacro agent_config(config) do
    quote do
      @agent_config unquote(config)
    end
  end
  
  defmacro variable(name, type, opts \\ []) do
    quote do
      @variables {unquote(name), unquote(type), unquote(opts)}
    end
  end
  
  defmacro __before_compile__(env) do
    signature_ast = Module.get_attribute(env.module, :signature_ast)
    agent_config = Module.get_attribute(env.module, :agent_config)
    variables = Module.get_attribute(env.module, :variables)
    
    quote do
      def __signature__, do: unquote(signature_ast)
      def __agent_config__, do: unquote(agent_config)
      def __variables__, do: unquote(variables)
      
      def validate_input(input), do: validate_signature_input(input, __signature__())
      def validate_output(output), do: validate_signature_output(output, __signature__())
    end
  end
end

Example Usage

defmodule SecurityAnalysisSignature do
  use Pipeline.DSPex.Signature
  
  # Python-like type syntax
  signature analyze_code: list[:string] ->
    vulnerabilities: list[SecurityVulnerability],
    risk_score: float[0.0, 1.0],
    recommendations: list[:string]
  
  # Agent configuration
  agent_config %{
    primary_agent: :security_specialist,
    fallback_agent: :general_analyzer,
    optimization_agent: :security_optimizer
  }
  
  # Variable declarations
  variable :analysis_depth, :discrete,
    default: :thorough,
    options: [:quick, :standard, :thorough, :comprehensive]
  
  variable :confidence_threshold, :continuous,
    default: 0.8,
    constraints: [range: {0.0, 1.0}]
  
  variable :analysis_prompt, :string,
    default: "Analyze this code for security vulnerabilities:",
    constraints: [length: [min: 10, max: 500]]
end

3. Variable Abstraction Integration

Variable-Aware Agent System

defmodule Pipeline.DSPex.VariableAgent do
  @moduledoc """
  Agent that manages variable optimization and state.
  """
  
  use Mabeam.Agent
  use DSPy.Variable.Agent
  
  @impl true
  def init(agent, config) do
    # Initialize variable registry for this agent
    variable_registry = DSPy.Variable.Registry.create_agent_registry(agent.id)
    
    # Register variables from signature
    signature_variables = extract_signature_variables(config.signature)
    Enum.each(signature_variables, fn {name, type, opts} ->
      DSPy.Variable.Registry.register_variable(agent.id, name, type, opts)
    end)
    
    state = %{
      signature: config.signature,
      variable_registry: variable_registry,
      optimization_history: [],
      current_values: %{}
    }
    
    {:ok, %{agent | state: state}}
  end
  
  @impl true
  def handle_action(agent, :update_variable, %{name: name, value: value}) do
    # Update variable with validation
    case DSPy.Variable.Registry.update_variable(agent.id, name, value) do
      {:ok, updated_variable} ->
        # Update agent state
        new_values = Map.put(agent.state.current_values, name, value)
        updated_agent = put_in(agent.state.current_values, new_values)
        
        # Emit variable update event
        Mabeam.emit_event(:variable_updated, %{
          agent_id: agent.id,
          variable_name: name,
          old_value: Map.get(agent.state.current_values, name),
          new_value: value
        })
        
        {:ok, updated_agent, %{variable: updated_variable}}
      
      {:error, reason} ->
        {:error, "Variable update failed: #{reason}"}
    end
  end
  
  @impl true
  def handle_action(agent, :optimize_variables, %{strategy: strategy, data: training_data}) do
    # Get all variables for optimization
    variables = DSPy.Variable.Registry.get_variables_for_agent(agent.id)
    
    # Create optimization task
    optimization_task = %{
      agent_id: agent.id,
      variables: variables,
      strategy: strategy,
      training_data: training_data
    }
    
    # Send to optimization coordinator
    case send_to_optimization_coordinator(optimization_task) do
      {:ok, optimization_id} ->
        # Track optimization
        new_history = [optimization_id | agent.state.optimization_history]
        updated_agent = put_in(agent.state.optimization_history, new_history)
        
        {:ok, updated_agent, %{optimization_id: optimization_id}}
      
      {:error, reason} ->
        {:error, "Optimization failed: #{reason}"}
    end
  end
end

4. Multi-Agent Optimization System

Optimization Coordinator Agent

defmodule Pipeline.DSPex.OptimizationCoordinator do
  @moduledoc """
  Central coordinator for multi-agent optimization.
  """
  
  use Mabeam.Agent
  
  @impl true
  def init(agent, config) do
    state = %{
      active_optimizations: %{},
      optimization_strategies: %{
        prompt_optimization: Pipeline.DSPex.Optimizers.PromptOptimizer,
        parameter_optimization: Pipeline.DSPex.Optimizers.ParameterOptimizer,
        distributed_optimization: Pipeline.DSPex.Optimizers.DistributedOptimizer
      },
      coordinator_agents: %{}
    }
    
    {:ok, %{agent | state: state}}
  end
  
  @impl true
  def handle_action(agent, :start_optimization, optimization_task) do
    optimization_id = generate_optimization_id()
    
    # Determine optimization strategy
    strategy = determine_optimization_strategy(optimization_task)
    
    # Create specialized optimization agents
    optimization_agents = create_optimization_agents(strategy, optimization_task)
    
    # Store optimization state
    optimization_state = %{
      id: optimization_id,
      task: optimization_task,
      strategy: strategy,
      agents: optimization_agents,
      status: :running,
      started_at: DateTime.utc_now(),
      results: %{}
    }
    
    new_optimizations = Map.put(agent.state.active_optimizations, optimization_id, optimization_state)
    updated_agent = put_in(agent.state.active_optimizations, new_optimizations)
    
    # Start optimization process
    start_optimization_process(optimization_id, optimization_agents)
    
    {:ok, updated_agent, %{optimization_id: optimization_id}}
  end
  
  defp create_optimization_agents(strategy, task) do
    case strategy do
      :prompt_optimization ->
        {:ok, prompt_agent, _} = start_prompt_optimization_agent(task)
        [prompt_agent]
      
      :parameter_optimization ->
        {:ok, param_agent, _} = start_parameter_optimization_agent(task)
        [param_agent]
      
      :distributed_optimization ->
        {:ok, prompt_agent, _} = start_prompt_optimization_agent(task)
        {:ok, param_agent, _} = start_parameter_optimization_agent(task)
        {:ok, eval_agent, _} = start_evaluation_agent(task)
        [prompt_agent, param_agent, eval_agent]
    end
  end
end

Specialized Optimization Agents

defmodule Pipeline.DSPex.Optimizers.PromptOptimizer do
  @moduledoc """
  Specialized agent for prompt optimization.
  """
  
  use Mabeam.Agent
  use DSPy.Optimizer.VariableOptimizer
  
  @impl true
  def handle_action(agent, :optimize_prompts, %{variables: variables, training_data: data}) do
    # Filter to string variables (prompts)
    prompt_variables = Enum.filter(variables, &(&1.type == :string))
    
    # Generate prompt variants using LLM
    variant_tasks = Enum.map(prompt_variables, fn var ->
      Task.async(fn -> generate_prompt_variants(var, data) end)
    end)
    
    # Collect results
    variants = Task.await_many(variant_tasks, 30_000)
    
    # Evaluate variants
    evaluation_tasks = Enum.map(variants, fn {var_id, variant_list} ->
      Task.async(fn -> evaluate_prompt_variants(var_id, variant_list, data) end)
    end)
    
    # Get best variants
    best_variants = Task.await_many(evaluation_tasks, 60_000)
    
    # Create optimization results
    optimization_results = Enum.map(best_variants, fn {var_id, best_variant, score} ->
      %{
        variable_id: var_id,
        optimized_value: best_variant,
        improvement_score: score,
        optimization_method: :prompt_variants
      }
    end)
    
    {:ok, agent, %{results: optimization_results}}
  end
  
  defp generate_prompt_variants(variable, training_data) do
    # Use Claude to generate prompt variants
    variant_prompt = """
    Generate 10 improved variations of this prompt:
    "#{variable.value}"
    
    Requirements:
    - Maintain the same intent and functionality
    - Improve clarity and effectiveness
    - Consider these constraints: #{inspect(variable.constraints)}
    
    Training context:
    #{format_training_context(training_data)}
    
    Return as a JSON array of strings.
    """
    
    case Pipeline.Providers.ClaudeProvider.query(variant_prompt, %{}) do
      {:ok, response} ->
        variants = extract_variants_from_response(response)
        {variable.id, variants}
      
      {:error, reason} ->
        Logger.error("Failed to generate prompt variants: #{reason}")
        {variable.id, [variable.value]}
    end
  end
end

5. Agent-Aware Configuration System

DSPex Configuration Schema

defmodule Pipeline.DSPex.ConfigSchema do
  @moduledoc """
  Configuration schema for DSPex-enabled pipelines.
  """
  
  def get_dspex_schema_extension do
    %{
      "properties" => %{
        "workflow" => %{
          "properties" => %{
            "agent_config" => %{
              "type" => "object",
              "properties" => %{
                "default_agent_type" => %{"type" => "string"},
                "agent_pools" => %{
                  "type" => "object",
                  "patternProperties" => %{
                    ".*" => %{
                      "type" => "object",
                      "properties" => %{
                        "size" => %{"type" => "integer", "minimum" => 1},
                        "agent_type" => %{"type" => "string"},
                        "initialization_config" => %{"type" => "object"}
                      }
                    }
                  }
                },
                "optimization_config" => %{
                  "type" => "object",
                  "properties" => %{
                    "enabled" => %{"type" => "boolean"},
                    "strategy" => %{
                      "type" => "string",
                      "enum" => ["prompt_optimization", "parameter_optimization", "distributed_optimization"]
                    },
                    "coordination_agent" => %{"type" => "string"}
                  }
                }
              }
            },
            "steps" => %{
              "items" => %{
                "properties" => %{
                  "signature" => %{
                    "type" => "object",
                    "properties" => %{
                      "input_fields" => %{
                        "type" => "array",
                        "items" => %{
                          "type" => "object",
                          "properties" => %{
                            "name" => %{"type" => "string"},
                            "type" => %{"type" => "string"},
                            "constraints" => %{"type" => "object"}
                          }
                        }
                      },
                      "output_fields" => %{
                        "type" => "array",
                        "items" => %{
                          "type" => "object",
                          "properties" => %{
                            "name" => %{"type" => "string"},
                            "type" => %{"type" => "string"},
                            "constraints" => %{"type" => "object"}
                          }
                        }
                      }
                    }
                  },
                  "variables" => %{
                    "type" => "array",
                    "items" => %{
                      "type" => "object",
                      "properties" => %{
                        "name" => %{"type" => "string"},
                        "type" => %{"type" => "string"},
                        "default" => %{"type" => "any"},
                        "constraints" => %{"type" => "object"}
                      }
                    }
                  },
                  "agent_config" => %{
                    "type" => "object",
                    "properties" => %{
                      "primary_agent" => %{"type" => "string"},
                      "fallback_agent" => %{"type" => "string"},
                      "optimization_agent" => %{"type" => "string"}
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  end
end

Enhanced Configuration Loading

defmodule Pipeline.DSPex.ConfigLoader do
  @moduledoc """
  Enhanced configuration loader with DSPex support.
  """
  
  def load_dspex_config(config_path) do
    with {:ok, raw_config} <- File.read(config_path),
         {:ok, parsed_config} <- parse_config(raw_config),
         {:ok, validated_config} <- validate_dspex_config(parsed_config),
         {:ok, enhanced_config} <- enhance_with_agents(validated_config) do
      {:ok, enhanced_config}
    else
      {:error, reason} -> {:error, reason}
    end
  end
  
  defp parse_config(raw_config) do
    # Support both YAML and JSON
    case Path.extname(config_path) do
      ".yaml" -> YamlElixir.read_from_string(raw_config)
      ".json" -> Jason.decode(raw_config)
      _ -> {:error, "Unsupported config format"}
    end
  end
  
  defp validate_dspex_config(config) do
    # Validate with DSPex schema extensions
    schema = Pipeline.DSPex.ConfigSchema.get_dspex_schema_extension()
    
    Pipeline.Enhanced.SchemaValidator.validate_with_type_preservation(config, schema)
  end
  
  defp enhance_with_agents(config) do
    # Add agent-specific enhancements
    enhanced_workflow = config["workflow"]
    |> enhance_steps_with_agents()
    |> setup_agent_pools()
    |> configure_optimization_agents()
    
    {:ok, put_in(config, ["workflow"], enhanced_workflow)}
  end
  
  defp enhance_steps_with_agents(workflow) do
    enhanced_steps = Enum.map(workflow["steps"], fn step ->
      step
      |> determine_agent_requirements()
      |> setup_step_agents()
      |> configure_step_variables()
    end)
    
    put_in(workflow, ["steps"], enhanced_steps)
  end
end

6. Production Integration

Agent Supervisor Integration

defmodule Pipeline.DSPex.Supervisor do
  @moduledoc """
  Supervisor for DSPex agent-based pipeline execution.
  """
  
  use Supervisor
  
  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def init(opts) do
    children = [
      # MABEAM foundation
      {Mabeam.Foundation.Supervisor, opts},
      
      # DSPex-specific agents
      {Pipeline.DSPex.OptimizationCoordinator, []},
      {Pipeline.DSPex.VariableRegistry, []},
      {Pipeline.DSPex.AgentPool, []},
      
      # Enhanced infrastructure
      {Pipeline.Enhanced.AgentStepRegistry, []},
      {Pipeline.Enhanced.AgentSchemaValidator, []},
      {Pipeline.DSPex.ConfigManager, []}
    ]
    
    Supervisor.init(children, strategy: :one_for_one)
  end
end

Example DSPex Pipeline Configuration

# dspex_security_analysis.yaml
workflow:
  name: "dspex_security_analysis"
  description: "Multi-agent security analysis with optimization"
  
  agent_config:
    default_agent_type: "security_specialist"
    agent_pools:
      security_specialists:
        size: 3
        agent_type: "security_specialist"
        initialization_config:
          specializations: ["web_security", "crypto", "auth"]
      
      optimizers:
        size: 2
        agent_type: "optimization_coordinator"
        initialization_config:
          strategies: ["prompt_optimization", "parameter_optimization"]
    
    optimization_config:
      enabled: true
      strategy: "distributed_optimization"
      coordination_agent: "optimization_coordinator"
  
  steps:
    - name: "analyze_security_vulnerabilities"
      type: "dspex_claude"
      
      signature:
        input_fields:
          - name: "source_code"
            type: "list[string]"
            constraints:
              length: {min: 1, max: 10000}
          
          - name: "analysis_context"
            type: "dict[string, any]"
            constraints:
              required_keys: ["language", "framework"]
        
        output_fields:
          - name: "vulnerabilities"
            type: "list[SecurityVulnerability]"
            constraints:
              length: {min: 0, max: 100}
          
          - name: "risk_assessment"
            type: "RiskAssessment"
            constraints:
              required_fields: ["overall_score", "category_scores"]
          
          - name: "recommendations"
            type: "list[string]"
            constraints:
              length: {min: 0, max: 50}
      
      variables:
        - name: "analysis_depth"
          type: "discrete"
          default: "thorough"
          constraints:
            options: ["quick", "standard", "thorough", "comprehensive"]
        
        - name: "confidence_threshold"
          type: "continuous"
          default: 0.8
          constraints:
            range: {min: 0.0, max: 1.0}
        
        - name: "analysis_prompt"
          type: "string"
          default: "Analyze the following code for security vulnerabilities:"
          constraints:
            length: {min: 10, max: 500}
      
      agent_config:
        primary_agent: "security_specialist"
        fallback_agent: "general_analyzer"
        optimization_agent: "security_optimizer"
    
    - name: "generate_security_report"
      type: "dspex_chain"
      
      signature:
        input_fields:
          - name: "vulnerabilities"
            type: "list[SecurityVulnerability]"
          - name: "risk_assessment"  
            type: "RiskAssessment"
          - name: "recommendations"
            type: "list[string]"
        
        output_fields:
          - name: "security_report"
            type: "SecurityReport"
          - name: "executive_summary"
            type: "string"
      
      chain_config:
        steps:
          - agent: "report_generator"
            action: "structure_findings"
          - agent: "executive_summarizer"
            action: "create_executive_summary"
          - agent: "formatter"
            action: "format_final_report"

Benefits of DSPex Integration

1. Revolutionary Architecture

  • Native Elixir/BEAM - No Python bridge complexity
  • Multi-agent intelligence - Distributed optimization and execution
  • Fault-tolerant design - OTP supervision for production reliability
  • Scalable concurrency - Thousands of agents per node

2. Superior Developer Experience

  • Python-like syntax - Familiar and intuitive type system
  • Comprehensive tooling - Full IDE support with type checking
  • Agent abstractions - Simple yet powerful execution model
  • Variable-first design - Unified optimization interface

3. Production-Ready Features

  • Built-in monitoring - Agent health and performance tracking
  • Hot code reloading - Zero-downtime updates
  • Horizontal scaling - Multi-node distribution
  • Event-driven coordination - Reactive agent interactions

4. Advanced Optimization

  • Multi-agent optimization - Distributed intelligence
  • Variable abstraction - Unified parameter optimization
  • Adaptive learning - Continuous improvement
  • Performance monitoring - Real-time optimization feedback

Migration Strategy

Phase 1: Foundation (Weeks 1-2)

  1. Integrate MABEAM - Add agent framework
  2. Enhance schemas - Python-like type syntax
  3. Basic agent steps - Convert existing steps
  4. Agent registry - Extend step registry

Phase 2: Variables (Weeks 3-4)

  1. Variable system - Implement abstraction
  2. Agent variables - Connect to agent state
  3. Basic optimization - Single-agent optimization
  4. Configuration enhancement - Support variables

Phase 3: Multi-Agent (Weeks 5-6)

  1. Optimization coordination - Multi-agent system
  2. Distributed optimization - Scale across nodes
  3. Advanced signatures - Complex type support
  4. Performance tuning - Optimize agent performance

Phase 4: Production (Weeks 7-8)

  1. Monitoring integration - Agent-aware monitoring
  2. Deployment tools - Agent deployment utilities
  3. Advanced features - Hot reloading, scaling
  4. Documentation - Complete user guides

This DSPex integration transforms pipeline_ex into a next-generation multi-agent intelligence platform, providing unprecedented capabilities for AI system development and optimization.