DSPy to Elixir/BEAM: Detailed Technical Specifications
Detailed Module System Architecture
Module Process Lifecycle and State Management
defmodule DSPy.Module.Instance do
use GenServer
defstruct [
:id,
:signature,
:parameters,
:demos,
:lm_ref,
:adapter_ref,
:callbacks,
:metrics,
:parent_ref,
:current_task
]
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: via_tuple(opts[:id]))
end
def init(opts) do
state = %__MODULE__{
id: opts[:id] || UUID.uuid4(),
signature: opts[:signature],
parameters: %{},
demos: [],
callbacks: opts[:callbacks] || [],
metrics: DSPy.Metrics.new(),
parent_ref: opts[:parent_ref]
}
# Register with parent supervision tree
if state.parent_ref do
DSPy.Module.Registry.register(state.parent_ref, state.id, self())
end
{:ok, state}
end
# Main execution interface
def handle_call({:forward, inputs, opts}, from, state) do
# Implement async execution to avoid blocking
task = Task.async(fn ->
with {:ok, processed_inputs} <- validate_inputs(inputs, state.signature),
{:ok, outputs} <- execute_module(processed_inputs, state, opts),
{:ok, validated_outputs} <- validate_outputs(outputs, state.signature) do
{:ok, validated_outputs}
else
{:error, reason} -> {:error, reason}
end
end)
{:noreply, %{state | current_task: {task, from}}}
end
# Handle task completion
def handle_info({ref, result}, %{current_task: {%Task{ref: ref}, from}} = state) do
GenServer.reply(from, result)
Process.demonitor(ref, [:flush])
# Update metrics
new_metrics = DSPy.Metrics.update(state.metrics, result)
{:noreply, %{state | current_task: nil, metrics: new_metrics}}
end
# Private helpers
defp via_tuple(id), do: {:via, Registry, {DSPy.Module.Registry, id}}
end
Signature System with Pattern Matching
defmodule DSPy.Signature do
@moduledoc """
Type-safe signature system using Elixir structs and pattern matching
"""
defmacro __using__(_opts) do
quote do
@behaviour DSPy.Signature
defstruct [
:instructions,
:fields,
:input_fields,
:output_fields
]
def new(instructions \\ "", fields \\ %{}) do
%__MODULE__{
instructions: instructions,
fields: fields,
input_fields: Enum.filter(fields, fn {_k, v} -> v.type == :input end),
output_fields: Enum.filter(fields, fn {_k, v} -> v.type == :output end)
}
end
end
end
@callback fields() :: %{atom() => DSPy.Field.t()}
@callback instructions() :: String.t()
@callback validate_inputs(map()) :: {:ok, map()} | {:error, term()}
@callback validate_outputs(map()) :: {:ok, map()} | {:error, term()}
end
defmodule DSPy.Field do
@type field_type :: :input | :output
@type annotation :: atom() | module() | {:list, atom()} | {:union, [atom()]}
defstruct [
:name,
:type, # :input | :output
:annotation, # atom() | module() | {:list, atom()} | {:union, [atom()]}
:desc,
:prefix,
:format,
:parser,
:default,
:required
]
def input_field(name, annotation \\ :string, opts \\ []) do
%__MODULE__{
name: name,
type: :input,
annotation: annotation,
desc: opts[:desc] || "",
prefix: opts[:prefix],
format: opts[:format],
required: Keyword.get(opts, :required, true)
}
end
def output_field(name, annotation \\ :string, opts \\ []) do
%__MODULE__{
name: name,
type: :output,
annotation: annotation,
desc: opts[:desc] || "",
prefix: opts[:prefix],
format: opts[:format],
required: Keyword.get(opts, :required, true)
}
end
end
# Example signature definition
defmodule DSPy.Signatures.QuestionAnswer do
use DSPy.Signature
def fields do
%{
question: DSPy.Field.input_field(:question, :string, desc: "The question to answer"),
context: DSPy.Field.input_field(:context, {:list, :string}, desc: "Relevant context"),
answer: DSPy.Field.output_field(:answer, :string, desc: "The answer to the question")
}
end
def instructions do
"Answer the question based on the provided context. Be concise and accurate."
end
def validate_inputs(%{question: q, context: c} = inputs)
when is_binary(q) and is_list(c) do
{:ok, inputs}
end
def validate_inputs(_), do: {:error, :invalid_inputs}
def validate_outputs(%{answer: a} = outputs) when is_binary(a) do
{:ok, outputs}
end
def validate_outputs(_), do: {:error, :invalid_outputs}
end
Language Model Abstraction Layer
LM Provider Pool Architecture
defmodule DSPy.LM.Supervisor do
use Supervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
def init(_init_arg) do
children = [
# Provider pools
{DSPy.LM.Provider.OpenAI.Pool, []},
{DSPy.LM.Provider.Anthropic.Pool, []},
{DSPy.LM.Provider.Local.Pool, []},
# Caching layer
{DSPy.Cache.Manager, []},
# Circuit breaker manager
{DSPy.CircuitBreaker.Manager, []},
# Rate limiter
{DSPy.RateLimit.Manager, []}
]
Supervisor.init(children, strategy: :one_for_one)
end
end
defmodule DSPy.LM.Provider.OpenAI.Pool do
use DynamicSupervisor
def start_link(init_arg) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
def init(_init_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
def get_worker(model, opts \\ []) do
case Registry.lookup(DSPy.LM.Provider.Registry, {:openai, model}) do
[{pid, _}] when is_pid(pid) ->
if Process.alive?(pid), do: {:ok, pid}, else: start_worker(model, opts)
[] ->
start_worker(model, opts)
end
end
defp start_worker(model, opts) do
child_spec = {DSPy.LM.Provider.OpenAI.Worker, [model: model] ++ opts}
DynamicSupervisor.start_child(__MODULE__, child_spec)
end
end
defmodule DSPy.LM.Provider.OpenAI.Worker do
use GenServer
defstruct [
:model,
:api_key,
:base_url,
:client,
:circuit_breaker,
:rate_limiter,
:metrics
]
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
def init(opts) do
model = opts[:model]
state = %__MODULE__{
model: model,
api_key: opts[:api_key] || System.get_env("OPENAI_API_KEY"),
base_url: opts[:base_url] || "https://api.openai.com/v1",
circuit_breaker: DSPy.CircuitBreaker.new(model),
rate_limiter: DSPy.RateLimit.new(model),
metrics: DSPy.Metrics.new()
}
# Register in provider registry
Registry.register(DSPy.LM.Provider.Registry, {:openai, model}, %{
worker_pid: self(),
model: model,
started_at: DateTime.utc_now()
})
{:ok, state}
end
def handle_call({:complete, messages, opts}, _from, state) do
case DSPy.CircuitBreaker.call(state.circuit_breaker, fn ->
DSPy.RateLimit.call(state.rate_limiter, fn ->
perform_completion(messages, opts, state)
end)
end) do
{:ok, result} ->
new_metrics = DSPy.Metrics.record_success(state.metrics, result)
{:reply, {:ok, result}, %{state | metrics: new_metrics}}
{:error, reason} ->
new_metrics = DSPy.Metrics.record_error(state.metrics, reason)
{:reply, {:error, reason}, %{state | metrics: new_metrics}}
end
end
defp perform_completion(messages, opts, state) do
# Check cache first
cache_key = DSPy.Cache.key(state.model, messages, opts)
case DSPy.Cache.get(cache_key) do
{:ok, cached_result} ->
{:ok, cached_result}
:miss ->
# Make API call
with {:ok, response} <- make_api_call(messages, opts, state),
:ok <- DSPy.Cache.put(cache_key, response) do
{:ok, response}
end
end
end
defp make_api_call(_messages, _opts, _state) do
# OpenAI API call with proper error handling, via an HTTP client such
# as Finch or Req; a hedged sketch follows below
{:error, :not_implemented}
end
end
Streaming Support with GenStage
defmodule DSPy.LM.StreamProducer do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
def init(opts) do
{:producer, %{
lm_worker: opts[:lm_worker],
messages: opts[:messages],
opts: opts[:opts],
buffer: :queue.new(),
demand: 0,
streaming: false
}}
end
def handle_demand(demand, state) when demand > 0 do
new_state = %{state | demand: state.demand + demand}
if :queue.is_empty(new_state.buffer) and not new_state.streaming do
# Start streaming from the LM exactly once
start_streaming(new_state)
else
dispatch_events(new_state)
end
end
defp start_streaming(state) do
# Capture the producer pid before spawning: self() inside the task
# function would be the task's pid, not this producer's
producer = self()
Task.start_link(fn ->
DSPy.LM.Provider.stream(
state.lm_worker,
state.messages,
state.opts ++ [stream_to: producer]
)
end)
{:noreply, [], %{state | streaming: true}}
end
def handle_info({:stream_chunk, chunk}, state) do
new_buffer = :queue.in(chunk, state.buffer)
dispatch_events(%{state | buffer: new_buffer})
end
def handle_info(:stream_end, state) do
# Signal completion
{:stop, :normal, state}
end
defp dispatch_events(%{demand: 0} = state) do
{:noreply, [], state}
end
defp dispatch_events(%{demand: demand, buffer: buffer} = state) do
{events, new_buffer, new_demand} = take_events(buffer, demand, [])
{:noreply, events, %{state | buffer: new_buffer, demand: new_demand}}
end
defp take_events(buffer, 0, acc), do: {Enum.reverse(acc), buffer, 0}
defp take_events(buffer, demand, acc) do
case :queue.out(buffer) do
{{:value, event}, new_buffer} ->
take_events(new_buffer, demand - 1, [event | acc])
{:empty, buffer} ->
{Enum.reverse(acc), buffer, demand}
end
end
end
defmodule DSPy.LM.StreamConsumer do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
def init(opts) do
{:consumer, %{
callback: opts[:callback],
buffer: ""
}}
end
def handle_events(events, _from, state) do
# Process streaming chunks
new_buffer = Enum.reduce(events, state.buffer, fn chunk, acc ->
updated_buffer = acc <> chunk.content
# Call callback with accumulated content
state.callback.(updated_buffer, chunk)
updated_buffer
end)
{:noreply, [], %{state | buffer: new_buffer}}
end
end
Advanced Adapter System
Adapter Pipeline Architecture
defmodule DSPy.Adapter do
@callback format(DSPy.Signature.t(), list(), map()) :: {:ok, list()} | {:error, term()}
@callback parse(DSPy.Signature.t(), String.t()) :: {:ok, map()} | {:error, term()}
@callback supports_streaming?() :: boolean()
end
defmodule DSPy.Adapter.Chat do
@behaviour DSPy.Adapter
def format(signature, demos, inputs) do
with {:ok, system_message} <- build_system_message(signature),
{:ok, demo_messages} <- format_demos(demos, signature),
{:ok, user_message} <- format_user_message(inputs, signature) do
messages = [system_message] ++ demo_messages ++ [user_message]
{:ok, messages}
end
end
def parse(signature, completion) do
DSPy.Adapter.Chat.Parser.parse(completion, signature)
end
def supports_streaming?, do: true
defp build_system_message(signature) do
content = """
#{signature.instructions}
#{format_field_descriptions(signature)}
#{format_field_structure(signature)}
"""
{:ok, %{role: "system", content: String.trim(content)}}
end
defp format_field_descriptions(signature) do
input_desc = format_fields_description(signature.input_fields, "Input")
output_desc = format_fields_description(signature.output_fields, "Output")
input_desc <> "\n\n" <> output_desc
end
# Minimal plausible implementation of the field-description helper
defp format_fields_description(fields, label) do
lines = Enum.map(fields, fn {name, field} -> "#{name}: #{field.desc}" end)
"#{label} fields:\n" <> Enum.join(lines, "\n")
end
defp format_field_structure(signature) do
input_fields = Enum.map(signature.input_fields, fn {name, field} ->
"- #{name}: #{field.desc || "Input field"}"
end)
output_fields = Enum.map(signature.output_fields, fn {name, field} ->
"- #{name}: #{field.desc || "Output field"}"
end)
"""
Input Fields:
#{Enum.join(input_fields, "\n")}
Output Fields:
#{Enum.join(output_fields, "\n")}
Follow this format for your response:
#{format_output_template(signature)}
"""
end
defp format_output_template(signature) do
signature.output_fields
|> Enum.map(fn {name, _field} -> "[[ ## #{name} ## ]]" end)
|> Enum.join("\n")
end
end
defmodule DSPy.Adapter.JSON do
@behaviour DSPy.Adapter
def format(signature, demos, inputs) do
# Similar to ChatAdapter but formatted for JSON response
with {:ok, system_message} <- build_json_system_message(signature),
{:ok, demo_messages} <- format_json_demos(demos, signature),
{:ok, user_message} <- format_json_user_message(inputs, signature) do
messages = [system_message] ++ demo_messages ++ [user_message]
{:ok, messages}
end
end
def parse(signature, completion) do
DSPy.Adapter.JSON.Parser.parse(completion, signature)
end
def supports_streaming?, do: false
defp build_json_system_message(signature) do
schema = build_json_schema(signature)
content = """
#{signature.instructions}
Respond with valid JSON that matches this schema:
#{Jason.encode!(schema, pretty: true)}
"""
{:ok, %{role: "system", content: content}}
end
defp build_json_schema(signature) do
properties =
signature.output_fields
|> Enum.into(%{}, fn {name, field} ->
{name, field_to_json_schema(field)}
end)
required =
signature.output_fields
|> Enum.filter(fn {_name, field} -> field.required end)
|> Enum.map(fn {name, _field} -> name end)
%{
type: "object",
properties: properties,
required: required,
additionalProperties: false
}
end
end
Custom Type System
defmodule DSPy.CustomType do
@callback format(any()) :: list() | map() | String.t()
@callback parse(String.t()) :: {:ok, any()} | {:error, term()}
@callback description() :: String.t()
end
defmodule DSPy.Types.Image do
@behaviour DSPy.CustomType
defstruct [:url, :data, :format]
def format(%__MODULE__{url: url}) when is_binary(url) do
[%{type: "image_url", image_url: %{url: url}}]
end
def format(%__MODULE__{data: data, format: format}) when is_binary(data) do
data_uri = "data:image/#{format};base64,#{data}"
[%{type: "image_url", image_url: %{url: data_uri}}]
end
def parse(input) when is_binary(input) do
cond do
String.starts_with?(input, "http") ->
{:ok, %__MODULE__{url: input}}
String.starts_with?(input, "data:image/") ->
parse_data_uri(input)
true ->
{:error, :invalid_image_format}
end
end
def description, do: "An image that can be referenced by URL or embedded as base64 data"
defp parse_data_uri("data:image/" <> rest) do
case String.split(rest, ";base64,", parts: 2) do
[format, data] ->
{:ok, %__MODULE__{data: data, format: format}}
_ ->
{:error, :invalid_data_uri}
end
end
end
defmodule DSPy.Types.Tool do
@behaviour DSPy.CustomType
defstruct [:name, :description, :function, :parameters]
def format(%__MODULE__{} = tool) do
%{
type: "function",
function: %{
name: tool.name,
description: tool.description,
parameters: tool.parameters
}
}
end
def parse(tool_call_json) do
# Parse tool call from LM response
with {:ok, decoded} <- Jason.decode(tool_call_json),
{:ok, tool} <- extract_tool_call(decoded) do
{:ok, tool}
end
end
def description, do: "A function that can be called by the language model"
end
Optimization/Teleprompting Architecture
COPRO Optimizer Implementation
defmodule DSPy.Teleprompt.COPRO do
use GenServer
defstruct [
:prompt_model,
:metric,
:breadth,
:depth,
:temperature,
:candidates,
:evaluation_results,
:optimization_state
]
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
def compile(pid, student, trainset, eval_opts \\ []) do
GenServer.call(pid, {:compile, student, trainset, eval_opts}, :infinity)
end
def init(opts) do
state = %__MODULE__{
prompt_model: opts[:prompt_model],
metric: opts[:metric],
breadth: opts[:breadth] || 10,
depth: opts[:depth] || 3,
temperature: opts[:temperature] || 1.4,
candidates: %{},
evaluation_results: %{},
optimization_state: :idle
}
{:ok, state}
end
def handle_call({:compile, student, trainset, eval_opts}, _from, state) do
# Run the optimization as a task and block the caller until it finishes
task = start_optimization_process(student, trainset, eval_opts, state)
result = Task.await(task, :infinity)
{:reply, result, %{state | optimization_state: :idle}}
end
defp start_optimization_process(student, trainset, eval_opts, state) do
Task.async(fn ->
run_copro_optimization(student, trainset, eval_opts, state)
end)
end
defp run_copro_optimization(student, trainset, eval_opts, state) do
# Initialize candidates with baseline
initial_candidates = initialize_candidates(student, state)
# Run depth iterations
final_candidates =
1..state.depth
|> Enum.reduce(initial_candidates, fn depth, candidates ->
run_optimization_round(candidates, trainset, eval_opts, state, depth)
end)
# Select best candidate
select_best_candidate(final_candidates, trainset, eval_opts, state)
end
defp run_optimization_round(candidates, trainset, eval_opts, state, _depth) do
# Generate new instruction candidates
new_instructions = generate_instruction_candidates(candidates, state)
# Evaluate candidates in parallel
evaluation_tasks =
new_instructions
|> Enum.map(fn instruction ->
Task.async(fn ->
evaluate_candidate(instruction, trainset, eval_opts, state)
end)
end)
# Collect results
results = Task.await_many(evaluation_tasks, :infinity)
# Update candidates with results
update_candidates(candidates, results)
end
defp evaluate_candidate(instruction, trainset, eval_opts, state) do
# Create module with new instruction
module = update_module_instruction(instruction)
# Evaluate on trainset
{:ok, evaluator} = DSPy.Evaluate.start_link(
devset: trainset,
metric: state.metric,
num_threads: eval_opts[:num_threads] || 4
)
score = DSPy.Evaluate.call(evaluator, module)
%{
instruction: instruction,
module: module,
score: score
}
end
end
Bootstrap Few-Shot with Process Pools
defmodule DSPy.Teleprompt.Bootstrap do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(opts) do
children = [
# Teacher pool for generating examples
{DSPy.Teleprompt.Bootstrap.TeacherPool, opts},
# Evaluation worker pool
{DSPy.Teleprompt.Bootstrap.EvaluationPool, opts},
# Demo candidate manager
{DSPy.Teleprompt.Bootstrap.DemoManager, opts},
# Progress tracker
{DSPy.Teleprompt.Bootstrap.ProgressTracker, opts}
]
Supervisor.init(children, strategy: :one_for_one)
end
def compile(student, teacher, trainset, opts \\ []) do
# Start compilation process
compilation_id = UUID.uuid4()
DSPy.Teleprompt.Bootstrap.Coordinator.start_compilation(
compilation_id,
student,
teacher,
trainset,
opts
)
end
end
defmodule DSPy.Teleprompt.Bootstrap.TeacherPool do
use DynamicSupervisor
def start_link(opts) do
DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
DynamicSupervisor.init(strategy: :one_for_one)
end
def generate_examples(teacher, examples, opts) do
# Distribute example generation across worker pool
tasks =
examples
|> Enum.chunk_every(opts[:batch_size] || 10)
|> Enum.map(fn batch ->
start_teacher_worker(teacher, batch, opts)
end)
# Collect results
Task.await_many(tasks, :infinity)
|> List.flatten()
end
defp start_teacher_worker(teacher, batch, opts) do
Task.async(fn ->
worker_pid = start_teacher_instance(teacher)
results =
batch
|> Enum.map(fn example ->
DSPy.Module.call(worker_pid, example, opts)
end)
|> Enum.filter(&valid_example?/1)
# Clean up worker
GenServer.stop(worker_pid)
results
end)
end
end
Distributed Architecture Support
Multi-Node Coordination
defmodule DSPy.Cluster.Manager do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: {:global, __MODULE__})
end
def init(_opts) do
# Set up cluster monitoring
:net_kernel.monitor_nodes(true)
state = %{
nodes: [Node.self()],
node_capabilities: %{Node.self() => get_node_capabilities()},
distributed_tasks: %{},
load_balancer: DSPy.LoadBalancer.new()
}
{:ok, state}
end
def handle_info({:nodeup, node}, state) do
# New node joined cluster
capabilities = :rpc.call(node, __MODULE__, :get_node_capabilities, [])
new_state = %{
state |
nodes: [node | state.nodes],
node_capabilities: Map.put(state.node_capabilities, node, capabilities)
}
{:noreply, new_state}
end
def handle_info({:nodedown, node}, state) do
# Node left cluster - redistribute tasks
tasks_to_redistribute = get_tasks_for_node(state.distributed_tasks, node)
new_state = redistribute_tasks(state, tasks_to_redistribute)
{:noreply, new_state}
end
def distribute_optimization(optimization_spec) do
GenServer.call({:global, __MODULE__}, {:distribute_optimization, optimization_spec})
end
def handle_call({:distribute_optimization, spec}, _from, state) do
# Determine optimal node distribution based on capabilities
distribution_plan = plan_distribution(spec, state)
# Start distributed optimization
task_refs = start_distributed_tasks(distribution_plan)
{:reply, {:ok, task_refs}, state}
end
defp plan_distribution(_spec, _state) do
# Analyze optimization requirements against node capabilities and
# return a list of {node, task_spec} assignments (implementation elided)
[]
end
end
defmodule DSPy.Cluster.DistributedOptimization do
def run_distributed_copro(student, trainset, opts) do
# Split trainset across available nodes
nodes = DSPy.Cluster.Manager.get_available_nodes()
trainset_chunks = chunk_data(trainset, length(nodes))
# Start optimization tasks on each node
tasks =
Enum.zip(nodes, trainset_chunks)
|> Enum.map(fn {node, chunk} ->
:rpc.async_call(node, DSPy.Teleprompt.COPRO, :run_partial_optimization, [
student, chunk, opts
])
end)
# Collect and merge results
results =
tasks
|> Enum.map(&:rpc.yield/1)
|> merge_optimization_results()
results
end
defp chunk_data(data, num_chunks) do
# Guard against a zero chunk size when there are more nodes than items
chunk_size = max(div(length(data), num_chunks), 1)
Enum.chunk_every(data, chunk_size)
end
defp merge_optimization_results(results) do
# Combine candidates from different nodes and potentially run a final
# evaluation round; passing results through keeps this sketch compiling
results
end
end
Persistent State with Mnesia
defmodule DSPy.Storage.Schema do
def install(nodes \\ [node()]) do
# Create Mnesia schema
:mnesia.create_schema(nodes)
:mnesia.start()
# Create tables
create_tables()
:ok
end
defp create_tables do
# Optimization runs table
:mnesia.create_table(:optimization_runs, [
attributes: [:id, :student_module, :trainset_hash, :optimizer_type, :config,
:status, :results, :started_at, :completed_at],
type: :set,
disc_copies: [node()]
])
# Module parameters table
:mnesia.create_table(:module_parameters, [
attributes: [:module_id, :signature_hash, :parameters, :demos, :updated_at],
type: :set,
disc_copies: [node()]
])
# Evaluation cache table
:mnesia.create_table(:evaluation_cache, [
attributes: [:cache_key, :module_hash, :input_hash, :result, :score, :timestamp],
type: :set,
disc_copies: [node()],
index: [:module_hash, :timestamp]
])
# Training examples table
:mnesia.create_table(:training_examples, [
attributes: [:id, :dataset_id, :input_data, :output_data, :metadata, :created_at],
type: :set,
disc_copies: [node()],
index: [:dataset_id]
])
# LM usage statistics
:mnesia.create_table(:lm_usage_stats, [
attributes: [:session_id, :provider, :model, :tokens_used, :cost, :timestamp],
type: :bag,
disc_copies: [node()],
index: [:provider, :timestamp]
])
end
end
defmodule DSPy.Storage.OptimizationRuns do
def save_run(run_data) do
:mnesia.transaction(fn ->
:mnesia.write({:optimization_runs,
run_data.id,
run_data.student_module,
run_data.trainset_hash,
run_data.optimizer_type,
run_data.config,
run_data.status,
run_data.results,
run_data.started_at,
run_data.completed_at
})
end)
end
def get_run(run_id) do
:mnesia.transaction(fn ->
:mnesia.read(:optimization_runs, run_id)
end)
end
def list_runs_by_type(optimizer_type) do
:mnesia.transaction(fn ->
:mnesia.match_object({:optimization_runs, :_, :_, :_, optimizer_type, :_, :_, :_, :_, :_})
end)
end
def get_best_run_for_signature(signature_hash) do
# Find the optimization run with the best score for a given signature
:mnesia.transaction(fn ->
runs = :mnesia.match_object({:optimization_runs, :_, :_, :_, :_, :_, :_, :_, :_, :_})
runs
# The record is a 10-tuple (table name plus nine attributes); results
# sits in the eighth position
|> Enum.filter(fn {_, _, _, _, _, _, _, results, _, _} ->
results != nil and results.signature_hash == signature_hash
end)
|> Enum.max_by(fn {_, _, _, _, _, _, _, results, _, _} -> results.score end, fn -> nil end)
end)
end
end
Real-time Monitoring and Observability
Phoenix LiveView Dashboard
defmodule DSPyWeb.DashboardLive do
use DSPyWeb, :live_view
def mount(_params, _session, socket) do
if connected?(socket) do
# Subscribe to real-time updates
DSPy.PubSub.subscribe("optimization:progress")
DSPy.PubSub.subscribe("module:metrics")
DSPy.PubSub.subscribe("lm:usage")
# Schedule periodic updates
:timer.send_interval(1000, self(), :update_metrics)
end
socket =
socket
|> assign(:optimization_runs, get_active_optimizations())
|> assign(:module_metrics, get_module_metrics())
|> assign(:lm_usage, get_lm_usage_summary())
|> assign(:system_health, get_system_health())
{:ok, socket}
end
def handle_info({:optimization_progress, data}, socket) do
# Update optimization progress in real-time
updated_runs = update_optimization_run(socket.assigns.optimization_runs, data)
{:noreply, assign(socket, :optimization_runs, updated_runs)}
end
def handle_info({:module_metrics, metrics}, socket) do
# Update module performance metrics
{:noreply, assign(socket, :module_metrics, metrics)}
end
def handle_info({:lm_usage, usage}, socket) do
# Update LM usage statistics
updated_usage = merge_usage_stats(socket.assigns.lm_usage, usage)
{:noreply, assign(socket, :lm_usage, updated_usage)}
end
def handle_info(:update_metrics, socket) do
# Periodic system health check
health = get_system_health()
{:noreply, assign(socket, :system_health, health)}
end
def render(assigns) do
~H"""
<div class="dashboard">
<.header>DSPy System Dashboard</.header>
<div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-6">
<!-- System Health Card -->
<.card title="System Health">
<.health_indicator health={@system_health} />
</.card>
<!-- Active Optimizations -->
<.card title="Active Optimizations">
<.optimization_list runs={@optimization_runs} />
</.card>
<!-- LM Usage -->
<.card title="LM Usage">
<.usage_chart usage={@lm_usage} />
</.card>
<!-- Module Performance -->
<.card title="Module Performance">
<.metrics_table metrics={@module_metrics} />
</.card>
</div>
<!-- Detailed Views -->
<div class="mt-8">
<.live_component
module={DSPyWeb.OptimizationDetailComponent}
id="optimization-detail"
runs={@optimization_runs}
/>
</div>
</div>
"""
end
end
defmodule DSPy.Telemetry do
def setup do
events = [
[:dspy, :module, :call, :start],
[:dspy, :module, :call, :stop],
[:dspy, :module, :call, :exception],
[:dspy, :lm, :request, :start],
[:dspy, :lm, :request, :stop],
[:dspy, :lm, :request, :exception],
[:dspy, :optimization, :start],
[:dspy, :optimization, :stop],
[:dspy, :cache, :hit],
[:dspy, :cache, :miss]
]
:telemetry.attach_many(
"dspy-telemetry",
events,
&handle_event/4,
%{}
)
end
def handle_event([:dspy, :module, :call, :start], measurements, metadata, _config) do
# Track module execution start
Phoenix.PubSub.broadcast(DSPy.PubSub, "module:metrics", {
:module_start,
%{
module_id: metadata.module_id,
timestamp: measurements.system_time,
inputs: metadata.inputs
}
})
end
def handle_event([:dspy, :module, :call, :stop], measurements, metadata, _config) do
# Track module execution completion
duration = measurements.duration
Phoenix.PubSub.broadcast(DSPy.PubSub, "module:metrics", {
:module_complete,
%{
module_id: metadata.module_id,
duration: duration,
outputs: metadata.outputs,
success: true
}
})
# Update metrics in ETS
DSPy.Metrics.record_execution(metadata.module_id, duration, true)
end
def handle_event([:dspy, :lm, :request, :stop], measurements, metadata, _config) do
# Track LM usage
usage = %{
provider: metadata.provider,
model: metadata.model,
tokens_used: metadata.tokens_used,
cost: metadata.cost,
duration: measurements.duration
}
Phoenix.PubSub.broadcast(DSPy.PubSub, "lm:usage", usage)
# Store in persistent storage for billing/analytics
DSPy.Storage.LMUsage.record(usage)
end
end
Error Handling and Recovery Patterns
Circuit Breaker Implementation
defmodule DSPy.CircuitBreaker do
use GenServer
defstruct [
:name,
:state, # :closed | :open | :half_open
:failure_count,
:failure_threshold,
:timeout,
:last_failure_time,
:metrics
]
def start_link(opts) do
name = opts[:name] || __MODULE__
GenServer.start_link(__MODULE__, opts, name: name)
end
def call(circuit_breaker, function) when is_function(function) do
GenServer.call(circuit_breaker, {:call, function})
end
def init(opts) do
state = %__MODULE__{
name: opts[:name],
state: :closed,
failure_count: 0,
failure_threshold: opts[:failure_threshold] || 5,
timeout: opts[:timeout] || 60_000, # 1 minute
last_failure_time: nil,
metrics: DSPy.Metrics.new()
}
{:ok, state}
end
def handle_call({:call, function}, _from, %{state: :open} = state) do
# Check if timeout has elapsed
if timeout_elapsed?(state) do
# Transition to half-open and try the call
new_state = %{state | state: :half_open}
execute_function(function, new_state)
else
# Circuit is open, fail fast
{:reply, {:error, :circuit_open}, state}
end
end
def handle_call({:call, function}, _from, state) do
execute_function(function, state)
end
defp execute_function(function, state) do
try do
result = function.()
# Success - reset circuit breaker
new_state = %{state |
state: :closed,
failure_count: 0,
metrics: DSPy.Metrics.record_success(state.metrics, result)
}
{:reply, {:ok, result}, new_state}
rescue
error ->
# Failure - increment count and possibly open circuit
new_failure_count = state.failure_count + 1
new_metrics = DSPy.Metrics.record_error(state.metrics, error)
new_state =
if new_failure_count >= state.failure_threshold do
%{state |
state: :open,
failure_count: new_failure_count,
last_failure_time: DateTime.utc_now(),
metrics: new_metrics
}
else
%{state |
failure_count: new_failure_count,
metrics: new_metrics
}
end
{:reply, {:error, error}, new_state}
end
end
defp timeout_elapsed?(%{last_failure_time: nil}), do: false
defp timeout_elapsed?(%{last_failure_time: last_time, timeout: timeout}) do
DateTime.diff(DateTime.utc_now(), last_time, :millisecond) >= timeout
end
end
defmodule DSPy.Supervisor.Strategies do
@moduledoc """
Custom supervision policies for different DSPy components.
`strategy`, `intensity`, and `period` are supervisor flags; `restart`
and `shutdown` are applied to the child specs started underneath.
"""
def module_supervisor_spec do
# Modules should restart immediately but with exponential backoff
# if they keep failing
%{
strategy: :one_for_one,
intensity: 5,
period: 60,
restart: :transient,
shutdown: 5000
}
end
def lm_provider_supervisor_spec do
# LM providers should restart but with circuit breaker protection
%{
strategy: :rest_for_one,
intensity: 3,
period: 30,
restart: :permanent,
shutdown: 10000
}
end
def optimization_supervisor_spec do
# Optimization processes are temporary and shouldn't restart
%{
strategy: :one_for_one,
intensity: 1,
period: 10,
restart: :temporary,
shutdown: :infinity
}
end
end
Poison Message Handling
defmodule DSPy.PoisonMessage.Handler do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
state = %{
poison_messages: %{},
quarantine_threshold: 3,
quarantine_duration: :timer.hours(1)
}
{:ok, state}
end
def report_poison_message(message, error, context) do
GenServer.cast(__MODULE__, {:poison_message, message, error, context})
end
def is_quarantined?(message) do
GenServer.call(__MODULE__, {:is_quarantined, message})
end
def handle_cast({:poison_message, message, error, context}, state) do
message_hash = hash_message(message)
new_count = Map.get(state.poison_messages, message_hash, 0) + 1
# Quarantine (persist + alert) once the threshold is reached; the
# counter is updated either way
if new_count >= state.quarantine_threshold do
quarantine_message(message, error, context)
end
new_state = %{state | poison_messages: Map.put(state.poison_messages, message_hash, new_count)}
{:noreply, new_state}
end
def handle_call({:is_quarantined, message}, _from, state) do
message_hash = hash_message(message)
count = Map.get(state.poison_messages, message_hash, 0)
quarantined = count >= state.quarantine_threshold
{:reply, quarantined, state}
end
defp quarantine_message(message, error, context) do
# Store in persistent storage for analysis
DSPy.Storage.PoisonMessages.store(%{
message: message,
error: error,
context: context,
quarantined_at: DateTime.utc_now(),
hash: hash_message(message)
})
# Notify administrators
DSPy.Notifications.send_poison_message_alert(message, error, context)
end
defp hash_message(message) do
:crypto.hash(:sha256, :erlang.term_to_binary(message))
|> Base.encode16()
end
end
defmodule DSPy.Module.SafeWrapper do
@moduledoc """
Wraps module execution with poison message detection and recovery
"""
def safe_call(module_pid, inputs, opts \\ []) do
if DSPy.PoisonMessage.Handler.is_quarantined?(inputs) do
{:error, :quarantined_input}
else
try do
case GenServer.call(module_pid, {:forward, inputs, opts}, opts[:timeout] || 30_000) do
{:ok, result} -> {:ok, result}
{:error, reason} = error ->
# Check if this should be considered a poison message
if should_quarantine?(reason, opts) do
DSPy.PoisonMessage.Handler.report_poison_message(
inputs,
reason,
%{module_pid: module_pid, opts: opts}
)
end
error
end
catch
:exit, reason ->
# Process crashed - definitely a poison message
DSPy.PoisonMessage.Handler.report_poison_message(
inputs,
{:process_crash, reason},
%{module_pid: module_pid, opts: opts}
)
{:error, :process_crashed}
end
end
end
defp should_quarantine?(reason, _opts) do
case reason do
:timeout -> true
:invalid_format -> true
{:parse_error, _} -> true
_ -> false
end
end
end
Performance Optimization Patterns
ETS-based Caching Layer
defmodule DSPy.Cache.Manager do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
# Create ETS tables for different cache types
:ets.new(:lm_response_cache, [:named_table, :set, :public, read_concurrency: true])
:ets.new(:module_result_cache, [:named_table, :set, :public, read_concurrency: true])
:ets.new(:evaluation_cache, [:named_table, :set, :public, read_concurrency: true])
# Start cache cleanup timer
:timer.send_interval(:timer.minutes(10), self(), :cleanup_expired)
{:ok, %{}}
end
def get(cache_type, key) do
table = cache_table(cache_type)
case :ets.lookup(table, key) do
[{^key, value, expires_at}] ->
if System.system_time(:millisecond) < expires_at do
{:hit, value}
else
:ets.delete(table, key)
:miss
end
[] ->
:miss
end
end
def put(cache_type, key, value, ttl \\ :timer.minutes(30)) do
table = cache_table(cache_type)
# Store the expiry as an integer timestamp: ETS match specs compare
# terms by Erlang term order, which is wrong for DateTime structs but
# correct for integers
expires_at = System.system_time(:millisecond) + ttl
:ets.insert(table, {key, value, expires_at})
:ok
end
def handle_info(:cleanup_expired, state) do
now = System.system_time(:millisecond)
# Clean up expired entries from all tables
[:lm_response_cache, :module_result_cache, :evaluation_cache]
|> Enum.each(fn table ->
cleanup_table(table, now)
end)
{:noreply, state}
end
defp cache_table(:lm_response), do: :lm_response_cache
defp cache_table(:module_result), do: :module_result_cache
defp cache_table(:evaluation), do: :evaluation_cache
defp cleanup_table(table, now) do
# Use select_delete for efficient cleanup
match_spec = [
{{:_, :_, :"$1"}, [{:<, :"$1", {:const, now}}], [true]}
]
:ets.select_delete(table, match_spec)
end
end
defmodule DSPy.Cache.Key do
@moduledoc """
Utilities for generating consistent cache keys
"""
def lm_key(provider, model, messages, opts) do
# Create deterministic key for LM responses
content = %{
provider: provider,
model: model,
messages: normalize_messages(messages),
opts: normalize_opts(opts)
}
:crypto.hash(:sha256, :erlang.term_to_binary(content))
|> Base.encode16()
end
def module_key(module_id, signature_hash, inputs) do
content = %{
module_id: module_id,
signature_hash: signature_hash,
inputs: normalize_inputs(inputs)
}
:crypto.hash(:sha256, :erlang.term_to_binary(content))
|> Base.encode16()
end
defp normalize_messages(messages) do
# Normalize messages for consistent hashing; messages are built with
# atom keys (:role, :content) by the adapters
messages
|> Enum.map(fn msg ->
msg
|> Map.take([:role, :content, :name])
|> Map.update(:content, "", &String.trim/1)
end)
end
end
defp normalize_opts(opts) do
# Only include cache-relevant options; accept keyword lists or maps
opts
|> Map.new()
|> Map.take([:temperature, :max_tokens, :top_p, :frequency_penalty])
end
defp normalize_inputs(inputs) do
# Normalize inputs for consistent hashing
inputs
|> Enum.sort()
|> Enum.into(%{})
end
end
Batch Processing with GenStage
defmodule DSPy.BatchProcessor do
@moduledoc """
Batch processing pipeline for efficient LM calls and evaluations
"""
def start_link(opts) do
# Set up GenStage pipeline
{:ok, producer} = DSPy.BatchProcessor.Producer.start_link(opts)
{:ok, processor} = DSPy.BatchProcessor.Processor.start_link(opts)
{:ok, consumer} = DSPy.BatchProcessor.Consumer.start_link(opts)
# Connect the pipeline
GenStage.sync_subscribe(processor, to: producer)
GenStage.sync_subscribe(consumer, to: processor)
{:ok, %{producer: producer, processor: processor, consumer: consumer}}
end
end
defmodule DSPy.BatchProcessor.Producer do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
def init(opts) do
{:producer, %{
queue: :queue.new(),
demand: 0,
batch_size: opts[:batch_size] || 10
}}
end
def add_request(producer, request) do
GenStage.cast(producer, {:add_request, request})
end
def handle_demand(demand, state) when demand > 0 do
dispatch_events(%{state | demand: state.demand + demand})
end
def handle_cast({:add_request, request}, state) do
new_queue = :queue.in(request, state.queue)
dispatch_events(%{state | queue: new_queue})
end
defp dispatch_events(%{demand: 0} = state) do
{:noreply, [], state}
end
defp dispatch_events(%{demand: demand, queue: queue} = state) do
{events, new_queue, new_demand} = take_events(queue, demand, [])
{:noreply, events, %{state | queue: new_queue, demand: new_demand}}
end
defp take_events(queue, 0, acc), do: {Enum.reverse(acc), queue, 0}
defp take_events(queue, demand, acc) do
case :queue.out(queue) do
{{:value, event}, new_queue} ->
take_events(new_queue, demand - 1, [event | acc])
{:empty, queue} ->
{Enum.reverse(acc), queue, demand}
end
end
end
defmodule DSPy.BatchProcessor.Processor do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
def init(opts) do
{:producer_consumer, %{
batch_size: opts[:batch_size] || 10,
lm_worker: opts[:lm_worker]
}}
end
def handle_events(events, _from, state) do
# Group events into batches for efficient processing
batches = Enum.chunk_every(events, state.batch_size)
processed_events =
batches
|> Enum.map(fn batch ->
process_batch(batch, state)
end)
|> List.flatten()
{:noreply, processed_events, state}
end
defp process_batch(batch, state) do
# Combine requests for efficient LM batching
combined_messages = combine_batch_messages(batch)
case DSPy.LM.Provider.batch_complete(state.lm_worker, combined_messages) do
{:ok, results} ->
# Split results back to individual requests
split_batch_results(batch, results)
{:error, reason} ->
# Handle batch failure
Enum.map(batch, fn request ->
%{request | status: :error, error: reason}
end)
end
end
end
This comprehensive technical specification provides the detailed architecture for porting DSPy to Elixir/BEAM, leveraging OTP’s strengths while maintaining DSPy’s core functionality. The design emphasizes fault tolerance, scalability, and the “let it crash” philosophy while providing robust error handling and recovery mechanisms.