DSPEx Implementation Roadmap
Date: 2025-07-11
Status: Implementation Planning
Timeline: 12 weeks (Q3 2025)
Target: Complete DSPEx interface on simplified Foundation
Implementation Overview
This roadmap provides concrete, step-by-step implementation instructions to build the DSPEx interface layer on our simplified Foundation/Jido integration. Each milestone includes specific code deliverables, test requirements, and success criteria.
Pre-Implementation Setup
Prerequisites Checklist
- ✅ Foundation infrastructure stable (271+ tests passing)
- 🔄 Foundation/Jido simplification complete (1,720+ line reduction)
- ✅ ElixirML Schema Engine operational
- ✅ MABEAM multi-agent system functional
Development Environment Setup
# Clone and setup development environment
git clone https://github.com/your-org/elixir_ml.git
cd elixir_ml
# Install dependencies
mix deps.get
mix deps.compile
# Run full test suite (should pass)
mix test
# Verify Foundation services
mix test test/foundation/
mix test test/jido_system/
mix test test/elixir_ml/
Phase 1: Core DSPEx Infrastructure (Weeks 1-4)
Week 1: DSPEx.Program Foundation
Milestone 1.1: Basic Program Module
Files to Create:
lib/dspex/
├── program.ex # Core DSPEx.Program module
├── schema.ex # Extended schema integration
├── program/
│ ├── behaviour.ex # Program behaviour definition
│ ├── registry.ex # Program registry
│ └── supervisor.ex # Program supervision
test/dspex/
├── program_test.exs # Core program tests
└── integration/
└── basic_program_test.exs # Integration tests
Implementation:
# lib/dspex/program.ex
defmodule DSPEx.Program do
@moduledoc """
Core DSPEx program interface providing ML-native development experience
"""
@callback predict(input :: map()) :: {:ok, term()} | {:error, term()}
@callback to_agent_system(opts :: keyword()) :: {:ok, pid()} | {:error, term()}
defmacro __using__(opts \\ []) do
quote location: :keep do
@behaviour DSPEx.Program
@dspex_opts unquote(opts)
# Import DSPEx conveniences
import DSPEx.Schema, only: [defschema: 2]
import DSPEx.Program.Helpers
# Initialize program metadata
Module.register_attribute(__MODULE__, :dspex_variables, accumulate: true)
Module.register_attribute(__MODULE__, :dspex_skills, accumulate: true)
Module.register_attribute(__MODULE__, :dspex_schemas, accumulate: true)
@before_compile DSPEx.Program
# Default implementations
def to_agent_system(opts \\ []) do
DSPEx.MABEAM.convert_program(__MODULE__, opts)
end
defoverridable to_agent_system: 1
end
end
defmacro __before_compile__(_env) do
quote do
# Extract and validate program metadata
def __dspex_metadata__ do
%{
name: @dspex_opts[:name] || Atom.to_string(__MODULE__),
description: @dspex_opts[:description] || "",
version: @dspex_opts[:version] || "1.0.0",
variables: @dspex_variables,
skills: @dspex_skills,
schemas: @dspex_schemas
}
end
def __dspex_variables__, do: @dspex_variables
def __dspex_skills__, do: @dspex_skills
def __dspex_schemas__, do: @dspex_schemas
end
end
end
Tests:
# test/dspex/program_test.exs
defmodule DSPEx.ProgramTest do
use ExUnit.Case, async: true
defmodule SimpleProgram do
use DSPEx.Program,
name: "simple_test_program",
description: "Test program for unit tests"
defschema InputSchema do
field :text, :string, required: true
field :temperature, :probability, default: 0.7, variable: true
end
def predict(input) do
{:ok, %{result: "processed: #{input.text}"}}
end
end
test "program metadata extraction" do
metadata = SimpleProgram.__dspex_metadata__()
assert metadata.name == "simple_test_program"
assert metadata.description == "Test program for unit tests"
assert metadata.version == "1.0.0"
assert length(metadata.variables) == 1
end
test "basic program execution" do
input = %{text: "hello world", temperature: 0.5}
assert {:ok, result} = SimpleProgram.predict(input)
assert result.result == "processed: hello world"
end
test "agent system conversion" do
assert {:ok, _pid} = SimpleProgram.to_agent_system()
end
end
Milestone 1.2: Schema Integration
Implementation:
# lib/dspex/schema.ex
defmodule DSPEx.Schema do
@moduledoc """
DSPEx-specific schema extensions built on ElixirML.Schema
"""
defmacro __using__(opts \\ []) do
quote do
use ElixirML.Schema, unquote(opts)
import DSPEx.Schema.DSL
import DSPEx.Schema.Variables
end
end
defmacro defschema(name, do: block) do
quote do
# Store schema metadata for DSPEx
schema_info = %{
name: unquote(name),
fields: [],
variables: []
}
@dspex_schemas schema_info
# Use ElixirML.Schema for core functionality
use ElixirML.Schema
defschema unquote(name) do
unquote(block)
end
end
end
end
# lib/dspex/schema/variables.ex
defmodule DSPEx.Schema.Variables do
@moduledoc """
Automatic variable extraction from DSPEx schemas
"""
def extract_variables(program_module) do
program_module.__dspex_variables__()
|> Enum.reduce(ElixirML.Variable.Space.new(), fn variable_spec, space ->
variable = build_variable(variable_spec)
ElixirML.Variable.Space.add_variable(space, variable)
end)
end
defp build_variable({name, type, opts}) do
case type do
:probability ->
ElixirML.Variable.float(name,
range: {0.0, 1.0},
default: opts[:default] || 0.5
)
:integer when opts[:range] ->
ElixirML.Variable.integer(name,
range: opts[:range],
default: opts[:default]
)
_ when opts[:choices] ->
ElixirML.Variable.choice(name,
opts[:choices],
default: opts[:default]
)
_ ->
ElixirML.Variable.composite(name,
type: type,
default: opts[:default]
)
end
end
end
Milestone 1.3: Program Registry
Implementation:
# lib/dspex/program/registry.ex
defmodule DSPEx.Program.Registry do
@moduledoc """
Registry for active DSPEx programs with metadata
"""
use GenServer
require Logger
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def register_program(program_module, pid, metadata \\ %{}) do
GenServer.call(__MODULE__, {:register, program_module, pid, metadata})
end
def lookup_program(program_module) do
GenServer.call(__MODULE__, {:lookup, program_module})
end
def list_programs do
GenServer.call(__MODULE__, :list)
end
# GenServer implementation
def init(opts) do
# Use ETS for fast lookups
table = :ets.new(:dspex_programs, [:set, :named_table, :public])
{:ok, %{table: table, opts: opts}}
end
def handle_call({:register, program_module, pid, metadata}, _from, state) do
program_info = %{
module: program_module,
pid: pid,
metadata: Map.merge(program_module.__dspex_metadata__(), metadata),
registered_at: DateTime.utc_now()
}
:ets.insert(:dspex_programs, {program_module, program_info})
Logger.info("Registered DSPEx program: #{program_module}")
{:reply, :ok, state}
end
def handle_call({:lookup, program_module}, _from, state) do
case :ets.lookup(:dspex_programs, program_module) do
[{^program_module, program_info}] -> {:reply, {:ok, program_info}, state}
[] -> {:reply, {:error, :not_found}, state}
end
end
def handle_call(:list, _from, state) do
programs = :ets.tab2list(:dspex_programs)
{:reply, programs, state}
end
end
Week 2: DSPEx.Optimizer Implementation
Milestone 2.1: Universal Optimization Interface
Files to Create:
lib/dspex/
├── optimizer.ex # Core optimization interface
├── optimizer/
│ ├── simba.ex # SIMBA integration
│ ├── multi_agent.ex # Multi-agent optimization
│ └── strategies.ex # Optimization strategies
test/dspex/
└── optimizer_test.exs # Optimization tests
Implementation:
# lib/dspex/optimizer.ex
defmodule DSPEx.Optimizer do
@moduledoc """
Universal optimization interface for DSPEx programs
"""
require Logger
alias DSPEx.Schema.Variables
@doc """
Optimize a DSPEx program with training data
"""
def optimize(program_module, training_data, opts \\ []) do
Logger.info("Starting optimization for #{program_module}")
# Extract variable space
variable_space = Variables.extract_variables(program_module)
if ElixirML.Variable.Space.empty?(variable_space) do
Logger.warning("No variables found for optimization in #{program_module}")
{:error, :no_variables}
else
# Select and run optimizer
optimizer = opts[:optimizer] || :simba
run_optimization(optimizer, program_module, variable_space, training_data, opts)
end
end
defp run_optimization(:simba, program_module, variable_space, training_data, opts) do
DSPEx.Optimizer.SIMBA.optimize(program_module, variable_space, training_data, opts)
end
defp run_optimization(:multi_agent, program_module, variable_space, training_data, opts) do
DSPEx.Optimizer.MultiAgent.optimize(program_module, variable_space, training_data, opts)
end
defp run_optimization(optimizer, _program_module, _variable_space, _training_data, _opts) do
{:error, {:unknown_optimizer, optimizer}}
end
end
Milestone 2.2: SIMBA Integration
Implementation:
# lib/dspex/optimizer/simba.ex
defmodule DSPEx.Optimizer.SIMBA do
@moduledoc """
SIMBA optimization strategy for DSPEx programs
"""
alias ElixirML.Variable.Space
def optimize(program_module, variable_space, training_data, opts) do
# Create evaluation function
eval_fn = create_evaluation_function(program_module, training_data)
# Configure SIMBA
config = %{
population_size: opts[:population_size] || 50,
generations: opts[:generations] || 100,
mutation_rate: opts[:mutation_rate] || 0.1,
crossover_rate: opts[:crossover_rate] || 0.8,
early_stopping: opts[:early_stopping] || %{patience: 10, min_delta: 0.001}
}
# Run optimization
case ElixirML.Teleprompter.SIMBA.optimize(variable_space, eval_fn, config) do
{:ok, optimized_config} ->
# Create optimized program version
create_optimized_program(program_module, optimized_config)
{:error, reason} ->
{:error, reason}
end
end
defp create_evaluation_function(program_module, training_data) do
fn configuration ->
scores =
Enum.map(training_data, fn %{input: input, expected: expected} ->
# Apply configuration to input
configured_input = Map.merge(input, configuration)
# Run program
case program_module.predict(configured_input) do
{:ok, result} ->
calculate_score(result, expected)
{:error, _} ->
0.0 # Penalty for errors
end
end)
# Return average score
Enum.sum(scores) / length(scores)
end
end
defp calculate_score(result, expected) do
# Simple similarity score - can be enhanced with domain-specific metrics
case {result, expected} do
{%{answer: result_text}, %{answer: expected_text}} ->
string_similarity(result_text, expected_text)
_ ->
0.5 # Default score for incompatible formats
end
end
defp create_optimized_program(program_module, optimized_config) do
# Create a new module with optimized defaults
optimized_module_name = Module.concat(program_module, Optimized)
# This would typically involve code generation
# For now, return the configuration
{:ok, %{
original_module: program_module,
optimized_config: optimized_config,
optimization_metadata: %{
optimized_at: DateTime.utc_now(),
optimizer: :simba
}
}}
end
end
Week 3: MABEAM Integration
Milestone 3.1: Program to Agent Conversion
Files to Create:
lib/dspex/
├── mabeam.ex # MABEAM integration
├── agents/
│ ├── program_agent.ex # Program execution agent
│ ├── optimizer_agent.ex # Optimization agent
│ └── coordinator_agent.ex # Coordination agent
test/dspex/
└── mabeam_test.exs # MABEAM integration tests
Implementation:
# lib/dspex/mabeam.ex
defmodule DSPEx.MABEAM do
@moduledoc """
Convert DSPEx programs into multi-agent systems using Foundation MABEAM
"""
require Logger
alias ElixirML.MABEAM
def convert_program(program_module, opts \\ []) do
Logger.info("Converting #{program_module} to multi-agent system")
# Analyze program for agent requirements
analysis = analyze_program(program_module)
# Create agent specifications
agent_specs = create_agent_specs(program_module, analysis, opts)
# Initialize MABEAM system
case MABEAM.create_agent_system(agent_specs) do
{:ok, system} ->
# Register with DSPEx registry
DSPEx.Program.Registry.register_program(
program_module,
system.coordinator_pid,
%{type: :multi_agent, agents: length(agent_specs)}
)
{:ok, system}
{:error, reason} ->
Logger.error("Failed to convert #{program_module} to agents: #{inspect(reason)}")
{:error, reason}
end
end
defp analyze_program(program_module) do
metadata = program_module.__dspex_metadata__()
%{
complexity: estimate_complexity(metadata),
has_variables: length(metadata.variables) > 0,
has_skills: length(metadata.skills) > 0,
requires_optimization: length(metadata.variables) > 5,
requires_coordination: metadata.complexity > :simple
}
end
defp create_agent_specs(program_module, analysis, opts) do
# Always include program execution agent
base_specs = [
{:program_executor, DSPEx.Agents.ProgramAgent, %{
program_module: program_module
}}
]
# Add optimizer agent if needed
specs = if analysis.has_variables do
base_specs ++ [
{:optimizer, DSPEx.Agents.OptimizerAgent, %{
strategy: opts[:optimization_strategy] || :simba
}}
]
else
base_specs
end
# Add coordinator if complex
if analysis.requires_coordination do
specs ++ [
{:coordinator, DSPEx.Agents.CoordinatorAgent, %{
coordination_strategy: opts[:coordination_strategy] || :simple
}}
]
else
specs
end
end
defp estimate_complexity(metadata) do
score = 0
score = score + length(metadata.variables) * 2
score = score + length(metadata.skills) * 3
score = score + if String.contains?(metadata.description, "complex"), do: 5, else: 0
cond do
score < 5 -> :simple
score < 15 -> :moderate
true -> :complex
end
end
end
Milestone 3.2: Specialized DSPEx Agents
Implementation:
# lib/dspex/agents/program_agent.ex
defmodule DSPEx.Agents.ProgramAgent do
@moduledoc """
Agent that executes DSPEx program logic with caching and optimization
"""
use JidoSystem.Agents.FoundationAgent,
name: "dspex_program_agent",
description: "Executes DSPEx programs with automatic optimization and caching",
actions: [
DSPEx.Actions.ExecuteProgram,
DSPEx.Actions.CacheResult,
DSPEx.Actions.UpdateConfiguration,
DSPEx.Actions.ValidateInput
],
schema: [
program_module: [type: :atom, required: true],
current_config: [type: :map, default: %{}],
result_cache: [type: :map, default: %{}],
execution_stats: [type: :map, default: %{
total_executions: 0,
cache_hits: 0,
average_time: 0,
success_rate: 1.0
}]
]
require Logger
def handle_execute_program(agent, %{input: input} = params) do
start_time = System.monotonic_time(:microsecond)
# Apply current configuration
configured_input = Map.merge(input, agent.state.current_config)
# Check cache first
cache_key = generate_cache_key(configured_input)
case Map.get(agent.state.result_cache, cache_key) do
nil ->
# Execute program
case execute_with_telemetry(agent, configured_input) do
{:ok, result} ->
execution_time = System.monotonic_time(:microsecond) - start_time
# Update cache and stats
new_state = agent.state
|> update_cache(cache_key, result)
|> update_stats(:success, execution_time)
{:ok, result, %{agent | state: new_state}}
{:error, reason} ->
new_state = update_stats(agent.state, :error, 0)
{:error, reason, %{agent | state: new_state}}
end
cached_result ->
# Cache hit
new_state = update_stats(agent.state, :cache_hit, 0)
{:ok, cached_result, %{agent | state: new_state}}
end
end
defp execute_with_telemetry(agent, input) do
:telemetry.execute([:dspex, :program, :execution, :start], %{}, %{
program: agent.state.program_module,
agent_id: agent.id
})
try do
result = apply(agent.state.program_module, :predict, [input])
:telemetry.execute([:dspex, :program, :execution, :success], %{}, %{
program: agent.state.program_module,
agent_id: agent.id
})
result
rescue
e ->
:telemetry.execute([:dspex, :program, :execution, :error], %{}, %{
program: agent.state.program_module,
agent_id: agent.id,
error: inspect(e)
})
{:error, {:execution_error, e}}
end
end
defp update_cache(state, key, result) do
# Simple LRU cache with size limit
max_cache_size = 1000
new_cache = Map.put(state.result_cache, key, result)
if map_size(new_cache) > max_cache_size do
# Remove oldest entries (simple implementation)
trimmed_cache =
new_cache
|> Enum.take(max_cache_size)
|> Map.new()
%{state | result_cache: trimmed_cache}
else
%{state | result_cache: new_cache}
end
end
defp update_stats(state, result_type, execution_time) do
stats = state.execution_stats
new_stats = case result_type do
:success ->
total = stats.total_executions + 1
avg_time = (stats.average_time * stats.total_executions + execution_time) / total
%{stats |
total_executions: total,
average_time: avg_time,
success_rate: stats.total_executions / total
}
:cache_hit ->
%{stats | cache_hits: stats.cache_hits + 1}
:error ->
total = stats.total_executions + 1
success_count = round(stats.success_rate * stats.total_executions)
%{stats |
total_executions: total,
success_rate: success_count / total
}
end
%{state | execution_stats: new_stats}
end
defp generate_cache_key(input) do
# Generate deterministic hash of input
:crypto.hash(:sha256, :erlang.term_to_binary(input))
|> Base.encode16()
end
end
Week 4: Skills and Actions System
Milestone 4.1: Skills Framework
Files to Create:
lib/dspex/
├── skills.ex # Skills framework
├── skills/
│ ├── web_search.ex # Web search skill
│ ├── reasoning.ex # Reasoning skill
│ └── embeddings.ex # Embeddings skill
└── actions/
├── execute_program.ex # Program execution action
├── optimize_parameters.ex # Parameter optimization action
└── cache_result.ex # Result caching action
Implementation:
# lib/dspex/skills/web_search.ex
defmodule DSPEx.Skills.WebSearch do
@moduledoc """
Web search capabilities with multiple providers and semantic enhancement
"""
require Logger
def search(query, opts \\ []) do
provider = opts[:provider] || :duckduckgo
max_results = opts[:max_results] || 5
Logger.debug("Searching for: #{query} using #{provider}")
case search_with_provider(provider, query, opts) do
{:ok, results} ->
# Enhance with semantic ranking if embedding available
enhanced_results = if opts[:semantic_ranking] do
rank_semantically(query, results, opts)
else
results
end
{:ok, Enum.take(enhanced_results, max_results)}
{:error, reason} ->
Logger.error("Search failed: #{inspect(reason)}")
{:error, reason}
end
end
defp search_with_provider(:duckduckgo, query, _opts) do
# Use Foundation's HTTP client with circuit breaker
url = "https://api.duckduckgo.com/instant-answer"
case Foundation.Services.HTTPClient.get(url, [
{"q", query},
{"format", "json"},
{"no_html", "1"}
]) do
{:ok, response} ->
results = parse_duckduckgo_response(response.body)
{:ok, results}
{:error, reason} ->
{:error, {:search_provider_error, reason}}
end
end
defp rank_semantically(query, results, opts) do
case DSPEx.Skills.Embeddings.embed(query) do
{:ok, query_embedding} ->
# Calculate semantic similarity
scored_results =
Enum.map(results, fn result ->
case DSPEx.Skills.Embeddings.embed(result.snippet) do
{:ok, result_embedding} ->
similarity = cosine_similarity(query_embedding, result_embedding)
%{result | semantic_score: similarity}
{:error, _} ->
%{result | semantic_score: 0.0}
end
end)
# Sort by semantic score
Enum.sort_by(scored_results, & &1.semantic_score, :desc)
{:error, _} ->
results
end
end
end
Phase 2: Advanced Features (Weeks 5-8)
Week 5: Production Features
Milestone 5.1: Monitoring and Telemetry
Files to Create:
lib/dspex/
├── telemetry.ex # Telemetry setup
├── monitoring/
│ ├── dashboard.ex # LiveView dashboard
│ ├── metrics.ex # Custom metrics
│ └── alerts.ex # Alert system
└── production/
├── deployment.ex # Deployment helpers
└── scaling.ex # Auto-scaling
Week 6: Cost Optimization
Milestone 6.1: Cost Tracking and Optimization
Implementation includes:
- Token usage tracking across all LLM calls
- Cost calculation per model and provider
- Optimization suggestions based on usage patterns
- Budget alerts and automatic throttling
Week 7: Advanced Optimization
Milestone 7.1: Multi-Agent Optimization
Features:
- Distributed optimization across agent teams
- Multi-objective optimization (accuracy, cost, latency)
- Advanced optimization strategies (BEACON, genetic algorithms)
Week 8: Integration Testing
Milestone 8.1: End-to-End Testing
Test Suite:
- Complete DSPEx workflow testing
- Performance benchmarking
- Load testing for multi-agent scenarios
- Integration with external services
Phase 3: Ecosystem Development (Weeks 9-12)
Week 9: Skills Marketplace
Milestone 9.1: Community Skills System
Features:
- Skill packaging and distribution
- Version management and dependencies
- Quality assurance and security scanning
- Community contribution guidelines
Week 10: Developer Experience
Milestone 10.1: Development Tools
Deliverables:
- Interactive development environment
- Debugging and profiling tools
- Performance optimization recommendations
- Comprehensive documentation
Week 11: Enterprise Features
Milestone 11.1: Enterprise Readiness
Features:
- Multi-tenant architecture
- Advanced security and compliance
- Enterprise SSO integration
- SLA monitoring and guarantees
Week 12: Documentation and Launch
Milestone 12.1: Launch Preparation
Deliverables:
- Complete documentation suite
- Video tutorials and workshops
- Conference presentations
- Community engagement plan
Testing Strategy
Unit Testing
- Every module has comprehensive unit tests
- Property-based testing for complex logic
- Mock external dependencies
Integration Testing
- End-to-end workflow testing
- Foundation service integration
- External API integration
Performance Testing
- Benchmark against baseline implementations
- Load testing for concurrent usage
- Memory and CPU profiling
Acceptance Testing
- User story validation
- Developer experience validation
- Production deployment validation
Quality Gates
Code Quality
- 90%+ test coverage maintained
- 0 Dialyzer warnings
- 0 Credo warnings
- Consistent code formatting
Performance
- <100ms response time for simple operations
- <1s response time for complex operations
- 99.9% uptime for production deployments
Documentation
- 100% function documentation
- Complete user guides
- Architecture documentation
- API documentation
Deployment Strategy
Development Environment
- Local development with hot reloading
- Automated testing on every commit
- Integration testing with staging Foundation
Staging Environment
- Production-like environment
- End-to-end testing
- Performance validation
- Security testing
Production Environment
- Blue-green deployment
- Automated rollback capabilities
- Comprehensive monitoring
- Gradual feature rollout
This implementation roadmap provides concrete steps to build the complete DSPEx interface on our simplified Foundation infrastructure, delivering a production-grade ML platform that leverages BEAM’s unique strengths.