01 gemini

Documentation for 01_gemini from the DSPex repository.

This document synthesizes a broad set of brainstorming proposals, covering everything from low-level infrastructure to high-level AI-specific patterns, into a single, curated, and optimized specification for the DSPex Foundation.

The specification defines the core, reusable components specific to AI orchestration that will be built on top of the existing foundation library. It will serve as the stable base for all higher-level DSPEx modules such as Predict, ChainOfThought, and the various optimizers.


Specification: The DSPex Foundation Layer

Version: 1.0
Status: PROPOSED

1. Guiding Philosophy

The DSPex Foundation is not intended to reinvent general-purpose infrastructure. Its purpose is to provide a set of robust, reusable, and AI-centric components that directly address the unique challenges of programming and orchestrating Language Models. It will achieve this by leveraging the generic, production-grade infrastructure provided by the foundation Hex package (v0.1.1+).

This specification draws a clear line:

  • foundation library: Provides generic building blocks: Config, Service Registry, Events, Telemetry, Error Context, and Infrastructure Protection (Circuit Breakers, Rate Limiters, Connection Pools).
  • DSPex Foundation (this spec): Provides AI-specific building blocks: AI Provider Management, Prompt Engineering, Intelligent Caching, Request Orchestration, Cost Analytics, and a Workflow Engine.

2. Integration with the foundation Hex Library

The DSPex Foundation will treat the foundation library as a core dependency and use its components extensively (a configuration sketch follows the list below):

  • Configuration (Foundation.Config): All DSPEx settings, from API keys to cache TTLs, will be managed through foundation’s config system.
  • Service Discovery (Foundation.ServiceRegistry): All long-running DSPEx services (e.g., LM clients, Cache managers) will be registered with and discoverable via the ServiceRegistry.
  • Observability (Foundation.Events, Foundation.Telemetry): All significant DSPEx operations (LM calls, cache hits, optimization steps) will emit structured events and telemetry through the foundation layer, providing a unified observability plane.
  • Resilience (Foundation.Infrastructure): All external I/O, especially calls to LM providers, will be wrapped in Foundation.Infrastructure.execute_protected/3, gaining circuit-breaking, rate-limiting, and connection pooling automatically.
  • Error Handling (Foundation.ErrorContext): All complex operations will be wrapped in an ErrorContext, ensuring that failures produce rich, traceable error reports with a full execution history.
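
For illustration, provider settings might be seeded at boot and read back through the same path. This is a minimal sketch: only Foundation.Config.get/1 appears in the components below, so the write call (Foundation.Config.update/2) and the exact settings shape shown here are assumptions.

# Seed provider settings at application boot (Foundation.Config.update/2 and
# the exact settings shape are assumptions; only Config.get/1 is used below)
:ok =
  Foundation.Config.update([:dspex, :providers, :openai], %{
    default_model: "gpt-4",
    path: "/v1/chat/completions",
    pool_size: 10,
    circuit_breaker: %{failure_threshold: 5, recovery_time_ms: 30_000},
    rate_limit_rpm: %{window: 60_000, limit: 3_500},
    rate_limit_tpm: %{window: 60_000, limit: 90_000}
  })

# Components read their settings through the same path (see DSPEx.Client.init/1)
{:ok, provider_config} = Foundation.Config.get([:dspex, :providers, :openai])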

3. Core Components of the DSPex Foundation

The following components form the core of the DSPex Foundation.

Component 1: AI Provider Client (DSPEx.Client)

This component abstracts the interaction with external AI providers (e.g., OpenAI, Anthropic). It’s a GenServer that manages configuration and delegates execution to foundation’s resilient infrastructure.

defmodule DSPEx.Client do
  @moduledoc """
  A supervised GenServer for managing interactions with a specific AI provider.
  """
  use GenServer

  # --- Public API ---

  def start_link(opts) do
    provider_id = Keyword.fetch!(opts, :provider_id)
    # Register with foundation's service registry
    GenServer.start_link(__MODULE__, opts, name: via_tuple(provider_id))
  end

  def request(provider_id, messages, opts \\ %{}) do
    GenServer.call(via_tuple(provider_id), {:request, messages, opts})
  end

  # --- GenServer Callbacks ---

  @impl true
  def init(opts) do
    provider_id = Keyword.fetch!(opts, :provider_id)
    {:ok, config} = Foundation.Config.get([:dspex, :providers, provider_id])

    # Configure a dedicated connection pool and protection policy
    pool_name = :"#{provider_id}_pool"
    protection_key = :"#{provider_id}_protection"

    Foundation.Infrastructure.ConnectionManager.start_pool(pool_name,
      worker_module: DSPEx.Client.HttpWorker,
      worker_args: [config: config],
      size: config[:pool_size] || 10
    )

    Foundation.Infrastructure.configure_protection(protection_key, %{
      circuit_breaker: config.circuit_breaker,
      # Multi-bucket rate limiting for RPM and TPM
      rate_limiter: [
        rpm: {config.rate_limit_rpm.window, config.rate_limit_rpm.limit},
        tpm: {config.rate_limit_tpm.window, config.rate_limit_tpm.limit}
      ]
    })

    state = %{
      provider_id: provider_id,
      config: config,
      pool_name: pool_name,
      protection_key: protection_key
    }
    {:ok, state}
  end

  @impl true
  def handle_call({:request, messages, opts}, _from, state) do
    # Define the core API call function
    api_call_fun = fn worker_pid ->
      DSPEx.Client.HttpWorker.post(worker_pid, state.config.path, %{
        model: opts[:model] || state.config.default_model,
        messages: messages
        # ... other parameters
      })
    end

    # Calculate token cost for rate limiting
    token_cost = estimate_tokens(messages)

    # Use foundation's protection wrapper for the call
    response =
      Foundation.Infrastructure.execute_protected(
        state.protection_key,
        [
          connection_pool: state.pool_name,
          # Apply costs to the appropriate rate limit buckets
          rate_limiter_costs: [rpm: 1, tpm: token_cost]
        ],
        api_call_fun
      )

    # Emit telemetry via foundation
    emit_usage_telemetry(response, state, token_cost)
    {:reply, response, state}
  end

  # --- Private Helpers ---
  defp via_tuple(provider_id), do: Foundation.ServiceRegistry.via_tuple(:dspex, {:client, provider_id})
  # Rough heuristic (~4 characters per token); a real implementation would
  # use the provider's tokenizer
  defp estimate_tokens(messages) do
    messages
    |> Enum.map_join(" ", & &1[:content])
    |> String.length()
    |> div(4)
  end

  defp emit_usage_telemetry(_response, _state, _token_cost) do
    # ... logic to emit Foundation.Telemetry events for cost and latency ...
  end
end
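
As a usage sketch, one client is started per provider, typically under the application supervisor, and invoked by its provider_id. The supervision details below are assumptions (use GenServer supplies the default child_spec/1); start_link/1 and request/3 are as defined above, and provider settings are assumed to be seeded as shown in Section 2.

# Start one client per provider under the application supervisor
children = [
  {DSPEx.Client, provider_id: :openai}
]
{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

# Call the registered client by provider_id
{:ok, response} =
  DSPEx.Client.request(:openai, [%{role: "user", content: "Hello!"}], %{model: "gpt-4"})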

Component 2: Prompt Template Engine (DSPEx.PromptTemplate)

This component provides a sophisticated, composable system for building and managing prompts, moving beyond the simple string-based Signature.

defmodule DSPEx.PromptTemplate do
  @moduledoc "A composable engine for building and optimizing prompts."
  defstruct [:id, :name, :template_ast, :variables, :metadata, :parent]

  def create(name, template_string, opts \\ []) do
    # Parses the string into an AST and identifies its variables.
    # (parse_template/1 is a hypothetical helper; the parsing logic is elided.)
    {ast, variables} = parse_template(template_string)

    %__MODULE__{
      id: Foundation.Utils.generate_id(),
      name: name,
      template_ast: ast,
      variables: variables,
      metadata: Keyword.get(opts, :metadata, %{}),
      parent: Keyword.get(opts, :parent)
    }
  end

  def render(_template, _variables) do
    # Renders the AST with variables, handling inheritance from parent templates
  end

  def compose(_template1, _template2) do
    # Creates a new template that combines the two
  end

  def optimize(_template, _performance_data) do
    # Suggests improvements based on historical data (e.g., adding few-shot examples)
  end
end
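
A hypothetical usage sketch follows. The {{...}} placeholder syntax is an assumption; this spec leaves the concrete template grammar to the parser behind create/3.

# Build two templates, compose them, and render with variables
qa = DSPEx.PromptTemplate.create(:qa, "Answer the question: {{question}}")
style = DSPEx.PromptTemplate.create(:style, "Respond concisely, in one sentence.")

template = DSPEx.PromptTemplate.compose(style, qa)
prompt = DSPEx.PromptTemplate.render(template, %{question: "What is OTP?"})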

Component 3: Semantic & Multi-Tier Cache (DSPEx.Cache)

This component provides a smart caching layer that understands semantic similarity, reducing redundant LM calls and costs.

defmodule DSPEx.Cache do
  @moduledoc "A multi-tier cache with semantic retrieval capabilities."
  use GenServer

  # L1: ETS (in-memory) for exact matches (fastest)
  # L2: On-disk (persistent) for exact matches
  # L3: Vector DB (semantic) for similar matches

  defstruct [:l1_cache, :l2_cache, :vector_store]

  def get_or_compute(_cache_key, _prompt_text, _compute_fun, _opts \\ []) do
    # 1. Check the L1 cache for an exact match
    # 2. Check the L2 cache for an exact match
    # 3. If enabled, generate an embedding for the prompt and check L3 for similar prompts
    # 4. If all tiers miss, execute `compute_fun`, then cache the result in L1, L2, and L3
  end

  # Helper module for generating embeddings for cache lookups
  defmodule Embedding do
    def generate(_text) do
      # ... logic to call a local or API-based embedding model ...
    end
  end
end
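
The numbered steps above translate naturally into a with chain. The sketch below assumes hypothetical helpers (lookup_l1/1, lookup_l2/1, lookup_semantic/2, and store_all/3) that this spec does not define; each lookup is assumed to return :miss or {:hit, value}.

def get_or_compute(cache_key, prompt_text, compute_fun, opts \\ []) do
  # Steps 1-3: fall through the tiers until one reports a hit
  with :miss <- lookup_l1(cache_key),
       :miss <- lookup_l2(cache_key),
       :miss <- lookup_semantic(prompt_text, opts) do
    # Step 4: full miss, so compute the value and populate every tier
    result = compute_fun.()
    store_all(cache_key, prompt_text, result)
    result
  else
    {:hit, cached} -> cached
  end
end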

Component 4: Cost & Performance Analytics (DSPEx.Analytics)

This component provides tools for tracking and analyzing the cost and performance of LM operations. It listens for foundation telemetry and events to build its analysis.

defmodule DSPEx.Analytics.CostManager do
  @moduledoc "Tracks and analyzes costs of LM operations."
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # Subscribe to foundation events related to LM calls
    Foundation.Events.subscribe([:lm_request_complete])
    {:ok, %{cost_data: %{}}}
  end

  @impl true
  def handle_info({:foundation_event, _event}, state) do
    # Process the event: extract token counts and cost, then fold them into
    # `state.cost_data` (accumulation logic elided)
    {:noreply, state}
  end

  def get_cost_analysis(_time_range, _group_by \\ [:provider, :model]) do
    # Query the manager's state (via GenServer.call) for detailed cost breakdowns
  end
end
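
A hypothetical query against the manager (the time-range shape is an assumption):

# Cost over the last hour, grouped by provider and model (the defaults)
now = DateTime.utc_now()
DSPEx.Analytics.CostManager.get_cost_analysis({DateTime.add(now, -3600, :second), now})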

Component 5: Declarative Workflow Engine (DSPEx.Workflow)

This provides a high-level, declarative way to define multi-step AI programs. It orchestrates calls to other DSPEx components and automatically handles tracing and error context via foundation.

defmodule DSPEx.Workflow do
  @moduledoc "A declarative engine for running multi-step AI pipelines."

  @type step :: {step_name :: atom(), step_fun :: (map() -> {:ok, map()} | {:error, term()})}
  @type pipeline :: [step()]

  def run(pipeline, initial_data, opts \\ []) do
    # 1. Create a top-level Foundation.ErrorContext for the whole workflow.
    correlation_id = Keyword.get(opts, :correlation_id) || Foundation.Utils.generate_correlation_id()
    parent_context = Foundation.ErrorContext.new(__MODULE__, :run, correlation_id: correlation_id)

    Foundation.ErrorContext.with_context(parent_context, fn ->
      # 2. Use Enum.reduce_while to execute the pipeline.
      Enum.reduce_while(pipeline, {:ok, initial_data}, fn {step_name, fun}, {:ok, acc} ->
        # 3. For each step, create a child context and measure execution with Telemetry.
        step_context = Foundation.ErrorContext.child_context(parent_context, __MODULE__, step_name)

        {result, _duration} = Foundation.Telemetry.measure([:dspex, :workflow, :step], %{step: step_name}, fn ->
          Foundation.ErrorContext.with_context(step_context, fn -> fun.(acc) end)
        end)

        # 4. Handle step result and continue or halt.
        case result do
          {:ok, new_acc} -> {:cont, {:ok, Map.merge(acc, new_acc)}}
          {:error, reason} -> {:halt, {:error, reason}}
        end
      end)
    end)
  end
end

4. Example Usage: A Simple Predict Program

This example shows how the DSPex Foundation components are used to build a high-level DSPEx.Predict module.

defmodule DSPEx.Predict do
  defstruct [:signature, :client_id, :template]

  def forward(%__MODULE__{} = program, inputs) do
    # 1. Define the workflow using the DSPex Foundation engine.
    pipeline = [
      {:render_prompt, &render_prompt_step/1},
      {:call_lm, &call_lm_step/1},
      {:parse_response, &parse_response_step/1}
    ]

    # 2. The initial data for the workflow includes the program state and inputs.
    initial_data = %{
      program: program,
      inputs: inputs
    }

    # 3. Run the workflow.
    DSPEx.Workflow.run(pipeline, initial_data)
  end

  # --- Workflow Step Implementations ---

  defp render_prompt_step(%{program: prog, inputs: inputs}) do
    # Uses the PromptTemplate engine
    rendered_prompt = DSPEx.PromptTemplate.render(prog.template, Map.merge(inputs, prog.signature))
    {:ok, %{prompt_messages: [%{role: "user", content: rendered_prompt}]}}
  end

  defp call_lm_step(%{program: prog, prompt_messages: messages}) do
    # Uses the AI Provider Client
    case DSPEx.Client.request(prog.client_id, messages) do
      {:ok, response} -> {:ok, %{lm_response: response}}
      error -> error
    end
  end

  defp parse_response_step(%{program: _prog, lm_response: response}) do
    # Logic to parse the response based on the program's signature would go here;
    # for simplicity, we just extract the content.
    answer = get_in(response, [:choices, Access.at(0), :message, :content])
    {:ok, %{answer: answer}}
  end
end
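
Putting it together, a caller might construct and run the program as follows. The signature shape and the {{...}} template syntax are assumptions for illustration; this spec does not fix either.

# Construct a Predict program (signature shape and template syntax assumed)
program = %DSPEx.Predict{
  signature: %{format: "a short answer"},
  client_id: :openai,
  template: DSPEx.PromptTemplate.create(:qa, "Q: {{question}}\nA:")
}

{:ok, %{answer: answer}} = DSPEx.Predict.forward(program, %{question: "What is DSPex?"})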

5. Conclusion

This specification outlines a powerful, modular, and robust DSPex Foundation. By building upon the generic infrastructure of the foundation library, it can focus on providing high-value, AI-specific components. The result is a system that is not only capable of replicating DSPy’s core functionality but also benefits from the inherent concurrency, fault tolerance, and observability of the Elixir/OTP ecosystem from day one. This foundation is the ideal platform for building the next generation of DSPEx’s advanced optimizers and agentic modules.