← Back to Specs

Dspex variables implementation strategy

Documentation for dspex_variables_implementation_strategy from the Dspex repository.

DSPex Variables Implementation Strategy: A Comprehensive Specification

Executive Summary

After extensive analysis of the DSPex variables concept, DSPy architecture, and various implementation approaches, this document presents the recommended strategy for implementing the revolutionary generalized variables system in DSPex. The recommendation is a hybrid approach with progressive native implementation, starting with a DSPy adapter layer and gradually building native Elixir components for optimal performance and innovation.

Core Innovation: Generalized Variables

What Makes DSPex Variables Revolutionary

  1. Cross-Module Optimization: Unlike DSPy where each module optimizes in isolation, DSPex variables are shared coordination points that can be optimized across multiple modules simultaneously.

  2. Module-Type Variables: Variables that can represent module choices themselves, enabling automatic selection between different implementations (e.g., choosing between GPT-4, Claude, or Gemini based on task performance).

  3. Consciousness-Ready Architecture: Forward-looking design with metadata tracking evolution stages, integration potential, and phi contribution (Integrated Information Theory).

  4. Universal Optimization Targets: Any parameter in the system can become a variable - from simple floats to complex module configurations.

Implementation Strategy: Three-Phase Hybrid Approach

Phase 1: DSPy Adapter Layer (Weeks 1-3)

Build a lightweight adapter that adds variables to existing DSPy modules without modifying DSPy core.

1.1 Variable Registry

defmodule DSPex.Variables.Registry do
  use GenServer
  
  @type variable_id :: String.t()
  @type variable_type :: :float | :integer | :choice | :module | :embedding
  
  defstruct [:variables, :observers, :optimizers, :history]
  
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def register(name, type, initial_value, opts \\ []) do
    GenServer.call(__MODULE__, {:register, name, type, initial_value, opts})
  end
  
  def update(variable_id, new_value, metadata \\ %{}) do
    GenServer.call(__MODULE__, {:update, variable_id, new_value, metadata})
  end
  
  def observe(variable_id, observer_pid) do
    GenServer.cast(__MODULE__, {:observe, variable_id, observer_pid})
  end
end

1.2 DSPy Variable Adapter

Create Python-side adapter that injects variables into DSPy modules:

# snakepit/priv/python/dspex_variables.py
class VariableAdapter:
    """Adds variable awareness to DSPy modules"""
    
    def __init__(self, elixir_bridge):
        self.bridge = elixir_bridge
        self.variable_cache = {}
    
    def inject_variables(self, module, variable_mappings):
        """Inject variable values into DSPy module parameters"""
        for param_path, variable_id in variable_mappings.items():
            value = self._get_variable_value(variable_id)
            self._set_nested_attr(module, param_path, value)
        return module
    
    def wrap_module(self, module_class, variable_specs):
        """Create variable-aware wrapper for DSPy module"""
        class VariableAwareModule(module_class):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self._variable_specs = variable_specs
                self._variable_adapter = VariableAdapter(elixir_bridge)
            
            def forward(self, *args, **kwargs):
                # Apply current variable values
                self._variable_adapter.inject_variables(self, self._variable_specs)
                # Track variable usage
                self._track_variable_usage()
                # Execute original forward
                result = super().forward(*args, **kwargs)
                # Report variable impact
                self._report_variable_impact(result)
                return result
        
        return VariableAwareModule

1.3 Elixir Integration Layer

defmodule DSPex.Variables.DSPyBridge do
  @moduledoc """
  Bridge between Elixir variables and Python DSPy modules
  """
  
  def create_variable_aware_module(module_type, signature, variable_specs) do
    # Create module with variable injection points
    {:ok, module_id} = Snakepit.Python.call(:runtime, """
    from dspex_variables import VariableAdapter
    import dspy
    
    # Get module class
    module_class = getattr(dspy, '#{module_type}')
    
    # Create adapter
    adapter = VariableAdapter(bridge)
    
    # Create wrapped class
    VariableAwareClass = adapter.wrap_module(module_class, #{inspect(variable_specs)})
    
    # Instantiate
    module = VariableAwareClass('#{signature}')
    
    module
    """, store_as: ID.generate("variable_aware_module"))
    
    {:ok, module_id}
  end
  
  def execute_with_variables(module_id, inputs, variable_values) do
    # Update variables in registry
    Enum.each(variable_values, fn {var_id, value} ->
      DSPex.Variables.Registry.update(var_id, value)
    end)
    
    # Execute module
    {:ok, result} = Snakepit.Python.call(:runtime, """
    stored.#{module_id}(**#{inspect(inputs)})
    """)
    
    # Extract variable feedback
    feedback = extract_variable_feedback(result)
    
    {:ok, result, feedback}
  end
end

Phase 2: Native Core Components (Weeks 4-6)

Build essential native components that DSPy lacks.

2.1 Native Evaluation Framework

defmodule DSPex.Native.Evaluation do
  @moduledoc """
  Native evaluation engine with variable impact tracking
  """
  
  defstruct [:metrics, :variable_impacts, :traces]
  
  def evaluate_with_variables(program, dataset, variables, opts \\ []) do
    results = dataset
    |> Task.async_stream(fn example ->
      evaluate_example(program, example, variables, opts)
    end, max_concurrency: opts[:max_concurrency] || 4)
    |> Enum.map(fn {:ok, result} -> result end)
    
    %__MODULE__{
      metrics: aggregate_metrics(results),
      variable_impacts: calculate_variable_impacts(results, variables),
      traces: if(opts[:collect_traces], do: extract_traces(results), else: nil)
    }
  end
  
  defp evaluate_example(program, example, variables, opts) do
    # Start trace
    trace = DSPex.Trace.start()
    
    # Apply variables
    configured_program = apply_variables(program, variables)
    
    # Execute with trace
    {result, trace} = DSPex.Trace.capture(trace, fn ->
      execute_program(configured_program, example)
    end)
    
    # Calculate metrics
    metrics = calculate_metrics(result, example, opts[:metrics] || [:accuracy])
    
    # Extract variable attribution
    variable_attribution = attribute_to_variables(trace, variables)
    
    %{
      example: example,
      result: result,
      metrics: metrics,
      trace: trace,
      variable_attribution: variable_attribution
    }
  end
  
  defp calculate_variable_impacts(results, variables) do
    # Aggregate impact of each variable on metrics
    Enum.reduce(results, %{}, fn result, acc ->
      Enum.reduce(result.variable_attribution, acc, fn {var_id, impact}, acc2 ->
        Map.update(acc2, var_id, [impact], &[impact | &1])
      end)
    end)
    |> Enum.map(fn {var_id, impacts} ->
      {var_id, %{
        mean_impact: Statistics.mean(impacts),
        std_dev: Statistics.standard_deviation(impacts),
        samples: length(impacts)
      }}
    end)
    |> Map.new()
  end
end

2.2 Native Variable Types

defmodule DSPex.Variables.Types do
  @moduledoc """
  Native variable type system with consciousness metadata
  """
  
  defmodule Type do
    @callback validate(value :: any()) :: {:ok, any()} | {:error, String.t()}
    @callback cast(value :: any()) :: {:ok, any()} | {:error, String.t()}
    @callback constraints() :: keyword()
    @callback consciousness_metadata() :: map()
  end
  
  defmodule Float do
    @behaviour Type
    
    def validate(value) when is_float(value), do: {:ok, value}
    def validate(value) when is_integer(value), do: {:ok, value * 1.0}
    def validate(_), do: {:error, "must be a float"}
    
    def cast(value) when is_binary(value) do
      case Float.parse(value) do
        {float, ""} -> {:ok, float}
        _ -> {:error, "cannot cast to float"}
      end
    end
    def cast(value), do: validate(value)
    
    def constraints, do: [min: :float, max: :float, step: :float]
    
    def consciousness_metadata do
      %{
        integration_potential: 0.7,
        can_become_agent: false,
        evolution_stage: :static,
        phi_contribution: 0.1,
        description: "Continuous parameter for fine-tuning"
      }
    end
  end
  
  defmodule Module do
    @behaviour Type
    
    def validate(value) when is_atom(value), do: {:ok, value}
    def validate(value) when is_binary(value), do: {:ok, String.to_atom(value)}
    def validate(_), do: {:error, "must be a module identifier"}
    
    def cast(value), do: validate(value)
    
    def constraints, do: [choices: :list]
    
    def consciousness_metadata do
      %{
        integration_potential: 0.95,
        can_become_agent: true,
        evolution_stage: :intelligent,
        phi_contribution: 0.8,
        description: "Module selection enables cognitive orchestration"
      }
    end
  end
end

2.3 Native Trace System

defmodule DSPex.Trace do
  @moduledoc """
  Execution trace system with variable attribution
  """
  
  defstruct [
    :id,
    :start_time,
    :events,
    :variable_uses,
    :decision_points,
    :module_calls
  ]
  
  def start do
    %__MODULE__{
      id: ID.generate("trace"),
      start_time: System.monotonic_time(),
      events: [],
      variable_uses: %{},
      decision_points: [],
      module_calls: []
    }
  end
  
  def capture(trace, fun) do
    Process.put(:dspex_trace, trace)
    result = fun.()
    final_trace = Process.get(:dspex_trace)
    Process.delete(:dspex_trace)
    {result, final_trace}
  end
  
  def record_variable_use(variable_id, value, context) do
    case Process.get(:dspex_trace) do
      %__MODULE__{} = trace ->
        event = %{
          type: :variable_use,
          variable_id: variable_id,
          value: value,
          context: context,
          timestamp: System.monotonic_time()
        }
        
        updated_trace = %{trace |
          events: [event | trace.events],
          variable_uses: Map.update(trace.variable_uses, variable_id, [event], &[event | &1])
        }
        
        Process.put(:dspex_trace, updated_trace)
        
      nil ->
        # No active trace
        :ok
    end
  end
  
  def record_decision(decision_type, chosen, alternatives, metadata \\ %{}) do
    case Process.get(:dspex_trace) do
      %__MODULE__{} = trace ->
        decision = %{
          type: decision_type,
          chosen: chosen,
          alternatives: alternatives,
          metadata: metadata,
          timestamp: System.monotonic_time()
        }
        
        updated_trace = %{trace |
          events: [{:decision, decision} | trace.events],
          decision_points: [decision | trace.decision_points]
        }
        
        Process.put(:dspex_trace, updated_trace)
        
      nil ->
        :ok
    end
  end
end

Phase 3: Advanced Native Features (Weeks 7-10)

3.1 Native Variable-Aware Optimizers

defmodule DSPex.Native.Optimizers.VariableOptimizer do
  @moduledoc """
  Base optimizer for variable-aware optimization
  """
  
  defstruct [:strategy, :metrics, :constraints, :history]
  
  def optimize(program, dataset, variables, opts \\ []) do
    strategy = opts[:strategy] || DSPex.Native.Optimizers.SimulatedAnnealing
    
    initial_state = %{
      variables: variables,
      values: get_initial_values(variables),
      best_score: -:infinity,
      best_values: nil,
      iteration: 0
    }
    
    final_state = Enum.reduce_while(1..opts[:max_iterations], initial_state, fn i, state ->
      # Evaluate current configuration
      evaluation = DSPex.Native.Evaluation.evaluate_with_variables(
        program,
        dataset,
        apply_values(state.variables, state.values),
        opts
      )
      
      score = calculate_score(evaluation, opts[:metrics])
      
      # Update best if improved
      state = if score > state.best_score do
        %{state | best_score: score, best_values: state.values}
      else
        state
      end
      
      # Generate next values using strategy
      next_values = strategy.next_values(
        state.values,
        evaluation.variable_impacts,
        temperature(i, opts[:max_iterations])
      )
      
      # Check convergence
      if converged?(state, opts) do
        {:halt, state}
      else
        {:cont, %{state | values: next_values, iteration: i}}
      end
    end)
    
    {:ok, final_state.best_values, final_state.best_score}
  end
  
  defp temperature(iteration, max_iterations) do
    # Simulated annealing temperature schedule
    initial_temp = 1.0
    final_temp = 0.01
    
    initial_temp * :math.pow(final_temp / initial_temp, iteration / max_iterations)
  end
end

3.2 SIMBA-GV (SIMBA for Generalized Variables)

defmodule DSPex.Native.Optimizers.SIMBA_GV do
  @moduledoc """
  SIMBA adapted for generalized variable optimization
  """
  
  alias DSPex.Native.{Evaluation, Variables}
  
  def optimize(modules, dataset, variables, opts \\ []) do
    # Phase 1: Sample for variable coverage
    samples = sample_for_variables(dataset, variables, opts)
    
    # Phase 2: Initialize variable values
    initial_values = initialize_variables(modules, samples, variables)
    
    # Phase 3: Mutate and bootstrap
    bootstrapped = bootstrap_with_mutations(
      modules,
      samples,
      variables,
      initial_values,
      opts
    )
    
    # Phase 4: Amplify successful patterns
    amplified = amplify_patterns(bootstrapped, modules, dataset, variables)
    
    {:ok, amplified}
  end
  
  defp sample_for_variables(dataset, variables, opts) do
    # Smart sampling that ensures variable space coverage
    module_vars = Enum.filter(variables, &(&1.type == :module))
    continuous_vars = Enum.filter(variables, &(&1.type in [:float, :integer]))
    
    # For module variables, ensure we sample data that works well with each option
    module_samples = if length(module_vars) > 0 do
      Enum.flat_map(module_vars, fn var ->
        Enum.flat_map(var.constraints.choices, fn choice ->
          sample_for_module_choice(dataset, var, choice, opts)
        end)
      end)
    else
      []
    end
    
    # For continuous variables, sample across the range
    continuous_samples = if length(continuous_vars) > 0 do
      sample_across_ranges(dataset, continuous_vars, opts)
    else
      []
    end
    
    Enum.uniq(module_samples ++ continuous_samples)
  end
  
  defp bootstrap_with_mutations(modules, samples, variables, initial_values, opts) do
    # Generate mutations of variable values
    mutations = generate_mutations(initial_values, variables, opts[:mutation_count] || 10)
    
    # Evaluate each mutation across all modules
    evaluated_mutations = Enum.map(mutations, fn mutation ->
      scores = Enum.map(modules, fn module ->
        configured = apply_variables(module, mutation)
        evaluation = Evaluation.evaluate_with_variables(
          configured,
          samples,
          variables,
          opts
        )
        {module.id, evaluation.metrics}
      end)
      
      {mutation, aggregate_cross_module_score(scores)}
    end)
    
    # Select best mutations for bootstrapping
    evaluated_mutations
    |> Enum.sort_by(fn {_, score} -> score end, :desc)
    |> Enum.take(opts[:bootstrap_count] || 5)
    |> Enum.map(fn {mutation, _} -> mutation end)
  end
  
  defp amplify_patterns(bootstrapped_values, modules, dataset, variables) do
    # Find patterns in successful configurations
    patterns = extract_variable_patterns(bootstrapped_values, variables)
    
    # Generate new configurations based on patterns
    amplified = Enum.flat_map(patterns, fn pattern ->
      generate_from_pattern(pattern, variables, 5)
    end)
    
    # Final evaluation and selection
    all_configs = bootstrapped_values ++ amplified
    
    evaluated = Enum.map(all_configs, fn config ->
      score = evaluate_configuration(config, modules, dataset, variables)
      {config, score}
    end)
    
    evaluated
    |> Enum.sort_by(fn {_, score} -> score end, :desc)
    |> List.first()
    |> elem(0)
  end
end

Implementation Decision: Hybrid with Progressive Native

  1. Start Hybrid: Begin with DSPy adapter layer to get variables working quickly
  2. Build Native Core: Implement evaluation, tracing, and basic optimization natively
  3. Extend Gradually: Add advanced features like SIMBA-GV and consciousness metadata
  4. Maintain Compatibility: Keep DSPy bridge for ecosystem compatibility

Why Not Pure Native?

  • Time to Market: Would take 3-6 months to rebuild everything
  • Ecosystem Loss: Lose access to DSPy modules and optimizers
  • Maintenance Burden: Need to maintain all optimizers and modules

Why Not Pure DSPy Extension?

  • Architectural Limitations: DSPy’s module-centric design conflicts with variables
  • Performance: Cross-language variable updates would be slow
  • Innovation Constraints: Can’t implement advanced features like consciousness metadata

Key Innovation Points

1. Module-Type Variables

The ability to have variables that select between modules is revolutionary:

reasoning_var = Variable.create(:reasoning_approach,
  type: :module,
  choices: [Predict, ChainOfThought, ReAct, ProgramOfThought],
  metadata: %{affects: "reasoning style and depth"}
)

# System automatically selects best reasoning approach

2. Cross-Module Optimization

Variables optimize across module boundaries:

temperature_var = Variable.create(:temperature,
  type: :float,
  range: {0.0, 2.0},
  shared_by: [:ideation_module, :refinement_module, :critique_module]
)

# One temperature optimized for entire pipeline coherence

3. Consciousness-Ready Architecture

Every variable includes metadata for future cognitive capabilities:

%{
  integration_potential: 0.95,  # How well it integrates
  can_become_agent: true,       # Can evolve to make decisions
  evolution_stage: :intelligent, # Current evolution level
  phi_contribution: 0.8         # Consciousness contribution
}

Development Timeline

Weeks 1-3: Foundation

  • Variable registry and type system
  • DSPy adapter layer
  • Basic variable injection

Weeks 4-6: Native Core

  • Native evaluation framework
  • Trace system with attribution
  • Basic native optimizer

Weeks 7-9: Advanced Features

  • SIMBA-GV implementation
  • Module-type variables
  • Cross-module optimization

Weeks 10-12: Polish & Integration

  • Performance optimization
  • Consciousness metadata tracking
  • Documentation and examples

Success Metrics

  1. Functional: Variables work across modules
  2. Performance: <100ms overhead per variable update
  3. Optimization: 20%+ improvement over module-isolated optimization
  4. Compatibility: All DSPy modules work with variables
  5. Innovation: First system with module-type variables

Conclusion

The hybrid approach with progressive native implementation provides the best balance of:

  • Speed to market (working system in 3 weeks)
  • Innovation potential (native components enable new features)
  • Ecosystem compatibility (maintain DSPy integration)
  • Performance (native evaluation and optimization)

This strategy positions DSPex as the first system to truly implement generalized variables for language model programming, while maintaining practical compatibility with the existing ecosystem.