Phase 1 Core Foundation: Schema Engine Design
Revolutionary Schema System for ElixirML/DSPEx
Executive Summary
The Schema Engine represents the foundational data validation and transformation layer for ElixirML/DSPEx, built on Sinter’s advanced schema capabilities. This system provides compile-time and runtime schema validation, automatic data transformation, and seamless integration with the Variable System and Resource Framework.
Design Philosophy
Core Principles
- Schema-First Development: All data flows through validated schemas
- Runtime Flexibility: Dynamic schema creation and modification
- Compile-Time Safety: Maximum validation at compile time
- Transformation Pipeline: Automatic data transformation and normalization
- Integration Ready: Native integration with Variables and Resources
Strategic Advantages
- Type Safety: Comprehensive validation prevents runtime errors
- Performance: Optimized validation with minimal overhead
- Flexibility: Support for both static and dynamic schemas
- Composability: Schemas can be composed and extended
- Introspection: Full schema metadata for optimization
Architecture Overview
Schema Engine Architecture
├── 🏗️ Core Schema System
│ ├── Schema Definition DSL
│ ├── Validation Engine
│ ├── Transformation Pipeline
│ └── Metadata Extraction
│
├── 🔧 Integration Layer
│ ├── Variable System Integration
│ ├── Resource Framework Integration
│ ├── Program Schema Binding
│ └── Adapter Schema Validation
│
├── 🚀 Performance Layer
│ ├── Compile-Time Optimization
│ ├── Runtime Caching
│ ├── Lazy Validation
│ └── Batch Processing
│
└── 🧪 Development Tools
├── Schema Visualization
├── Validation Testing
├── Performance Profiling
└── Migration Utilities
Core Schema System
1. Schema Definition DSL
defmodule ElixirML.Schema do
@moduledoc """
The foundational schema system for ElixirML/DSPEx.
Provides compile-time and runtime schema validation with
automatic transformation capabilities.
"""
defmacro defschema(name, do: block) do
quote do
defmodule unquote(name) do
use ElixirML.Schema.Definition
# Import schema DSL
import ElixirML.Schema.DSL
# Process schema definition
unquote(block)
# Generate validation and transformation functions
@before_compile ElixirML.Schema.Compiler
end
end
end
defmacro field(name, type, opts \\ []) do
quote do
@fields {unquote(name), unquote(type), unquote(opts)}
end
end
defmacro validation(name, do: block) do
quote do
@validations {unquote(name), unquote(block)}
end
end
defmacro transform(name, do: block) do
quote do
@transforms {unquote(name), unquote(block)}
end
end
end
2. Advanced Schema Types
defmodule ElixirML.Schema.Types do
@moduledoc """
Advanced type system for ElixirML schemas.
Supports complex nested structures, unions, and ML-specific types.
"""
# Basic types
@type basic_type :: :string | :integer | :float | :boolean | :atom | :binary
# Complex types
@type complex_type ::
{:array, schema_type()} |
{:map, schema_type()} |
{:union, [schema_type()]} |
{:struct, module()} |
{:enum, [atom()]}
# ML-specific types
@type ml_type ::
:embedding |
:tensor |
:token_list |
:probability |
:confidence_score |
:model_response
@type schema_type :: basic_type() | complex_type() | ml_type()
# Type validation functions
def validate_type(value, :embedding) do
case value do
list when is_list(list) and length(list) > 0 ->
if Enum.all?(list, &is_number/1) do
{:ok, value}
else
{:error, "Embedding must be list of numbers"}
end
_ -> {:error, "Invalid embedding format"}
end
end
def validate_type(value, :probability) do
case value do
num when is_number(num) and num >= 0.0 and num <= 1.0 ->
{:ok, value}
_ -> {:error, "Probability must be between 0.0 and 1.0"}
end
end
def validate_type(value, :confidence_score) do
validate_type(value, :probability)
end
def validate_type(value, :token_list) do
case value do
list when is_list(list) ->
if Enum.all?(list, &(is_binary(&1) or is_integer(&1))) do
{:ok, value}
else
{:error, "Token list must contain strings or integers"}
end
_ -> {:error, "Invalid token list format"}
end
end
end
3. Schema Validation Engine
defmodule ElixirML.Schema.Validator do
@moduledoc """
High-performance validation engine with support for
nested validation, custom validators, and error aggregation.
"""
alias ElixirML.Schema.Types
@doc """
Validate data against a schema with comprehensive error reporting.
"""
def validate(data, schema, opts \\ []) do
context = %{
path: [],
errors: [],
strict: Keyword.get(opts, :strict, true),
transform: Keyword.get(opts, :transform, true)
}
case validate_fields(data, schema.fields, context) do
%{errors: []} = result ->
validated_data = if context.transform do
apply_transforms(result.data, schema.transforms)
else
result.data
end
{:ok, validated_data}
%{errors: errors} ->
{:error, format_errors(errors)}
end
end
defp validate_fields(data, fields, context) do
Enum.reduce(fields, %{data: %{}, errors: context.errors}, fn
{field_name, field_type, field_opts}, acc ->
field_path = [field_name | context.path]
case Map.get(data, field_name) do
nil ->
if Keyword.get(field_opts, :required, false) do
error = %{
path: Enum.reverse(field_path),
message: "Required field missing",
value: nil
}
%{acc | errors: [error | acc.errors]}
else
default_value = Keyword.get(field_opts, :default)
%{acc | data: Map.put(acc.data, field_name, default_value)}
end
value ->
case validate_field_value(value, field_type, field_opts, field_path) do
{:ok, validated_value} ->
%{acc | data: Map.put(acc.data, field_name, validated_value)}
{:error, error} ->
%{acc | errors: [error | acc.errors]}
end
end
end)
end
defp validate_field_value(value, type, opts, path) do
with {:ok, type_validated} <- Types.validate_type(value, type),
{:ok, constraint_validated} <- validate_constraints(type_validated, opts) do
{:ok, constraint_validated}
else
{:error, message} ->
{:error, %{
path: Enum.reverse(path),
message: message,
value: value
}}
end
end
defp validate_constraints(value, opts) do
constraints = Keyword.get(opts, :constraints, [])
Enum.reduce_while(constraints, {:ok, value}, fn constraint, {:ok, val} ->
case apply_constraint(val, constraint) do
{:ok, new_val} -> {:cont, {:ok, new_val}}
{:error, _} = error -> {:halt, error}
end
end)
end
defp apply_constraint(value, {:min, min_val}) when is_number(value) do
if value >= min_val do
{:ok, value}
else
{:error, "Value #{value} is less than minimum #{min_val}"}
end
end
defp apply_constraint(value, {:max, max_val}) when is_number(value) do
if value <= max_val do
{:ok, value}
else
{:error, "Value #{value} is greater than maximum #{max_val}"}
end
end
defp apply_constraint(value, {:in, allowed_values}) do
if value in allowed_values do
{:ok, value}
else
{:error, "Value #{inspect(value)} not in allowed values #{inspect(allowed_values)}"}
end
end
defp apply_constraint(value, {:custom, validator_fn}) when is_function(validator_fn, 1) do
validator_fn.(value)
end
defp apply_constraint(value, _constraint), do: {:ok, value}
end
ML-Specific Schema Definitions
1. Program Schemas
defmodule ElixirML.Schemas.Program do
use ElixirML.Schema
defschema ProgramInput do
field :text, :string, required: true
field :context, :string, default: ""
field :metadata, :map, default: %{}
field :variables, :map, default: %{}
validation :text_not_empty do
field(:text) |> String.trim() |> String.length() > 0
end
transform :normalize_text do
update_field(:text, &String.trim/1)
end
end
defschema ProgramOutput do
field :result, :string, required: true
field :confidence, :confidence_score, required: true
field :reasoning, :string
field :metadata, :map, default: %{}
field :tokens_used, :integer, constraints: [min: 0]
field :latency_ms, :integer, constraints: [min: 0]
validation :confidence_valid do
field(:confidence) >= 0.0 and field(:confidence) <= 1.0
end
transform :ensure_result_format do
update_field(:result, &String.trim/1)
end
end
defschema ProgramConfiguration do
field :temperature, :float, default: 0.7, constraints: [min: 0.0, max: 2.0]
field :max_tokens, :integer, default: 1000, constraints: [min: 1, max: 8192]
field :model, :string, required: true
field :provider, :atom, required: true, constraints: [in: [:openai, :anthropic, :google]]
field :timeout_ms, :integer, default: 30_000, constraints: [min: 1000]
validation :provider_model_compatibility do
case {field(:provider), field(:model)} do
{:openai, model} when model in ["gpt-4", "gpt-3.5-turbo"] -> true
{:anthropic, model} when model in ["claude-3", "claude-2"] -> true
{:google, model} when model in ["gemini-pro", "gemini-1.5"] -> true
_ -> false
end
end
end
end
2. Variable System Schemas
defmodule ElixirML.Schemas.Variable do
use ElixirML.Schema
defschema VariableDefinition do
field :name, :atom, required: true
field :type, :atom, required: true, constraints: [in: [:float, :integer, :choice, :module]]
field :default, :union, types: [:string, :integer, :float, :atom, :module]
field :constraints, :map, default: %{}
field :description, :string
field :metadata, :map, default: %{}
validation :type_constraints_match do
case {field(:type), field(:constraints)} do
{:float, %{range: {min, max}}} when is_number(min) and is_number(max) and min < max -> true
{:integer, %{range: {min, max}}} when is_integer(min) and is_integer(max) and min < max -> true
{:choice, %{choices: choices}} when is_list(choices) and length(choices) > 0 -> true
{:module, %{modules: modules}} when is_list(modules) -> true
_ -> false
end
end
end
defschema VariableSpace do
field :variables, {:array, VariableDefinition}, required: true
field :dependencies, :map, default: %{}
field :constraints, {:array, :map}, default: []
field :metadata, :map, default: %{}
validation :no_circular_dependencies do
# Complex validation for dependency cycles
dependency_graph = field(:dependencies)
!has_cycles?(dependency_graph)
end
validation :unique_variable_names do
names = field(:variables) |> Enum.map(& &1.name)
length(names) == length(Enum.uniq(names))
end
end
defschema VariableConfiguration do
field :variable_values, :map, required: true
field :metadata, :map, default: %{}
field :performance_score, :float
field :cost_score, :float
field :latency_score, :float
validation :all_required_variables_present do
# Validate against associated variable space
true # Implementation depends on context
end
end
end
3. Optimization Schemas
defmodule ElixirML.Schemas.Optimization do
use ElixirML.Schema
defschema OptimizationRun do
field :id, :string, required: true
field :program_id, :string, required: true
field :variable_space, VariableSpace, required: true
field :training_data, {:array, :map}, required: true
field :configuration, :map, required: true
field :status, :atom, default: :pending, constraints: [in: [:pending, :running, :completed, :failed]]
field :started_at, :datetime
field :completed_at, :datetime
field :best_score, :float
field :iterations, :integer, default: 0
field :metadata, :map, default: %{}
validation :valid_time_range do
case {field(:started_at), field(:completed_at)} do
{nil, nil} -> true
{start, nil} -> not is_nil(start)
{start, finish} -> DateTime.compare(start, finish) in [:lt, :eq]
_ -> false
end
end
end
defschema OptimizationResult do
field :run_id, :string, required: true
field :best_configuration, VariableConfiguration, required: true
field :performance_metrics, :map, required: true
field :pareto_frontier, {:array, :map}, default: []
field :convergence_data, :map, default: %{}
field :metadata, :map, default: %{}
validation :performance_metrics_valid do
metrics = field(:performance_metrics)
required_keys = [:accuracy, :cost, :latency]
Enum.all?(required_keys, &Map.has_key?(metrics, &1))
end
end
end
Integration Architecture
1. Variable System Integration
defmodule ElixirML.Schema.VariableIntegration do
@moduledoc """
Seamless integration between the Schema Engine and Variable System.
Enables automatic schema generation from variable definitions.
"""
def generate_schema_from_variables(variables) do
fields = Enum.map(variables, fn variable ->
{variable.name, variable_type_to_schema_type(variable), variable_opts_to_schema_opts(variable)}
end)
%ElixirML.Schema.Runtime{
fields: fields,
validations: generate_variable_validations(variables),
transforms: generate_variable_transforms(variables)
}
end
defp variable_type_to_schema_type(%{type: :float}), do: :float
defp variable_type_to_schema_type(%{type: :integer}), do: :integer
defp variable_type_to_schema_type(%{type: :choice, constraints: %{choices: choices}}), do: {:enum, choices}
defp variable_type_to_schema_type(%{type: :module}), do: :atom
defp variable_opts_to_schema_opts(variable) do
opts = []
opts = if variable.default do
[{:default, variable.default} | opts]
else
opts
end
opts = case variable.constraints do
%{range: {min, max}} -> [{:constraints, [min: min, max: max]} | opts]
%{choices: choices} -> [{:constraints, [in: choices]} | opts]
_ -> opts
end
opts
end
defp generate_variable_validations(variables) do
Enum.flat_map(variables, fn variable ->
case variable.constraints do
%{custom_validation: validator} -> [{variable.name, validator}]
_ -> []
end
end)
end
defp generate_variable_transforms(variables) do
Enum.flat_map(variables, fn variable ->
case variable.metadata[:transform] do
nil -> []
transform_fn -> [{variable.name, transform_fn}]
end
end)
end
end
2. Resource Framework Integration
defmodule ElixirML.Schema.ResourceIntegration do
@moduledoc """
Integration with the Resource Framework for automatic
schema validation and transformation in resource operations.
"""
defmacro __using__(opts) do
quote do
use Ash.Resource, unquote(opts)
# Add schema validation to resource actions
import ElixirML.Schema.ResourceIntegration
# Schema-aware attribute definition
defmacro schema_attribute(name, schema_module, opts \\ []) do
quote do
attribute unquote(name), :map, unquote(opts)
# Add validation for schema compliance
validate unquote(:"validate_#{name}_schema") do
ElixirML.Schema.ResourceIntegration.validate_attribute_schema(
unquote(name),
unquote(schema_module)
)
end
end
end
end
end
def validate_attribute_schema(attribute_name, schema_module) do
fn changeset ->
case Ash.Changeset.get_attribute(changeset, attribute_name) do
nil -> changeset
value ->
case schema_module.validate(value) do
{:ok, validated_value} ->
Ash.Changeset.change_attribute(changeset, attribute_name, validated_value)
{:error, errors} ->
Ash.Changeset.add_error(changeset, field: attribute_name, message: format_schema_errors(errors))
end
end
end
end
defp format_schema_errors(errors) when is_list(errors) do
errors
|> Enum.map(fn error -> "#{Enum.join(error.path, ".")}: #{error.message}" end)
|> Enum.join(", ")
end
end
Performance Optimization
1. Compile-Time Optimization
defmodule ElixirML.Schema.Compiler do
@moduledoc """
Compile-time optimization for schema validation.
Generates optimized validation functions based on schema structure.
"""
defmacro __before_compile__(env) do
schema_fields = Module.get_attribute(env.module, :fields, [])
schema_validations = Module.get_attribute(env.module, :validations, [])
schema_transforms = Module.get_attribute(env.module, :transforms, [])
# Generate optimized validation function
validation_ast = generate_validation_function(schema_fields, schema_validations)
# Generate optimized transformation function
transform_ast = generate_transform_function(schema_transforms)
# Generate schema metadata
metadata_ast = generate_metadata_function(schema_fields, schema_validations, schema_transforms)
quote do
unquote(validation_ast)
unquote(transform_ast)
unquote(metadata_ast)
def validate(data, opts \\ []) do
__validate__(data, opts)
end
def transform(data, opts \\ []) do
__transform__(data, opts)
end
def metadata() do
__metadata__()
end
end
end
defp generate_validation_function(fields, validations) do
# Complex AST generation for optimized validation
field_validations = Enum.map(fields, fn {name, type, opts} ->
generate_field_validation_ast(name, type, opts)
end)
custom_validations = Enum.map(validations, fn {name, validation_block} ->
generate_custom_validation_ast(name, validation_block)
end)
quote do
def __validate__(data, opts) do
with unquote_splicing(field_validations),
unquote_splicing(custom_validations) do
{:ok, data}
else
{:error, _} = error -> error
end
end
end
end
defp generate_field_validation_ast(name, type, opts) do
quote do
{:ok, _} <- validate_field(Map.get(data, unquote(name)), unquote(type), unquote(opts))
end
end
defp generate_custom_validation_ast(name, validation_block) do
quote do
{:ok, _} <- (fn -> unquote(validation_block) end).()
end
end
defp generate_transform_function(transforms) do
transform_steps = Enum.map(transforms, fn {name, transform_block} ->
quote do
data = unquote(transform_block).(data)
end
end)
quote do
def __transform__(data, _opts) do
unquote_splicing(transform_steps)
{:ok, data}
end
end
end
defp generate_metadata_function(fields, validations, transforms) do
quote do
def __metadata__() do
%{
fields: unquote(Macro.escape(fields)),
validations: unquote(Macro.escape(validations)),
transforms: unquote(Macro.escape(transforms)),
compiled_at: unquote(DateTime.utc_now())
}
end
end
end
end
2. Runtime Caching
defmodule ElixirML.Schema.Cache do
@moduledoc """
High-performance caching system for schema validation results.
Uses ETS tables for fast lookup and LRU eviction.
"""
use GenServer
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(opts) do
table_name = Keyword.get(opts, :table_name, :schema_cache)
max_size = Keyword.get(opts, :max_size, 10_000)
:ets.new(table_name, [:named_table, :public, :set])
{:ok, %{
table: table_name,
max_size: max_size,
current_size: 0,
access_order: :queue.new()
}}
end
def get_cached_validation(schema_module, data_hash) do
case :ets.lookup(:schema_cache, {schema_module, data_hash}) do
[{_, result, _timestamp}] ->
# Update access time
GenServer.cast(__MODULE__, {:access, schema_module, data_hash})
{:hit, result}
[] ->
:miss
end
end
def cache_validation_result(schema_module, data_hash, result) do
GenServer.cast(__MODULE__, {:cache, schema_module, data_hash, result})
end
def handle_cast({:access, schema_module, data_hash}, state) do
# Update access time for LRU
:ets.update_element(:schema_cache, {schema_module, data_hash}, {3, System.monotonic_time()})
{:noreply, state}
end
def handle_cast({:cache, schema_module, data_hash, result}, state) do
key = {schema_module, data_hash}
timestamp = System.monotonic_time()
case :ets.insert_new(:schema_cache, {key, result, timestamp}) do
true ->
# New entry added
new_size = state.current_size + 1
new_queue = :queue.in(key, state.access_order)
# Check if we need to evict
if new_size > state.max_size do
{{:value, oldest_key}, updated_queue} = :queue.out(new_queue)
:ets.delete(:schema_cache, oldest_key)
{:noreply, %{state | current_size: state.max_size, access_order: updated_queue}}
else
{:noreply, %{state | current_size: new_size, access_order: new_queue}}
end
false ->
# Key already exists, update value
:ets.insert(:schema_cache, {key, result, timestamp})
{:noreply, state}
end
end
end
Development Tools
1. Schema Visualization
defmodule ElixirML.Schema.Visualizer do
@moduledoc """
Tools for visualizing schema structure and relationships.
Generates GraphViz diagrams and interactive documentation.
"""
def generate_schema_diagram(schema_module, opts \\ []) do
metadata = schema_module.metadata()
nodes = generate_field_nodes(metadata.fields)
edges = generate_validation_edges(metadata.validations)
graphviz_content = """
digraph schema {
rankdir=TB;
node [shape=box, style=rounded];
#{nodes}
#{edges}
}
"""
output_file = Keyword.get(opts, :output, "schema_diagram.dot")
File.write!(output_file, graphviz_content)
if Keyword.get(opts, :render, false) do
System.cmd("dot", ["-Tpng", output_file, "-o", String.replace(output_file, ".dot", ".png")])
end
end
defp generate_field_nodes(fields) do
fields
|> Enum.map(fn {name, type, opts} ->
label = "#{name}\\n#{format_type(type)}"
color = if Keyword.get(opts, :required, false), do: "red", else: "blue"
" #{name} [label=\"#{label}\", color=#{color}];"
end)
|> Enum.join("\n")
end
defp generate_validation_edges(validations) do
# Generate edges showing validation dependencies
validations
|> Enum.flat_map(fn {name, _block} ->
# Parse validation block to find field dependencies
# This is a simplified version
[" #{name} -> validation_#{name};"]
end)
|> Enum.join("\n")
end
defp format_type(type) when is_atom(type), do: to_string(type)
defp format_type({:array, inner_type}), do: "array(#{format_type(inner_type)})"
defp format_type({:union, types}), do: "union(#{Enum.map(types, &format_type/1) |> Enum.join(", ")})"
defp format_type(type), do: inspect(type)
end
2. Testing Framework
defmodule ElixirML.Schema.TestHelpers do
@moduledoc """
Testing utilities for schema validation and transformation.
Provides property-based testing and comprehensive test data generation.
"""
import StreamData
def schema_property_test(schema_module, opts \\ []) do
iterations = Keyword.get(opts, :iterations, 100)
property_test = property([valid_data <- generate_valid_data(schema_module)], do: (
assert {:ok, _} = schema_module.validate(valid_data)
))
StreamData.check_all(property_test, iterations: iterations)
end
def generate_valid_data(schema_module) do
metadata = schema_module.metadata()
metadata.fields
|> Enum.reduce(%{}, fn {field_name, field_type, field_opts}, acc ->
field_generator = generate_field_data(field_type, field_opts)
Map.put(acc, field_name, field_generator)
end)
|> fixed_map()
end
defp generate_field_data(:string, opts) do
min_length = get_in(opts, [:constraints, :min_length]) || 0
max_length = get_in(opts, [:constraints, :max_length]) || 100
string(:alphanumeric, min_length: min_length, max_length: max_length)
end
defp generate_field_data(:integer, opts) do
min_val = get_in(opts, [:constraints, :min]) || 0
max_val = get_in(opts, [:constraints, :max]) || 1000
integer(min_val..max_val)
end
defp generate_field_data(:float, opts) do
min_val = get_in(opts, [:constraints, :min]) || 0.0
max_val = get_in(opts, [:constraints, :max]) || 1.0
float(min: min_val, max: max_val)
end
defp generate_field_data({:enum, choices}, _opts) do
member_of(choices)
end
defp generate_field_data({:array, inner_type}, opts) do
min_length = get_in(opts, [:constraints, :min_length]) || 0
max_length = get_in(opts, [:constraints, :max_length]) || 10
list_of(generate_field_data(inner_type, []), min_length: min_length, max_length: max_length)
end
defp generate_field_data(type, _opts) do
# Fallback for unknown types
constant("default_#{type}")
end
end
Implementation Roadmap
Week 1: Core Schema System
- Basic schema definition DSL
- Core validation engine
- Basic type system
- Runtime schema creation
Week 2: Advanced Features
- Complex type validation
- Transformation pipeline
- Performance optimization
- Caching system
Week 3: Integration Layer
- Variable System integration
- Resource Framework integration
- Program schema binding
- Adapter validation
Week 4: Development Tools & Testing
- Schema visualization tools
- Testing framework
- Performance profiling
- Documentation generation
Success Metrics
Functional Requirements
- 100% schema validation accuracy
- Sub-millisecond validation for simple schemas
- Support for complex nested structures
- Seamless Variable System integration
Performance Requirements
- <1ms validation time for typical schemas
- <100MB memory usage for 10,000 cached schemas
- 95%+ cache hit rate in production workloads
- Zero-copy transformation where possible
Integration Requirements
- Full Variable System compatibility
- Resource Framework integration
- Backward compatibility with existing code
- Extensible type system
This Schema Engine design provides the foundational data validation and transformation layer for ElixirML/DSPEx, enabling type-safe, high-performance ML workflows with comprehensive validation and automatic optimization capabilities.