9. NEW: Advanced Evaluation System
defmodule DSPEx.Teleprompter.SIMBA.Evaluation do
@moduledoc """
Advanced evaluation system for SIMBA with comprehensive metrics and analysis.
"""
alias DSPEx.{Program, Example}
@type evaluation_result :: %{
score: float(),
detailed_scores: map(),
execution_time: non_neg_integer(),
memory_usage: non_neg_integer(),
success: boolean(),
error: any() | nil,
metadata: map()
}
@spec evaluate_comprehensive(Program.t(), [Example.t()], function()) :: evaluation_result()
def evaluate_comprehensive(program, examples, metric_fn) do
start_time = System.monotonic_time()
# process_info(self(), :memory) returns {:memory, bytes}
{:memory, memory_before} = :erlang.process_info(self(), :memory)
# Execute on all examples with detailed tracking
results = examples
|> Task.async_stream(
fn example ->
evaluate_single_example_detailed(program, example, metric_fn)
end,
max_concurrency: 10,
timeout: 30_000,
on_timeout: :kill_task
)
|> Enum.map(fn
{:ok, result} -> result
{:exit, reason} -> %{success: false, score: 0.0, error: reason}
end)
{:memory, memory_after} = :erlang.process_info(self(), :memory)
execution_time = System.monotonic_time() - start_time
# Aggregate results
successful_results = Enum.filter(results, & &1.success)
total_score = if Enum.empty?(successful_results) do
0.0
else
successful_results
|> Enum.map(& &1.score)
|> Enum.sum()
|> Kernel./(length(successful_results))
end
%{
score: total_score,
detailed_scores: calculate_detailed_scores(results),
execution_time: execution_time,
memory_usage: memory_after - memory_before,
success: length(successful_results) > 0,
error: aggregate_errors(results),
metadata: %{
total_examples: length(examples),
successful_examples: length(successful_results),
success_rate: length(successful_results) / length(examples),
score_distribution: calculate_score_distribution(results),
performance_percentiles: calculate_performance_percentiles(results)
}
}
end
defp evaluate_single_example_detailed(program, example, metric_fn) do
start_time = System.monotonic_time()
# Bind start_time before opening an explicit try: variables bound inside a
# (function-level or explicit) try body are not visible in its rescue clause,
# so the rescue below could not reference start_time otherwise.
try do
inputs = Example.inputs(example)
case Program.forward(program, inputs) do
{:ok, outputs} ->
score = try do
metric_fn.(example, outputs)
rescue
error ->
{:error_in_metric, error}
catch
thrown ->
{:error_in_metric, thrown}
end
case score do
score when is_number(score) ->
%{
success: true,
score: score,
execution_time: System.monotonic_time() - start_time,
outputs: outputs,
error: nil
}
error ->
%{
success: false,
score: 0.0,
execution_time: System.monotonic_time() - start_time,
outputs: outputs,
error: error
}
end
{:error, reason} ->
%{
success: false,
score: 0.0,
execution_time: System.monotonic_time() - start_time,
outputs: %{},
error: reason
}
end
rescue
error ->
%{
success: false,
score: 0.0,
execution_time: System.monotonic_time() - start_time,
outputs: %{},
error: {:execution_exception, error}
}
end
end
defp calculate_detailed_scores(results) do
successful_results = Enum.filter(results, & &1.success)
scores = Enum.map(successful_results, & &1.score)
if Enum.empty?(scores) do
%{
mean: 0.0,
median: 0.0,
min: 0.0,
max: 0.0,
std_dev: 0.0
}
else
sorted_scores = Enum.sort(scores)
mean = Enum.sum(scores) / length(scores)
%{
mean: mean,
median: calculate_median(sorted_scores),
min: List.first(sorted_scores),
max: List.last(sorted_scores),
std_dev: calculate_standard_deviation(scores, mean)
}
end
end
defp calculate_median(sorted_list) when length(sorted_list) == 0, do: 0.0
defp calculate_median(sorted_list) do
len = length(sorted_list)
mid = div(len, 2)
if rem(len, 2) == 0 do
(Enum.at(sorted_list, mid - 1) + Enum.at(sorted_list, mid)) / 2
else
Enum.at(sorted_list, mid)
end
end
defp calculate_standard_deviation(scores, _mean) when length(scores) < 2, do: 0.0
defp calculate_standard_deviation(scores, mean) do
variance = scores
|> Enum.map(fn score -> :math.pow(score - mean, 2) end)
|> Enum.sum()
|> Kernel./(length(scores) - 1)
:math.sqrt(variance)
end
defp aggregate_errors(results) do
errors = results
|> Enum.filter(fn result -> not result.success end)
|> Enum.map(& &1.error)
|> Enum.group_by(& &1)
|> Enum.map(fn {error_type, occurrences} ->
{error_type, length(occurrences)}
end)
if Enum.empty?(errors), do: nil, else: errors
end
defp calculate_score_distribution(results) do
scores = results
|> Enum.filter(& &1.success)
|> Enum.map(& &1.score)
if Enum.empty?(scores) do
%{buckets: [], total_count: 0}
else
# Create score buckets (0-0.1, 0.1-0.2, ..., 0.9-1.0)
buckets = 0..9
|> Enum.map(fn i ->
lower = i / 10
upper = (i + 1) / 10
count = Enum.count(scores, fn score -> score >= lower and score < upper end)
# Handle the special case for score = 1.0
count = if i == 9 do
count + Enum.count(scores, fn score -> score == 1.0 end)
else
count
end
%{range: "#{lower}-#{upper}", count: count}
end)
%{buckets: buckets, total_count: length(scores)}
end
end
defp calculate_performance_percentiles(results) do
scores = results
|> Enum.filter(& &1.success)
|> Enum.map(& &1.score)
|> Enum.sort()
if Enum.empty?(scores) do
%{}
else
%{
p10: calculate_percentile(scores, 0.1),
p25: calculate_percentile(scores, 0.25),
p50: calculate_percentile(scores, 0.5),
p75: calculate_percentile(scores, 0.75),
p90: calculate_percentile(scores, 0.9),
p95: calculate_percentile(scores, 0.95),
p99: calculate_percentile(scores, 0.99)
}
end
end
defp calculate_percentile(sorted_scores, percentile) do
len = length(sorted_scores)
index = (len - 1) * percentile
if index == trunc(index) do
Enum.at(sorted_scores, trunc(index))
else
lower_index = floor(index)
upper_index = ceil(index)
weight = index - lower_index
lower_value = Enum.at(sorted_scores, lower_index)
upper_value = Enum.at(sorted_scores, upper_index)
lower_value + weight * (upper_value - lower_value)
end
end
end
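Usage sketch for the evaluation module (illustrative only): it assumes a program and a dev_examples list of DSPEx.Example structs already exist, that Example.outputs/1 is available alongside the Example.inputs/1 used above, and it uses a toy exact-match metric on a hypothetical :answer field.

alias DSPEx.Teleprompter.SIMBA.Evaluation
alias DSPEx.Example

# Toy exact-match metric: 1.0 when the predicted :answer equals the labeled one.
metric_fn = fn example, outputs ->
  if Map.get(outputs, :answer) == Example.outputs(example)[:answer], do: 1.0, else: 0.0
end

result = Evaluation.evaluate_comprehensive(program, dev_examples, metric_fn)

IO.inspect(result.score, label: "mean score")
IO.inspect(result.metadata.success_rate, label: "success rate")
IO.inspect(result.detailed_scores, label: "score statistics")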
10. NEW: Predictor Mapping System
defmodule DSPEx.Teleprompter.SIMBA.PredictorMapping do
@moduledoc """
System for building and managing predictor mappings for program introspection.
This module analyzes programs to extract predictors and create bidirectional
mappings between predictor names and actual predictor instances.
"""
alias DSPEx.Program
@type predictor_info :: %{
name: String.t(),
type: atom(),
signature: any(),
instructions: String.t() | nil,
metadata: map()
}
@type predictor_mappings :: {
predictor_to_name :: %{any() => String.t()},
name_to_predictor :: %{String.t() => any()}
}
@spec build_predictor_mappings(Program.t()) :: predictor_mappings()
def build_predictor_mappings(program) do
predictors = extract_predictors_from_program(program)
predictor_to_name = predictors
|> Enum.with_index()
|> Enum.reduce(%{}, fn {{predictor, info}, index}, acc ->
name = generate_predictor_name(info, index)
Map.put(acc, predictor, name)
end)
name_to_predictor = predictor_to_name
|> Enum.reduce(%{}, fn {predictor, name}, acc ->
Map.put(acc, name, predictor)
end)
{predictor_to_name, name_to_predictor}
end
@spec extract_predictors_from_program(Program.t()) :: [{any(), predictor_info()}]
defp extract_predictors_from_program(program) do
case program do
# ChainOfThought program
%{predictors: predictors} when is_list(predictors) ->
predictors
|> Enum.with_index()
|> Enum.map(fn {predictor, index} ->
info = analyze_predictor(predictor, "step_#{index}")
{predictor, info}
end)
# Single predictor program
%{predictor: predictor} when not is_nil(predictor) ->
info = analyze_predictor(predictor, "main")
[{predictor, info}]
# OptimizedProgram wrapper
%DSPEx.OptimizedProgram{program: inner_program} ->
extract_predictors_from_program(inner_program)
# Try to introspect the program structure
_ ->
introspect_program_predictors(program)
end
end
@spec analyze_predictor(any(), String.t()) :: predictor_info()
defp analyze_predictor(predictor, default_name) do
signature = extract_signature(predictor)
instructions = extract_instructions(predictor)
predictor_type = determine_predictor_type(predictor)
%{
name: default_name,
type: predictor_type,
signature: signature,
instructions: instructions,
metadata: %{
has_signature: not is_nil(signature),
has_instructions: not is_nil(instructions),
extracted_at: DateTime.utc_now()
}
}
end
defp extract_signature(predictor) do
cond do
is_map(predictor) and Map.has_key?(predictor, :signature) ->
predictor.signature
# function_exported?/3 requires a module atom, so guard against non-atom predictors
is_atom(predictor) and function_exported?(predictor, :signature, 0) ->
try do
predictor.signature()
rescue
_ -> nil
end
true ->
nil
end
end
defp extract_instructions(predictor) do
signature = extract_signature(predictor)
cond do
is_nil(signature) ->
nil
is_atom(signature) and function_exported?(signature, :instructions, 0) ->
try do
signature.instructions()
rescue
_ -> nil
end
is_map(signature) and Map.has_key?(signature, :instructions) ->
signature.instructions
true ->
nil
end
end
defp determine_predictor_type(predictor) do
cond do
is_map(predictor) and Map.has_key?(predictor, :__struct__) ->
predictor.__struct__
|> Module.split()
|> List.last()
|> String.downcase()
|> String.to_atom()
is_function(predictor) ->
:function
is_atom(predictor) ->
:module
true ->
:unknown
end
end
defp introspect_program_predictors(program) do
# Try to find predictor-like fields through introspection
program_fields = if is_map(program) do
Map.keys(program)
else
[]
end
predictor_fields = Enum.filter(program_fields, fn field ->
field_name = to_string(field)
String.contains?(field_name, "predict") or
String.contains?(field_name, "forward") or
String.contains?(field_name, "generate")
end)
predictor_fields
|> Enum.map(fn field ->
predictor = Map.get(program, field)
info = analyze_predictor(predictor, to_string(field))
{predictor, info}
end)
end
defp generate_predictor_name(info, index) do
base_name = case info.type do
:chainofthought -> "cot"
:generate -> "gen"
:predict -> "pred"
:classify -> "cls"
_ -> "pred"
end
if info.metadata.has_signature and not is_nil(info.instructions) do
instruction_summary = info.instructions
|> String.split()
|> Enum.take(3)
|> Enum.join("_")
|> String.downcase()
|> String.replace(~r/[^a-z0-9_]/, "")
"#{base_name}_#{index}_#{instruction_summary}"
else
"#{base_name}_#{index}"
end
end
@spec find_predictor_by_name(predictor_mappings(), String.t()) :: any() | nil
def find_predictor_by_name({_predictor_to_name, name_to_predictor}, name) do
Map.get(name_to_predictor, name)
end
@spec find_name_by_predictor(predictor_mappings(), any()) :: String.t() | nil
def find_name_by_predictor({predictor_to_name, _name_to_predictor}, predictor) do
Map.get(predictor_to_name, predictor)
end
@spec list_all_predictors(predictor_mappings()) :: [String.t()]
def list_all_predictors({_predictor_to_name, name_to_predictor}) do
Map.keys(name_to_predictor)
end
end
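A short usage sketch for the mapping API (assumes student_program is an existing DSPEx program exposing at least one predictor):

alias DSPEx.Teleprompter.SIMBA.PredictorMapping

mappings = PredictorMapping.build_predictor_mappings(student_program)

# Names follow the "<type>_<index>[_<instruction words>]" pattern from generate_predictor_name/2.
[name | _] = PredictorMapping.list_all_predictors(mappings)

predictor = PredictorMapping.find_predictor_by_name(mappings, name)

# Round trip: the reverse lookup returns the same name.
^name = PredictorMapping.find_name_by_predictor(mappings, predictor)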
11. NEW: Adaptive Temperature Scheduling
defmodule DSPEx.Teleprompter.SIMBA.TemperatureScheduler do
@moduledoc """
Adaptive temperature scheduling for SIMBA optimization.
Implements multiple temperature schedules:
- Linear decay
- Exponential decay
- Cosine annealing
- Adaptive based on performance
"""
@type schedule_type :: :linear | :exponential | :cosine | :adaptive
@type scheduler_state :: %{
initial_temp: float(),
current_temp: float(),
schedule_type: schedule_type(),
step: non_neg_integer(),
max_steps: non_neg_integer(),
performance_history: [float()],
last_improvement: non_neg_integer()
}
@spec new(schedule_type(), float(), non_neg_integer()) :: scheduler_state()
def new(schedule_type, initial_temp, max_steps) do
%{
initial_temp: initial_temp,
current_temp: initial_temp,
schedule_type: schedule_type,
step: 0,
max_steps: max_steps,
performance_history: [],
last_improvement: 0
}
end
@spec update(scheduler_state(), float()) :: scheduler_state()
def update(state, current_performance) do
updated_history = [current_performance | state.performance_history] |> Enum.take(10)
# Check for improvement
last_improvement = if improved?(updated_history) do
state.step
else
state.last_improvement
end
# Calculate new temperature based on schedule
new_temp = calculate_temperature(state.schedule_type, state)
%{state |
current_temp: new_temp,
step: state.step + 1,
performance_history: updated_history,
last_improvement: last_improvement
}
end
defp improved?(history) when length(history) < 2, do: true
defp improved?([current, previous | _]), do: current > previous
defp calculate_temperature(:linear, state) do
# Clamp progress so the temperature never goes negative past max_steps
progress = min(state.step / state.max_steps, 1.0)
state.initial_temp * (1.0 - progress)
end
defp calculate_temperature(:exponential, state) do
decay_rate = 0.95
state.initial_temp * :math.pow(decay_rate, state.step)
end
defp calculate_temperature(:cosine, state) do
progress = min(state.step / state.max_steps, 1.0)
min_temp = 0.1
temp_range = state.initial_temp - min_temp
min_temp + temp_range * 0.5 * (1 + :math.cos(:math.pi() * progress))
end
defp calculate_temperature(:adaptive, state) do
base_temp = calculate_temperature(:cosine, state)
# Adjust based on recent performance
steps_since_improvement = state.step - state.last_improvement
cond do
# No recent improvement - increase exploration
steps_since_improvement > 3 ->
min(base_temp * 1.5, state.initial_temp)
# Recent improvement - decrease exploration
steps_since_improvement == 0 ->
base_temp * 0.8
# Default
true ->
base_temp
end
end
end
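A sketch of the intended call pattern (the batch scores below are fabricated to show a plateau, which is what eventually triggers the adaptive temperature boost):

alias DSPEx.Teleprompter.SIMBA.TemperatureScheduler

scheduler = TemperatureScheduler.new(:adaptive, 1.4, 20)

# In SIMBA this would be the mini-batch score observed at each optimization step.
scores = [0.40, 0.45, 0.45, 0.45, 0.45, 0.45]

final_state =
  Enum.reduce(scores, scheduler, fn score, state ->
    state = TemperatureScheduler.update(state, score)
    IO.puts("step=#{state.step} temp=#{Float.round(state.current_temp, 3)}")
    state
  end)

IO.inspect(final_state.last_improvement, label: "last improvement at step")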
12. NEW: Memory-Efficient Trajectory Management
defmodule DSPEx.Teleprompter.SIMBA.TrajectoryManager do
@moduledoc """
Memory-efficient trajectory management with selective storage and compression.
"""
use GenServer
alias DSPEx.Teleprompter.SIMBA.Trajectory
@type trajectory_summary :: %{
score: float(),
program_type: atom(),
execution_time: non_neg_integer(),
success: boolean(),
hash: binary()
}
defstruct [
:max_trajectories,
:compression_threshold,
trajectories: [],
summaries: [],
total_stored: 0,
memory_usage: 0
]
@type t :: %__MODULE__{
max_trajectories: pos_integer(),
compression_threshold: pos_integer(),
trajectories: [Trajectory.t()],
summaries: [trajectory_summary()],
total_stored: non_neg_integer(),
memory_usage: non_neg_integer()
}
# Client API
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def store_trajectory(trajectory) do
GenServer.call(__MODULE__, {:store_trajectory, trajectory})
end
def get_recent_trajectories(count \\ 100) do
GenServer.call(__MODULE__, {:get_recent, count})
end
def get_trajectory_statistics() do
GenServer.call(__MODULE__, :get_statistics)
end
def cleanup_old_trajectories() do
GenServer.cast(__MODULE__, :cleanup)
end
# Server Callbacks
@impl GenServer
def init(opts) do
state = %__MODULE__{
max_trajectories: Keyword.get(opts, :max_trajectories, 1000),
compression_threshold: Keyword.get(opts, :compression_threshold, 500)
}
# Schedule periodic cleanup
:timer.send_interval(30_000, :cleanup)
{:ok, state}
end
@impl GenServer
def handle_call({:store_trajectory, trajectory}, _from, state) do
{updated_state, stored} = store_trajectory_internal(state, trajectory)
{:reply, stored, updated_state}
end
def handle_call({:get_recent, count}, _from, state) do
recent = Enum.take(state.trajectories, count)
{:reply, recent, state}
end
def handle_call(:get_statistics, _from, state) do
stats = calculate_statistics(state)
{:reply, stats, state}
end
@impl GenServer
def handle_cast(:cleanup, state) do
updated_state = cleanup_trajectories(state)
{:noreply, updated_state}
end
@impl GenServer
def handle_info(:cleanup, state) do
updated_state = cleanup_trajectories(state)
{:noreply, updated_state}
end
# Internal Functions
defp store_trajectory_internal(state, trajectory) do
# Calculate trajectory hash for deduplication
trajectory_hash = calculate_trajectory_hash(trajectory)
# Check if we already have this trajectory
duplicate_summary = Enum.find(state.summaries, &(&1.hash == trajectory_hash))
if duplicate_summary do
{state, false} # Don't store duplicate
else
# Store trajectory and create summary
summary = create_trajectory_summary(trajectory, trajectory_hash)
updated_trajectories = [trajectory | state.trajectories]
updated_summaries = [summary | state.summaries]
# Check if we need compression/cleanup
updated_state = %{state |
trajectories: updated_trajectories,
summaries: updated_summaries,
total_stored: state.total_stored + 1
}
final_state = if length(updated_trajectories) > state.compression_threshold do
compress_old_trajectories(updated_state)
else
updated_state
end
{final_state, true}
end
end
defp calculate_trajectory_hash(trajectory) do
hash_data = %{
inputs: trajectory.inputs,
outputs: trajectory.outputs,
program_type: trajectory.metadata[:program_type],
model_config: trajectory.model_config
}
# :crypto.hash/2 takes the digest type as its first argument
:crypto.hash(:sha256, :erlang.term_to_binary(hash_data))
end
defp create_trajectory_summary(trajectory, hash) do
%{
score: trajectory.score,
program_type: trajectory.metadata[:program_type] || :unknown,
execution_time: trajectory.duration,
success: trajectory.success,
hash: hash
}
end
defp compress_old_trajectories(state) do
# Keep recent or high-scoring trajectories in full form and drop the rest.
# A summary for every stored trajectory was already created at store time,
# so discarding the full copies here loses no summary data and avoids
# appending duplicate summaries.
kept_trajectories =
state.trajectories
|> Enum.with_index()
|> Enum.filter(fn {trajectory, index} ->
index < 100 or trajectory.score > 0.8
end)
|> Enum.map(fn {trajectory, _index} -> trajectory end)
%{state | trajectories: kept_trajectories}
end
defp cleanup_trajectories(state) do
# Remove oldest trajectories if we exceed max_trajectories
if length(state.trajectories) > state.max_trajectories do
kept_trajectories = Enum.take(state.trajectories, state.max_trajectories)
%{state | trajectories: kept_trajectories}
else
state
end
end
defp calculate_statistics(state) do
# Summaries cover every stored trajectory, so they give a complete view of the
# scores without double-counting the full trajectories that are still retained.
all_scores = Enum.map(state.summaries, & &1.score)
%{
total_trajectories: length(state.trajectories),
total_summaries: length(state.summaries),
total_stored: state.total_stored,
memory_usage_estimate: estimate_memory_usage(state),
score_statistics: if Enum.empty?(all_scores) do
%{}
else
%{
mean: Enum.sum(all_scores) / length(all_scores),
min: Enum.min(all_scores),
max: Enum.max(all_scores),
count: length(all_scores)
}
end
}
end
defp estimate_memory_usage(state) do
# Rough estimate of memory usage
trajectory_size = length(state.trajectories) * 1024 # Assume ~1KB per trajectory
summary_size = length(state.summaries) * 100 # Assume ~100B per summary
trajectory_size + summary_size
end
end
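A usage sketch for the trajectory manager (the %Trajectory{} literal assumes the struct exposes the fields the manager reads: inputs, outputs, score, duration, success, metadata, model_config; adjust to the real struct definition):

alias DSPEx.Teleprompter.SIMBA.{Trajectory, TrajectoryManager}

{:ok, _pid} = TrajectoryManager.start_link(max_trajectories: 500, compression_threshold: 250)

trajectory = %Trajectory{
  inputs: %{question: "2 + 2?"},
  outputs: %{answer: "4"},
  score: 0.9,
  duration: 1_200,
  success: true,
  metadata: %{program_type: :predict},
  model_config: %{temperature: 0.7}
}

true = TrajectoryManager.store_trajectory(trajectory)

# The hash-based deduplication rejects an identical trajectory on the second attempt.
false = TrajectoryManager.store_trajectory(trajectory)

TrajectoryManager.get_trajectory_statistics()
|> IO.inspect(label: "trajectory stats")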
Part V: Configuration & Integration
13. NEW: Enhanced Configuration System
defmodule DSPEx.Teleprompter.SIMBA.Config do
@moduledoc """
Enhanced configuration system for SIMBA with validation and presets.
"""
@type strategy_config :: %{
name: atom(),
weight: float(),
params: map()
}
@type config :: %{
# Core algorithm parameters
max_steps: pos_integer(),
bsize: pos_integer(),
num_candidates: pos_integer(),
num_threads: pos_integer(),
# Temperature and sampling
temperature_for_sampling: float(),
temperature_for_candidates: float(),
temperature_schedule: atom(),
# Strategy configuration
strategies: [strategy_config()],
strategy_selection: :random | :weighted | :adaptive,
# Convergence and stopping
convergence_detection: boolean(),
early_stopping_patience: pos_integer(),
min_improvement_threshold: float(),
# Performance and memory
memory_limit_mb: pos_integer(),
trajectory_retention: pos_integer(),
evaluation_batch_size: pos_integer(),
# Observability
telemetry_enabled: boolean(),
progress_callback: function() | nil,
correlation_id: String.t() | nil,
# Advanced features
adaptive_batch_size: boolean(),
dynamic_candidate_count: boolean(),
predictor_analysis: boolean()
}
@default_config %{
max_steps: 20,
bsize: 4,
num_candidates: 8,
num_threads: 20,
temperature_for_sampling: 1.4,
temperature_for_candidates: 0.7,
temperature_schedule: :cosine,
strategies: [
%{name: :append_demo, weight: 0.7, params: %{}},
%{name: :append_rule, weight: 0.3, params: %{}}
],
strategy_selection: :weighted,
convergence_detection: true,
early_stopping_patience: 5,
min_improvement_threshold: 0.01,
memory_limit_mb: 512,
trajectory_retention: 1000,
evaluation_batch_size: 10,
telemetry_enabled: true,
progress_callback: nil,
correlation_id: nil,
adaptive_batch_size: false,
dynamic_candidate_count: false,
predictor_analysis: true
}
@spec new(map()) :: {:ok, config()} | {:error, term()}
def new(opts \\ %{}) do
config = Map.merge(@default_config, opts)
case validate_config(config) do
:ok ->
{:ok, normalize_config(config)}
{:error, reason} ->
{:error, reason}
end
end
@spec get_preset(:fast | :balanced | :thorough | :memory_efficient) :: config()
def get_preset(:fast) do
Map.merge(@default_config, %{
max_steps: 10,
bsize: 8,
num_candidates: 4,
num_threads: 10,
convergence_detection: false,
trajectory_retention: 200
})
end