
100 claude

Documentation for 100_claude from the DSPEx repository.

Looking at the comprehensive DSPy port documentation and the Foundation library, I can see that Foundation provides an excellent infrastructure base. However, the DSPEx port will need several specialized enhancements to Foundation to handle the unique requirements of language model programming. Here’s my analysis and recommended enhancements:

Foundation Enhancements for DSPEx Support

1. AI Provider Infrastructure

Foundation currently has general infrastructure protection, but DSPEx needs specialized AI provider management:

# New module: Foundation.AI
defmodule Foundation.AI do
  @moduledoc """
  AI provider management and configuration for Foundation.
  """
  
  def configure_provider(provider_name, config) do
    # Enhanced configuration for AI providers
    Foundation.Config.update([:ai, :providers, provider_name], config)
  end
  
  def get_provider_config(provider_name) do
    Foundation.Config.get([:ai, :providers, provider_name])
  end
  
  # Provider-specific circuit breaker and rate limiting
  def create_provider_protection(provider_name, opts \\ []) do
    Foundation.Infrastructure.configure_protection(
      {:ai_provider, provider_name},
      %{
        circuit_breaker: %{
          failure_threshold: opts[:failure_threshold] || 5,
          recovery_time: opts[:recovery_time] || 30_000
        },
        rate_limiter: %{
          scale: opts[:rate_window] || 60_000,
          limit: opts[:rate_limit] || 100
        }
      }
    )
  end
end
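
A quick usage sketch for the module above (the call sites are hypothetical and the option values are illustrative, not recommended defaults):

# Hypothetical usage of Foundation.AI
Foundation.AI.configure_provider(:openai, %{
  api_key: {:system, "OPENAI_API_KEY"},
  default_model: "gpt-4"
})

Foundation.AI.create_provider_protection(:openai,
  failure_threshold: 3,
  recovery_time: 15_000,
  rate_limit: 60
)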

2. Enhanced Correlation and Request Tracking

DSPEx needs sophisticated request correlation for complex multi-step AI workflows:

# Enhanced Foundation.Utils
defmodule Foundation.Utils do
  # Existing functions...
  
  @doc """
  Generate hierarchical correlation IDs for nested AI operations.
  """
  def generate_nested_correlation_id(parent_id \\ nil) do
    base_id = generate_correlation_id()
    case parent_id do
      nil -> base_id
      parent -> "#{parent}:#{String.slice(base_id, 0, 8)}"
    end
  end
  
  @doc """
  Create execution context for DSPEx operations.
  """
  def create_execution_context(module, operation, opts \\ []) do
    %{
      correlation_id: Keyword.get(opts, :correlation_id) || generate_correlation_id(),
      module: module,
      operation: operation,
      started_at: monotonic_timestamp(),
      metadata: Keyword.get(opts, :metadata, %{})
    }
  end
end
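
For example, a two-step program can derive child IDs from one parent so nested LM calls remain traceable. A hypothetical sketch using only the functions above (MyProgram and the signature string are placeholders):

# Hypothetical nesting for a two-step workflow
parent_id = Foundation.Utils.generate_correlation_id()

step_one_id = Foundation.Utils.generate_nested_correlation_id(parent_id)
step_two_id = Foundation.Utils.generate_nested_correlation_id(parent_id)
# Each child ID is the parent ID plus an 8-character suffix, e.g. "<parent>:1a2b3c4d"

context = Foundation.Utils.create_execution_context(MyProgram, :forward,
  correlation_id: step_one_id,
  metadata: %{signature: "question -> answer"}
)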

3. Specialized Event Types for AI Operations

Foundation’s event system needs DSPEx-specific event types:

# Enhanced Foundation.Events
defmodule Foundation.Events do
  # Existing functions...
  
  @doc """
  Create a language model request event.
  """
  def new_lm_request_event(provider, model, prompt_data, opts \\ []) do
    new_event(:lm_request, %{
      provider: provider,
      model: model,
      prompt_tokens: estimate_tokens(prompt_data),
      prompt_hash: hash_prompt(prompt_data)
    }, opts)
  end
  
  @doc """
  Create a language model response event.
  """
  def new_lm_response_event(provider, model, response_data, request_event_id, opts \\ []) do
    new_event(:lm_response, %{
      provider: provider,
      model: model,
      completion_tokens: Map.get(response_data, :completion_tokens, 0),
      total_tokens: Map.get(response_data, :total_tokens, 0),
      cost_estimate: calculate_cost(response_data, model),
      request_event_id: request_event_id
    }, opts)
  end
  
  @doc """
  Create a module execution event for DSPEx programs.
  """
  def new_module_execution_event(module_type, signature, execution_data, opts \\ []) do
    new_event(:module_execution, %{
      module_type: module_type,
      signature: signature,
      input_fields: Map.keys(Map.get(execution_data, :inputs) || %{}),
      output_fields: Map.keys(Map.get(execution_data, :outputs) || %{}),
      duration_ms: execution_data.duration_ms,
      success: execution_data.success
    }, opts)
  end
  
  @doc """
  Create an optimization iteration event.
  """
  def new_optimization_event(optimizer_type, iteration_data, opts \\ []) do
    new_event(:optimization_iteration, %{
      optimizer_type: optimizer_type,
      iteration: iteration_data.iteration,
      candidate_count: iteration_data.candidate_count,
      best_score: iteration_data.best_score,
      improvement: iteration_data.improvement
    }, opts)
  end
  
  # Private helper functions
  defp estimate_tokens(prompt_data) do
    # Rough heuristic: ~4 characters per token; a provider-specific
    # tokenizer could replace this later
    case prompt_data do
      data when is_binary(data) -> div(String.length(data), 4)
      data when is_list(data) ->
        data |> Enum.map(&estimate_tokens/1) |> Enum.sum()
      %{content: content} -> estimate_tokens(content)
      _ -> 0
    end
  end
  
  defp hash_prompt(prompt_data) do
    :crypto.hash(:sha256, :erlang.term_to_binary(prompt_data))
    |> Base.encode16(case: :lower)
    |> String.slice(0, 16)
  end
  
  defp calculate_cost(%{total_tokens: tokens}, model) when is_integer(tokens) do
    # Model-specific cost calculation; the model comes from the request
    # rather than the response payload, which rarely carries it
    base_cost_per_1k = get_model_cost(model)
    (tokens / 1000) * base_cost_per_1k
  end
  defp calculate_cost(_, _), do: 0.0
  
  # Illustrative per-1K-token rates; real prices belong in configuration
  defp get_model_cost("gpt-4"), do: 0.03
  defp get_model_cost("gpt-3.5-turbo"), do: 0.002
  defp get_model_cost(_), do: 0.01
end
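
To see how the request and response events pair up, here is a hypothetical call site; it assumes Foundation.Events.store/1 returns {:ok, event_id} (an assumption about the existing API), and prompt/response come from the surrounding request code:

# Hypothetical pairing of request and response events for one LM call
{:ok, request_id} =
  Foundation.Events.new_lm_request_event(:openai, "gpt-4", prompt)
  |> Foundation.Events.store()

Foundation.Events.new_lm_response_event(:openai, "gpt-4", response, request_id)
|> Foundation.Events.store()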

4. AI-Specific Telemetry Metrics

Foundation’s telemetry needs enhancement for AI/ML observability:

# Enhanced Foundation.Telemetry
defmodule Foundation.Telemetry do
  # Existing functions...
  
  @doc """
  Emit LM usage metrics.
  """
  def emit_lm_usage(provider, model, usage_data, metadata \\ %{}) do
    emit_gauge([:foundation, :ai, :tokens_used], usage_data.total_tokens, 
      Map.merge(metadata, %{provider: provider, model: model}))
    
    emit_gauge([:foundation, :ai, :cost_usd], usage_data.cost, 
      Map.merge(metadata, %{provider: provider, model: model}))
    
    emit_counter([:foundation, :ai, :requests], 
      Map.merge(metadata, %{provider: provider, model: model, status: :success}))
  end
  
  @doc """
  Emit module performance metrics.
  """
  def emit_module_performance(module_type, signature, duration_ms, success, metadata \\ %{}) do
    emit_histogram([:foundation, :dspex, :module_duration], duration_ms,
      Map.merge(metadata, %{module_type: module_type, signature: signature}))
    
    status = if success, do: :success, else: :error
    emit_counter([:foundation, :dspex, :module_executions],
      Map.merge(metadata, %{module_type: module_type, signature: signature, status: status}))
  end
  
  @doc """
  Emit optimization progress metrics.
  """
  def emit_optimization_progress(optimizer_type, iteration, score, metadata \\ %{}) do
    emit_gauge([:foundation, :dspex, :optimization_score], score,
      Map.merge(metadata, %{optimizer_type: optimizer_type, iteration: iteration}))
    
    emit_counter([:foundation, :dspex, :optimization_iterations],
      Map.merge(metadata, %{optimizer_type: optimizer_type}))
  end
  
  @doc """
  Emit cache performance metrics specific to AI operations.
  """
  def emit_ai_cache_metrics(cache_type, operation, hit, metadata \\ %{}) do
    status = if hit, do: :hit, else: :miss
    emit_counter([:foundation, :ai, :cache, operation], 
      Map.merge(metadata, %{cache_type: cache_type, status: status}))
  end
end
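
These metrics flow through standard :telemetry events, so downstream handlers attach in the usual way. A minimal sketch, assuming emit_gauge/3 reports the measurement under a :value key (an assumption about Foundation’s internals):

# Hypothetical handler for the cost gauge emitted above
:telemetry.attach(
  "dspex-cost-logger",
  [:foundation, :ai, :cost_usd],
  fn _event, measurements, metadata, _config ->
    IO.puts("#{metadata.provider}/#{metadata.model} cost: $#{measurements.value}")
  end,
  nil
)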

5. Enhanced Error Context for AI Operations

DSPEx needs richer error context for debugging AI workflows:

# Enhanced Foundation.ErrorContext
defmodule Foundation.ErrorContext do
  # Existing functions...
  
  @doc """
  Create AI operation context with provider and model information.
  """
  def new_ai_context(module, operation, ai_metadata, opts \\ []) do
    enhanced_metadata = Map.merge(ai_metadata, %{
      ai_operation: true,
      provider: Map.get(ai_metadata, :provider),
      model: Map.get(ai_metadata, :model),
      estimated_cost: Map.get(ai_metadata, :estimated_cost, 0.0)
    })
    
    new(module, operation, 
      Keyword.put(opts, :metadata, enhanced_metadata))
  end
  
  @doc """
  Add prompt information to error context.
  """
  def add_prompt_context(context, prompt_data) do
    prompt_metadata = %{
      prompt_hash: hash_prompt(prompt_data),
      prompt_token_estimate: estimate_tokens(prompt_data),
      prompt_type: detect_prompt_type(prompt_data)
    }
    
    add_metadata(context, prompt_metadata)
  end
  
  @doc """
  Add model response context for debugging failed operations.
  """
  def add_response_context(context, response_data) do
    response_metadata = %{
      response_tokens: Map.get(response_data, :completion_tokens, 0),
      total_tokens: Map.get(response_data, :total_tokens, 0),
      finish_reason: Map.get(response_data, :finish_reason),
      response_hash: hash_response(response_data)
    }
    
    add_metadata(context, response_metadata)
  end
  
  # Private helpers
  # hash_prompt/1 and estimate_tokens/1 are the same helpers sketched in
  # Foundation.Events above; in practice they would live in a shared module
  # (e.g. Foundation.Utils) rather than being duplicated across both modules.
  defp detect_prompt_type(prompt_data) when is_list(prompt_data) do
    if Enum.any?(prompt_data, &match?(%{role: _}, &1)), do: :chat, else: :completion
  end
  defp detect_prompt_type(prompt_data) when is_binary(prompt_data), do: :completion
  defp detect_prompt_type(_), do: :unknown
  
  defp hash_response(response_data) do
    content = case response_data do
      %{choices: [%{message: %{content: content}} | _]} -> content
      %{choices: [%{text: text} | _]} -> text
      _ -> inspect(response_data)
    end
    :crypto.hash(:sha256, content) |> Base.encode16(case: :lower) |> String.slice(0, 16)
  end
end
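
Putting the pieces together, a prediction call might build up its context like this (a hypothetical sketch; new/3 and add_metadata/2 refer to the existing Foundation.ErrorContext API, and MyProgram/prompt/response are placeholders):

# Hypothetical context construction around one prediction
context =
  Foundation.ErrorContext.new_ai_context(MyProgram, :predict, %{
    provider: :openai,
    model: "gpt-4"
  })
  |> Foundation.ErrorContext.add_prompt_context(prompt)

# On failure, attach the raw response before reporting the error
context = Foundation.ErrorContext.add_response_context(context, response)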

6. Configuration Schema Extensions

Foundation’s configuration needs AI-specific schemas:

# Enhanced configuration structure
%{
  # Existing Foundation config...
  ai: %{
    providers: %{
      openai: %{
        api_key: {:system, "OPENAI_API_KEY"},
        base_url: "https://api.openai.com/v1",
        default_model: "gpt-4",
        timeout: 30_000,
        retry_attempts: 3,
        rate_limit: %{window: 60_000, limit: 100}
      },
      anthropic: %{
        api_key: {:system, "ANTHROPIC_API_KEY"},
        base_url: "https://api.anthropic.com",
        default_model: "claude-3-sonnet-20240229",
        timeout: 30_000,
        retry_attempts: 3,
        rate_limit: %{window: 60_000, limit: 50}
      }
    },
    caching: %{
      enabled: true,
      memory_cache_size: 1000,
      disk_cache_enabled: true,
      disk_cache_path: "./cache/dspex",
      ttl_seconds: 3600
    },
    optimization: %{
      default_parallel_workers: 4,
      max_optimization_iterations: 50,
      early_stopping_patience: 5,
      evaluation_batch_size: 10
    },
    monitoring: %{
      cost_tracking_enabled: true,
      performance_logging: true,
      detailed_event_logging: false
    }
  },
  dspex: %{
    signatures: %{
      validation_enabled: true,
      strict_field_checking: false
    },
    modules: %{
      default_adapter: :chat,
      timeout: 30_000,
      retry_failed_predictions: true
    },
    evaluation: %{
      parallel_execution: true,
      max_workers: :auto,
      progress_reporting: true
    }
  }
}
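
DSPEx components would then read these values through the existing configuration API, for example (assuming Foundation.Config.get/1 accepts a key path, as used earlier in this document):

# Hypothetical reads against the schema above
openai_config = Foundation.Config.get([:ai, :providers, :openai])
cache_ttl = Foundation.Config.get([:ai, :caching, :ttl_seconds])
max_workers = Foundation.Config.get([:dspex, :evaluation, :max_workers])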

7. Enhanced Process Registry for DSPEx Services

DSPEx will need specialized service registration patterns:

# Enhanced Foundation.ServiceRegistry
defmodule Foundation.ServiceRegistry do
  # Existing functions...
  
  @doc """
  Register a DSPEx module instance with metadata.
  """
  def register_dspex_module(namespace, module_id, pid, module_metadata \\ %{}) do
    enhanced_name = {:dspex_module, module_id}
    
    # Store metadata in process registry
    :ok = ProcessRegistry.register_with_metadata(
      namespace, 
      enhanced_name, 
      pid, 
      Map.merge(module_metadata, %{
        type: :dspex_module,
        registered_at: System.system_time(:millisecond)
      })
    )
    
    # Emit registration event
    Foundation.Events.new_event(:dspex_module_registered, %{
      module_id: module_id,
      module_type: Map.get(module_metadata, :module_type),
      signature: Map.get(module_metadata, :signature)
    }) |> Foundation.Events.store()
    
    :ok
  end
  
  @doc """
  List all active DSPEx modules in a namespace.
  """
  def list_dspex_modules(namespace) do
    ProcessRegistry.list_by_pattern(namespace, {:dspex_module, :_})
  end
  
  @doc """
  Register a long-running optimization process.
  """
  def register_optimization(namespace, optimization_id, pid, optimization_metadata) do
    enhanced_name = {:optimization, optimization_id}
    
    ProcessRegistry.register_with_metadata(
      namespace,
      enhanced_name,
      pid,
      Map.merge(optimization_metadata, %{
        type: :optimization,
        started_at: System.system_time(:millisecond)
      })
    )
  end
end
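
A DSPEx module process would typically register itself on start-up; a hypothetical sketch (the :production namespace, module ID, and QASignature are placeholders):

# Hypothetical registration from inside a DSPEx module process
:ok = Foundation.ServiceRegistry.register_dspex_module(
  :production,
  "qa_predictor_1",
  self(),
  %{module_type: :predict, signature: QASignature}
)

# Later, enumerate what is running in the namespace
modules = Foundation.ServiceRegistry.list_dspex_modules(:production)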

8. Streaming Support Infrastructure

DSPEx needs streaming support for LM responses:

# New module: Foundation.Streaming
defmodule Foundation.Streaming do
  @moduledoc """
  Streaming support for Foundation with DSPEx integration.
  """
  
  use GenStage
  
  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts)
  end
  
  def init(opts) do
    {:producer, %{
      buffer: :queue.new(),
      demand: 0,
      correlation_id: opts[:correlation_id]
    }}
  end
  
  @doc """
  Create a streaming producer for LM responses.
  """
  def stream_lm_response(provider, messages, opts \\ []) do
    correlation_id = Foundation.Utils.generate_correlation_id()
    
    # Start streaming producer
    {:ok, producer} = start_link([correlation_id: correlation_id])
    
    # Start async streaming task
    Task.start_link(fn ->
      execute_streaming_request(producer, provider, messages, opts, correlation_id)
    end)
    
    {:ok, producer}
  end
  
  def handle_demand(demand, state) when demand > 0 do
    # Accumulate any previously unmet demand before draining the buffer
    total_demand = state.demand + demand
    {events, new_buffer} = take_events(state.buffer, total_demand, [])
    {:noreply, events, %{state | buffer: new_buffer, demand: total_demand - length(events)}}
  end
  
  def handle_cast({:stream_chunk, chunk}, state) do
    new_buffer = :queue.in(chunk, state.buffer)
    {events, final_buffer} = take_events(new_buffer, state.demand, [])
    {:noreply, events, %{state | buffer: final_buffer, demand: state.demand - length(events)}}
  end
  
  def handle_cast(:stream_complete, state) do
    {:stop, :normal, state}
  end
  
  # Private implementation functions...
  defp execute_streaming_request(_producer, _provider, _messages, _opts, _correlation_id) do
    # Implementation for streaming LM requests. This would integrate with the
    # actual LM provider streaming APIs, casting {:stream_chunk, chunk} for
    # each chunk received and :stream_complete when the provider finishes.
  end
  
  defp take_events(buffer, demand, acc) when demand <= 0 do
    {Enum.reverse(acc), buffer}
  end
  
  defp take_events(buffer, demand, acc) do
    # :queue.is_empty/1 is not allowed in guards, so match on :queue.out/1 instead
    case :queue.out(buffer) do
      {{:value, event}, new_buffer} -> take_events(new_buffer, demand - 1, [event | acc])
      {:empty, empty_buffer} -> {Enum.reverse(acc), empty_buffer}
    end
  end
end
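
A consumer can then subscribe with standard GenStage tooling; a hypothetical sketch (the chunk shape depends on the provider adapter, so chunk.content is an assumption):

# Hypothetical consumer for a streamed response
{:ok, producer} = Foundation.Streaming.stream_lm_response(:openai, messages)

[producer]
|> GenStage.stream()
|> Enum.each(fn chunk -> IO.write(chunk.content) end)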

9. DSPEx-Specific Infrastructure Protection

Enhanced protection patterns for AI operations:

# Enhanced Foundation.Infrastructure
defmodule Foundation.Infrastructure do
  # Existing functions...
  
  @doc """
  Execute an AI operation with comprehensive protection.
  """
  def execute_ai_protected(provider, operation_type, metadata, fun) do
    protection_key = {:ai_operation, provider, operation_type}
    
    # Enhanced protection with AI-specific configuration
    execute_protected(
      protection_key,
      [
        circuit_breaker: :"#{provider}_breaker",
        rate_limiter: {:ai_provider, provider, metadata[:user_id] || :global},
        timeout: metadata[:timeout] || 30_000,
        retry_strategy: metadata[:retry_strategy] || :exponential_backoff
      ],
      fun
    )
  end
  
  @doc """
  Configure AI provider protection with intelligent defaults.
  """
  def configure_ai_provider_protection(provider, opts \\ []) do
    provider_config = Foundation.Config.get_with_default(
      [:ai, :providers, provider], 
      %{}
    )
    
    protection_config = %{
      circuit_breaker: %{
        failure_threshold: opts[:failure_threshold] || provider_config[:failure_threshold] || 5,
        recovery_time: opts[:recovery_time] || provider_config[:recovery_time] || 30_000
      },
      rate_limiter: %{
        scale: opts[:rate_window] || provider_config[:rate_window] || 60_000,
        limit: opts[:rate_limit] || provider_config[:rate_limit] || 100
      }
    }
    
    configure_protection({:ai_provider, provider}, protection_config)
  end
end
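
At a call site, the wrapper composes with any LM client; a hypothetical sketch (OpenAIClient, handle_response/1, and handle_failure/1 are placeholders, and the return shape assumes execute_protected/3 yields {:ok, result} | {:error, reason}):

# Hypothetical protected LM call
result =
  Foundation.Infrastructure.execute_ai_protected(:openai, :completion, %{timeout: 60_000}, fn ->
    OpenAIClient.complete(prompt)
  end)

case result do
  {:ok, response} -> handle_response(response)
  {:error, reason} -> handle_failure(reason)
end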

Summary

These enhancements to Foundation provide:

  1. AI-First Infrastructure: Specialized support for LM providers, streaming, and AI-specific protection patterns
  2. Rich Observability: Comprehensive telemetry and event tracking for AI operations, costs, and performance
  3. Enhanced Error Context: Debugging support for complex AI workflows with prompt and response tracking
  4. Streaming Support: Built-in streaming capabilities for real-time LM responses
  5. Optimization Infrastructure: Support for long-running optimization processes with proper monitoring
  6. Configuration Management: AI-specific configuration schemas and management
  7. Process Management: Specialized registry patterns for DSPEx modules and optimization processes

These enhancements maintain Foundation’s architectural principles while providing the specialized infrastructure DSPEx needs for robust, observable, and scalable language model programming in Elixir.