Looking at the comprehensive DSPy port documentation and the Foundation library, I can see how Foundation provides an excellent infrastructure base. However, the DSPEx port will need several specialized enhancements to Foundation to handle the unique requirements of language model programming. Here’s my analysis and recommended enhancements:
Foundation Enhancements for DSPEx Support
1. AI Provider Infrastructure
Foundation currently has general infrastructure protection, but DSPEx needs specialized AI provider management:
# New module: Foundation.AI
defmodule Foundation.AI do
@moduledoc """
AI provider management and configuration for Foundation.
"""
def configure_provider(provider_name, config) do
# Enhanced configuration for AI providers
Foundation.Config.update([:ai, :providers, provider_name], config)
end
def get_provider_config(provider_name) do
Foundation.Config.get([:ai, :providers, provider_name])
end
# Provider-specific circuit breaker and rate limiting
def create_provider_protection(provider_name, opts \\ []) do
Foundation.Infrastructure.configure_protection(
{:ai_provider, provider_name},
%{
circuit_breaker: %{
failure_threshold: opts[:failure_threshold] || 5,
recovery_time: opts[:recovery_time] || 30_000
},
rate_limiter: %{
scale: opts[:rate_window] || 60_000,
limit: opts[:rate_limit] || 100
}
}
)
end
end
2. Enhanced Correlation and Request Tracking
DSPEx needs sophisticated request correlation for complex multi-step AI workflows:
# Enhanced Foundation.Utils
defmodule Foundation.Utils do
# Existing functions...
@doc """
Generate hierarchical correlation IDs for nested AI operations.
"""
def generate_nested_correlation_id(parent_id \\ nil) do
base_id = generate_correlation_id()
case parent_id do
nil -> base_id
parent -> "#{parent}:#{String.slice(base_id, 0, 8)}"
end
end
@doc """
Create execution context for DSPEx operations.
"""
def create_execution_context(module, operation, opts \\ []) do
%{
correlation_id: Keyword.get(opts, :correlation_id) || generate_correlation_id(),
module: module,
operation: operation,
started_at: monotonic_timestamp(),
metadata: Keyword.get(opts, :metadata, %{})
}
end
end
3. Specialized Event Types for AI Operations
Foundation’s event system needs DSPEx-specific event types:
# Enhanced Foundation.Events
defmodule Foundation.Events do
# Existing functions...
@doc """
Create a language model request event.
"""
def new_lm_request_event(provider, model, prompt_data, opts \\ []) do
new_event(:lm_request, %{
provider: provider,
model: model,
prompt_tokens: estimate_tokens(prompt_data),
prompt_hash: hash_prompt(prompt_data)
}, opts)
end
@doc """
Create a language model response event.
"""
def new_lm_response_event(provider, model, response_data, request_event_id, opts \\ []) do
new_event(:lm_response, %{
provider: provider,
model: model,
completion_tokens: Map.get(response_data, :completion_tokens, 0),
total_tokens: Map.get(response_data, :total_tokens, 0),
cost_estimate: calculate_cost(response_data),
request_event_id: request_event_id
}, opts)
end
@doc """
Create a module execution event for DSPEx programs.
"""
def new_module_execution_event(module_type, signature, execution_data, opts \\ []) do
new_event(:module_execution, %{
module_type: module_type,
signature: signature,
input_fields: Map.keys(execution_data.inputs || %{}),
output_fields: Map.keys(execution_data.outputs || %{}),
duration_ms: execution_data.duration_ms,
success: execution_data.success
}, opts)
end
@doc """
Create an optimization iteration event.
"""
def new_optimization_event(optimizer_type, iteration_data, opts \\ []) do
new_event(:optimization_iteration, %{
optimizer_type: optimizer_type,
iteration: iteration_data.iteration,
candidate_count: iteration_data.candidate_count,
best_score: iteration_data.best_score,
improvement: iteration_data.improvement
}, opts)
end
# Private helper functions
defp estimate_tokens(prompt_data) do
# Simple estimation - could be enhanced
case prompt_data do
data when is_binary(data) -> div(String.length(data), 4)
data when is_list(data) ->
data |> Enum.map(&estimate_tokens/1) |> Enum.sum()
_ -> 0
end
end
defp hash_prompt(prompt_data) do
:crypto.hash(:sha256, :erlang.term_to_binary(prompt_data))
|> Base.encode16(case: :lower)
|> String.slice(0, 16)
end
defp calculate_cost(%{total_tokens: tokens, model: model}) when is_integer(tokens) do
# Model-specific cost calculation
base_cost_per_1k = get_model_cost(model)
(tokens / 1000) * base_cost_per_1k
end
defp calculate_cost(_), do: 0.0
defp get_model_cost("gpt-4"), do: 0.03
defp get_model_cost("gpt-3.5-turbo"), do: 0.002
defp get_model_cost(_), do: 0.01
end
4. AI-Specific Telemetry Metrics
Foundation’s telemetry needs enhancement for AI/ML observability:
# Enhanced Foundation.Telemetry
defmodule Foundation.Telemetry do
# Existing functions...
@doc """
Emit LM usage metrics.
"""
def emit_lm_usage(provider, model, usage_data, metadata \\ %{}) do
emit_gauge([:foundation, :ai, :tokens_used], usage_data.total_tokens,
Map.merge(metadata, %{provider: provider, model: model}))
emit_gauge([:foundation, :ai, :cost_usd], usage_data.cost,
Map.merge(metadata, %{provider: provider, model: model}))
emit_counter([:foundation, :ai, :requests],
Map.merge(metadata, %{provider: provider, model: model, status: :success}))
end
@doc """
Emit module performance metrics.
"""
def emit_module_performance(module_type, signature, duration_ms, success, metadata \\ %{}) do
emit_histogram([:foundation, :dspex, :module_duration], duration_ms,
Map.merge(metadata, %{module_type: module_type, signature: signature}))
status = if success, do: :success, else: :error
emit_counter([:foundation, :dspex, :module_executions],
Map.merge(metadata, %{module_type: module_type, signature: signature, status: status}))
end
@doc """
Emit optimization progress metrics.
"""
def emit_optimization_progress(optimizer_type, iteration, score, metadata \\ %{}) do
emit_gauge([:foundation, :dspex, :optimization_score], score,
Map.merge(metadata, %{optimizer_type: optimizer_type, iteration: iteration}))
emit_counter([:foundation, :dspex, :optimization_iterations],
Map.merge(metadata, %{optimizer_type: optimizer_type}))
end
@doc """
Emit cache performance metrics specific to AI operations.
"""
def emit_ai_cache_metrics(cache_type, operation, hit, metadata \\ %{}) do
status = if hit, do: :hit, else: :miss
emit_counter([:foundation, :ai, :cache, operation],
Map.merge(metadata, %{cache_type: cache_type, status: status}))
end
end
5. Enhanced Error Context for AI Operations
DSPEx needs richer error context for debugging AI workflows:
# Enhanced Foundation.ErrorContext
defmodule Foundation.ErrorContext do
# Existing functions...
@doc """
Create AI operation context with provider and model information.
"""
def new_ai_context(module, operation, ai_metadata, opts \\ []) do
enhanced_metadata = Map.merge(ai_metadata, %{
ai_operation: true,
provider: Map.get(ai_metadata, :provider),
model: Map.get(ai_metadata, :model),
estimated_cost: Map.get(ai_metadata, :estimated_cost, 0.0)
})
new(module, operation,
Keyword.put(opts, :metadata, enhanced_metadata))
end
@doc """
Add prompt information to error context.
"""
def add_prompt_context(context, prompt_data) do
prompt_metadata = %{
prompt_hash: hash_prompt(prompt_data),
prompt_token_estimate: estimate_tokens(prompt_data),
prompt_type: detect_prompt_type(prompt_data)
}
add_metadata(context, prompt_metadata)
end
@doc """
Add model response context for debugging failed operations.
"""
def add_response_context(context, response_data) do
response_metadata = %{
response_tokens: Map.get(response_data, :completion_tokens, 0),
total_tokens: Map.get(response_data, :total_tokens, 0),
finish_reason: Map.get(response_data, :finish_reason),
response_hash: hash_response(response_data)
}
add_metadata(context, response_metadata)
end
# Private helpers
defp detect_prompt_type(prompt_data) when is_list(prompt_data) do
if Enum.any?(prompt_data, &match?(%{role: _}, &1)), do: :chat, else: :completion
end
defp detect_prompt_type(prompt_data) when is_binary(prompt_data), do: :completion
defp detect_prompt_type(_), do: :unknown
defp hash_response(response_data) do
content = case response_data do
%{choices: [%{message: %{content: content}} | _]} -> content
%{choices: [%{text: text} | _]} -> text
_ -> inspect(response_data)
end
:crypto.hash(:sha256, content) |> Base.encode16(case: :lower) |> String.slice(0, 16)
end
end
6. Configuration Schema Extensions
Foundation’s configuration needs AI-specific schemas:
# Enhanced configuration structure
%{
# Existing Foundation config...
ai: %{
providers: %{
openai: %{
api_key: {:system, "OPENAI_API_KEY"},
base_url: "https://api.openai.com/v1",
default_model: "gpt-4",
timeout: 30_000,
retry_attempts: 3,
rate_limit: %{window: 60_000, limit: 100}
},
anthropic: %{
api_key: {:system, "ANTHROPIC_API_KEY"},
base_url: "https://api.anthropic.com",
default_model: "claude-3-sonnet-20240229",
timeout: 30_000,
retry_attempts: 3,
rate_limit: %{window: 60_000, limit: 50}
}
},
caching: %{
enabled: true,
memory_cache_size: 1000,
disk_cache_enabled: true,
disk_cache_path: "./cache/dspex",
ttl_seconds: 3600
},
optimization: %{
default_parallel_workers: 4,
max_optimization_iterations: 50,
early_stopping_patience: 5,
evaluation_batch_size: 10
},
monitoring: %{
cost_tracking_enabled: true,
performance_logging: true,
detailed_event_logging: false
}
},
dspex: %{
signatures: %{
validation_enabled: true,
strict_field_checking: false
},
modules: %{
default_adapter: :chat,
timeout: 30_000,
retry_failed_predictions: true
},
evaluation: %{
parallel_execution: true,
max_workers: :auto,
progress_reporting: true
}
}
}
7. Enhanced Process Registry for DSPEx Services
DSPEx will need specialized service registration patterns:
# Enhanced Foundation.ServiceRegistry
defmodule Foundation.ServiceRegistry do
# Existing functions...
@doc """
Register a DSPEx module instance with metadata.
"""
def register_dspex_module(namespace, module_id, pid, module_metadata \\ %{}) do
enhanced_name = {:dspex_module, module_id}
# Store metadata in process registry
:ok = ProcessRegistry.register_with_metadata(
namespace,
enhanced_name,
pid,
Map.merge(module_metadata, %{
type: :dspex_module,
registered_at: System.system_time(:millisecond)
})
)
# Emit registration event
Foundation.Events.new_event(:dspex_module_registered, %{
module_id: module_id,
module_type: Map.get(module_metadata, :module_type),
signature: Map.get(module_metadata, :signature)
}) |> Foundation.Events.store()
:ok
end
@doc """
List all active DSPEx modules in a namespace.
"""
def list_dspex_modules(namespace) do
ProcessRegistry.list_by_pattern(namespace, {:dspex_module, :_})
end
@doc """
Register a long-running optimization process.
"""
def register_optimization(namespace, optimization_id, pid, optimization_metadata) do
enhanced_name = {:optimization, optimization_id}
ProcessRegistry.register_with_metadata(
namespace,
enhanced_name,
pid,
Map.merge(optimization_metadata, %{
type: :optimization,
started_at: System.system_time(:millisecond)
})
)
end
end
8. Streaming Support Infrastructure
DSPEx needs streaming support for LM responses:
# New module: Foundation.Streaming
defmodule Foundation.Streaming do
@moduledoc """
Streaming support for Foundation with DSPEx integration.
"""
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
def init(opts) do
{:producer, %{
buffer: :queue.new(),
demand: 0,
correlation_id: opts[:correlation_id]
}}
end
@doc """
Create a streaming producer for LM responses.
"""
def stream_lm_response(provider, messages, opts \\ []) do
correlation_id = Foundation.Utils.generate_correlation_id()
# Start streaming producer
{:ok, producer} = start_link([correlation_id: correlation_id])
# Start async streaming task
Task.start_link(fn ->
execute_streaming_request(producer, provider, messages, opts, correlation_id)
end)
{:ok, producer}
end
def handle_demand(demand, state) when demand > 0 do
{events, new_buffer} = take_events(state.buffer, demand, [])
{:noreply, events, %{state | buffer: new_buffer, demand: state.demand + demand - length(events)}}
end
def handle_cast({:stream_chunk, chunk}, state) do
new_buffer = :queue.in(chunk, state.buffer)
{events, final_buffer} = take_events(new_buffer, state.demand, [])
{:noreply, events, %{state | buffer: final_buffer, demand: state.demand - length(events)}}
end
def handle_cast(:stream_complete, state) do
{:stop, :normal, state}
end
# Private implementation functions...
defp execute_streaming_request(producer, provider, messages, opts, correlation_id) do
# Implementation for streaming LM requests
# This would integrate with the actual LM provider streaming APIs
end
defp take_events(buffer, demand, acc) when demand <= 0 or :queue.is_empty(buffer) do
{Enum.reverse(acc), buffer}
end
defp take_events(buffer, demand, acc) do
{{:value, event}, new_buffer} = :queue.out(buffer)
take_events(new_buffer, demand - 1, [event | acc])
end
end
9. DSPEx-Specific Infrastructure Protection
Enhanced protection patterns for AI operations:
# Enhanced Foundation.Infrastructure
defmodule Foundation.Infrastructure do
# Existing functions...
@doc """
Execute an AI operation with comprehensive protection.
"""
def execute_ai_protected(provider, operation_type, metadata, fun) do
protection_key = {:ai_operation, provider, operation_type}
# Enhanced protection with AI-specific configuration
execute_protected(
protection_key,
[
circuit_breaker: :"#{provider}_breaker",
rate_limiter: {:ai_provider, provider, metadata[:user_id] || :global},
timeout: metadata[:timeout] || 30_000,
retry_strategy: metadata[:retry_strategy] || :exponential_backoff
],
fun
)
end
@doc """
Configure AI provider protection with intelligent defaults.
"""
def configure_ai_provider_protection(provider, opts \\ []) do
provider_config = Foundation.Config.get_with_default(
[:ai, :providers, provider],
%{}
)
protection_config = %{
circuit_breaker: %{
failure_threshold: opts[:failure_threshold] || provider_config[:failure_threshold] || 5,
recovery_time: opts[:recovery_time] || provider_config[:recovery_time] || 30_000
},
rate_limiter: %{
scale: opts[:rate_window] || provider_config[:rate_window] || 60_000,
limit: opts[:rate_limit] || provider_config[:rate_limit] || 100
}
}
configure_protection({:ai_provider, provider}, protection_config)
end
end
Summary
These enhancements to Foundation provide:
- AI-First Infrastructure: Specialized support for LM providers, streaming, and AI-specific protection patterns
- Rich Observability: Comprehensive telemetry and event tracking for AI operations, costs, and performance
- Enhanced Error Context: Debugging support for complex AI workflows with prompt and response tracking
- Streaming Support: Built-in streaming capabilities for real-time LM responses
- Optimization Infrastructure: Support for long-running optimization processes with proper monitoring
- Configuration Management: AI-specific configuration schemas and management
- Process Management: Specialized registry patterns for DSPEx modules and optimization processes
These enhancements maintain Foundation’s architectural principles while providing the specialized infrastructure DSPEx needs for robust, observable, and scalable language model programming in Elixir.