← Back to Prompts

ROUTER.1 core router logic

Documentation for ROUTER.1_core_router_logic from the Dspex repository.

Task: ROUTER.1 - Core Router Logic

Context

You are implementing the core router that intelligently directs DSPex operations to the appropriate implementation (native Elixir or Python via Snakepit). This router is the brain that makes execution decisions based on availability, performance, and requirements.

Required Reading

1. Router Module

  • File: /home/home/p/g/n/dspex/lib/dspex/router.ex
    • Current router structure
    • Routing decision patterns

2. Cognitive Orchestration Architecture

  • File: /home/home/p/g/n/dspex/docs/specs/dspex_cognitive_orchestration/01_CORE_ARCHITECTURE.md
    • Section: “Cognitive Orchestration Engine”
    • Intelligence through observation

3. Detailed Router Design

  • File: /home/home/p/g/n/dspex/docs/specs/dspex_cognitive_orchestration/02_CORE_COMPONENTS_DETAILED.md
    • Section: “Component 1: Cognitive Orchestration Engine”
    • Strategy selection algorithm

4. Registry Integration

  • File: /home/home/p/g/n/dspex/lib/dspex/native/registry.ex
    • Native module registry
  • File: Previous prompt PYTHON.2
    • Python module registry design

5. Success Criteria

  • File: /home/home/p/g/n/dspex/docs/specs/dspex_cognitive_orchestration/06_SUCCESS_CRITERIA.md
    • Stage 4: Intelligent Orchestration tests

Implementation Requirements

Core Router Structure

defmodule DSPex.Router do
  use GenServer
  require Logger
  
  @moduledoc """
  Intelligent router for DSPex operations.
  Routes to native or Python implementations based on:
  - Availability
  - Performance characteristics
  - Current system load
  - Historical performance
  """
  
  defstruct [
    :native_registry,
    :python_registry,
    :performance_history,
    :routing_rules,
    :fallback_enabled,
    :metrics_collector
  ]
  
  # Routing decision result
  defmodule Route do
    defstruct [
      :implementation,     # :native | :python
      :module,            # Module to execute
      :pool_type,         # For Python: :general | :optimizer | :neural
      :timeout,           # Recommended timeout
      :reason,            # Why this route was chosen
      :alternatives,      # Other possible routes
      :confidence         # Confidence in this decision (0.0-1.0)
    ]
  end
end

Routing Decision Engine

defmodule DSPex.Router do
  def route(operation, args, context \\ %{}) do
    GenServer.call(__MODULE__, {:route, operation, args, context})
  end
  
  def handle_call({:route, operation, args, context}, _from, state) do
    # 1. Check what's available
    native_available = check_native_availability(operation, state)
    python_available = check_python_availability(operation, state)
    
    # 2. Analyze requirements
    requirements = analyze_requirements(operation, args, context)
    
    # 3. Check performance history
    performance = get_performance_history(operation, state)
    
    # 4. Make routing decision
    route = make_routing_decision(
      operation,
      %{
        native_available: native_available,
        python_available: python_available,
        requirements: requirements,
        performance: performance,
        context: context
      },
      state
    )
    
    # 5. Record decision
    record_routing_decision(route, state)
    
    {:reply, {:ok, route}, state}
  end
  
  defp make_routing_decision(operation, analysis, state) do
    cond do
      # Force native if specified
      analysis.context[:force_native] && analysis.native_available ->
        build_route(:native, operation, analysis, "forced by context")
        
      # Force Python if specified
      analysis.context[:force_python] && analysis.python_available ->
        build_route(:python, operation, analysis, "forced by context")
        
      # Only one available
      analysis.native_available && not analysis.python_available ->
        build_route(:native, operation, analysis, "only native available")
        
      not analysis.native_available && analysis.python_available ->
        build_route(:python, operation, analysis, "only python available")
        
      # Both available - use intelligence
      analysis.native_available && analysis.python_available ->
        intelligent_routing(operation, analysis, state)
        
      # Neither available
      true ->
        {:error, :no_implementation_available}
    end
  end
end

Intelligent Routing Logic

defp intelligent_routing(operation, analysis, state) do
  # Calculate scores for each option
  native_score = calculate_implementation_score(:native, operation, analysis)
  python_score = calculate_implementation_score(:python, operation, analysis)
  
  # Adjust based on current system state
  native_score = adjust_for_system_state(native_score, :native, state)
  python_score = adjust_for_system_state(python_score, :python, state)
  
  # Make decision
  if native_score > python_score do
    build_route(:native, operation, analysis, 
      "native scored #{native_score} vs python #{python_score}")
  else
    build_route(:python, operation, analysis,
      "python scored #{python_score} vs native #{native_score}")
  end
end

defp calculate_implementation_score(impl, operation, analysis) do
  base_score = 50.0
  
  # Performance history (±30 points)
  perf_score = case analysis.performance[impl] do
    %{avg_duration: avg, success_rate: rate} ->
      speed_score = calculate_speed_score(avg)
      reliability_score = rate * 20
      speed_score + reliability_score
      
    nil -> 0
  end
  
  # Requirements match (±20 points)
  req_score = calculate_requirements_score(impl, analysis.requirements)
  
  # Implementation characteristics (±10 points)
  char_score = case impl do
    :native -> 
      # Native is better for simple, frequent operations
      if analysis.requirements.complexity == :low, do: 10, else: -5
      
    :python ->
      # Python is better for complex ML operations
      if analysis.requirements.complexity == :high, do: 10, else: -5
  end
  
  base_score + perf_score + req_score + char_score
end

Performance Tracking

defmodule DSPex.Router.PerformanceTracker do
  @window_size 100  # Keep last 100 executions per operation
  
  def record_execution(operation, implementation, duration, success) do
    :ets.insert(
      :router_performance,
      {
        {operation, implementation, System.monotonic_time()},
        %{
          duration: duration,
          success: success,
          timestamp: DateTime.utc_now()
        }
      }
    )
    
    # Trim old records
    trim_old_records(operation, implementation)
  end
  
  def get_stats(operation, implementation) do
    records = get_recent_records(operation, implementation)
    
    if Enum.empty?(records) do
      nil
    else
      %{
        avg_duration: calculate_average_duration(records),
        p95_duration: calculate_percentile(records, 95),
        success_rate: calculate_success_rate(records),
        sample_size: length(records)
      }
    end
  end
  
  defp calculate_average_duration(records) do
    successful = Enum.filter(records, & &1.success)
    if Enum.empty?(successful) do
      nil
    else
      avg = Enum.sum(Enum.map(successful, & &1.duration)) / length(successful)
      round(avg)
    end
  end
end

Fallback Handling

defmodule DSPex.Router.Fallback do
  def handle_execution_failure(route, error, state) do
    case route.implementation do
      :native when route.alternatives[:python] ->
        Logger.warning("Native execution failed, falling back to Python: #{inspect(error)}")
        
        # Build Python route
        python_route = build_fallback_route(route, :python)
        
        # Record fallback
        :telemetry.execute(
          [:dspex, :router, :fallback],
          %{},
          %{from: :native, to: :python, reason: error}
        )
        
        {:fallback, python_route}
        
      :python when route.alternatives[:native] ->
        Logger.warning("Python execution failed, falling back to native: #{inspect(error)}")
        
        native_route = build_fallback_route(route, :native)
        
        :telemetry.execute(
          [:dspex, :router, :fallback],
          %{},
          %{from: :python, to: :native, reason: error}
        )
        
        {:fallback, native_route}
        
      _ ->
        {:error, :no_fallback_available}
    end
  end
end

Registry Integration

defp check_native_availability(operation, state) do
  case DSPex.Native.Registry.get(operation) do
    {:ok, module_info} -> 
      %{
        available: true,
        module: module_info.module,
        estimated_duration: module_info.estimated_duration
      }
    :error -> 
      %{available: false}
  end
end

defp check_python_availability(operation, state) do
  case DSPex.Python.Registry.get_module(operation) do
    {:ok, module_info} ->
      %{
        available: true,
        module: module_info.name,
        pool_type: module_info.pool_type,
        estimated_duration: module_info.estimated_duration
      }
    :error ->
      %{available: false}
  end
end

Acceptance Criteria

  • Routes operations based on availability
  • Tracks performance history for decisions
  • Implements intelligent routing when both available
  • Supports fallback on failure
  • Integrates with both registries
  • Emits telemetry events for monitoring
  • Handles forced routing (context overrides)
  • Thread-safe concurrent routing
  • Configurable routing rules

Testing Requirements

Create tests in:

  • test/dspex/router_test.exs
  • test/dspex/router/performance_tracker_test.exs

Test scenarios:

  • Route when only native available
  • Route when only Python available
  • Intelligent routing based on performance
  • Fallback handling
  • Concurrent routing requests
  • Performance tracking accuracy
  • Context overrides

Example Usage

# Basic routing
{:ok, route} = DSPex.Router.route("predict", %{
  text: "Hello world",
  max_tokens: 50
})

route
# %Route{
#   implementation: :native,
#   module: DSPex.Native.Predict,
#   timeout: 1000,
#   reason: "native faster for simple predictions",
#   confidence: 0.85
# }

# Force Python implementation
{:ok, route} = DSPex.Router.route(
  "chain_of_thought",
  %{question: "Complex question"},
  %{force_python: true}
)

# Route with requirements
{:ok, route} = DSPex.Router.route(
  "optimize",
  %{iterations: 1000},
  %{require_gpu: true}
)
# Routes to Python neural pool

# After execution, record performance
DSPex.Router.record_execution(
  "predict",
  :native,
  duration_ms: 45,
  success: true
)

Dependencies

  • Requires registries (NATIVE.*, PYTHON.2) to be implemented
  • Integrates with telemetry system
  • No circular dependencies

Time Estimate

6 hours total:

  • 2 hours: Core routing logic
  • 1 hour: Performance tracking
  • 1 hour: Intelligent routing algorithm
  • 1 hour: Fallback handling
  • 1 hour: Testing

Notes

  • Start with simple routing, add intelligence incrementally
  • Use ETS for performance tracking
  • Consider caching routing decisions briefly
  • Monitor routing patterns for optimization
  • Document routing decision reasons
  • Plan for routing rule customization