← Back to Docs

20250719 ash first class citizen in dspex

Documentation for 20250719_ash_first_class_citizen_in_dspex from the Dspex repository.

Ash as a First-Class Citizen in DSPex: A Comprehensive Integration Strategy

Executive Summary

This document outlines how Ash Framework will be integrated as a first-class citizen in DSPex V2, transforming it from a simple Python bridge into a production-grade cognitive orchestration platform. By leveraging Ash’s declarative resource modeling, action-oriented architecture, and extensive extension system, DSPex can achieve unprecedented levels of maintainability, observability, and developer experience while maintaining full compatibility with Python DSPy’s capabilities.

Table of Contents

  1. Vision: Why Ash for DSPex
  2. Core Integration Architecture
  3. DSPex Resources as Ash Resources
  4. Custom Data Layer for Python Bridge
  5. Ash.Reactor for Cognitive Orchestration
  6. Extension System for DSPy Concepts
  7. Production Features Integration
  8. Implementation Roadmap
  9. Code Examples
  10. Performance Implications
  11. Migration Strategy

Vision: Why Ash for DSPex

The Perfect Match

Ash’s philosophy of declarative, resource-oriented programming aligns perfectly with DSPex’s cognitive orchestration goals:

  1. Resources = Cognitive Components: Signatures, Modules, Programs map naturally to Ash resources
  2. Actions = Cognitive Operations: Each DSPy operation becomes a well-defined Ash action
  3. Domains = Cognitive Domains: Group related cognitive capabilities into logical domains
  4. Extensions = DSPy Integration: Ash’s extension system enables seamless DSPy integration

Benefits for DSPex

  • Production Readiness: Built-in monitoring, security, and error handling
  • Developer Experience: Consistent APIs, automatic documentation, GraphQL/REST generation
  • Maintainability: Clear separation of concerns, compile-time validations
  • Observability: Every cognitive operation is tracked and auditable
  • Extensibility: Easy to add new cognitive patterns and optimizers

Core Integration Architecture

Layered Approach

ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
│          DSPex Public API               │
ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤
│        Ash Resource Layer               │
│  (Signatures, Modules, Programs)        │
ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤
│     Ash Actions & Manual Actions        │
│   (Native Elixir + Python Bridge)       │
ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤
│        Custom Data Layers               │
│    (Memory, Postgres, Python)           │
ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤
│         Snakepit Integration            │
│      (Python Process Management)        │
ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜

Key Integration Points

  1. Resource Definitions: All DSPex concepts become Ash resources
  2. Action System: Leverage Ash actions for all operations
  3. Data Layer: Custom data layer for Python bridge
  4. Reactor: Complex orchestration via Ash.Reactor
  5. Extensions: DSPy-specific DSL and behaviors

DSPex Resources as Ash Resources

Core Resources

1. Signature Resource

defmodule DSPex.Cognitive.Signature do
  use Ash.Resource,
    domain: DSPex.Cognitive,
    data_layer: DSPex.DataLayer.Memory,
    extensions: [DSPex.Extensions.Signature]

  attributes do
    uuid_primary_key :id
    
    attribute :name, :string, allow_nil?: false
    attribute :spec, :string, allow_nil?: false
    attribute :parsed_spec, :map, allow_nil?: false
    attribute :input_fields, {:array, :map}, default: []
    attribute :output_fields, {:array, :map}, default: []
    attribute :metadata, :map, default: %{}
    
    timestamps()
  end

  actions do
    defaults [:read, :destroy]
    
    create :parse do
      argument :spec, :string, allow_nil?: false
      
      change DSPex.Changes.ParseSignature
      change DSPex.Changes.ValidateSignature
    end
    
    update :optimize do
      argument :optimization_results, :map
      
      change DSPex.Changes.ApplyOptimization
    end
    
    read :by_name do
      argument :name, :string, allow_nil?: false
      filter expr(name == ^arg(:name))
    end
  end
  
  calculations do
    calculate :complexity, :integer, DSPex.Calculations.SignatureComplexity
    calculate :usage_stats, :map, DSPex.Calculations.UsageStatistics
  end
  
  code_interface do
    define :parse, args: [:spec]
    define :by_name, args: [:name]
    define :optimize, args: [:optimization_results]
  end
end

2. Module Resource

defmodule DSPex.Cognitive.Module do
  use Ash.Resource,
    domain: DSPex.Cognitive,
    data_layer: DSPex.DataLayer.Hybrid,
    extensions: [DSPex.Extensions.CognitiveModule]

  attributes do
    uuid_primary_key :id
    
    attribute :name, :string, allow_nil?: false
    attribute :type, :atom, 
      constraints: [one_of: [:predict, :retrieve, :optimize, :custom]],
      allow_nil?: false
    attribute :implementation, :atom,
      constraints: [one_of: [:native, :python, :hybrid]],
      default: :python
    attribute :config, :map, default: %{}
    attribute :performance_metrics, :map, default: %{}
    
    timestamps()
  end
  
  relationships do
    belongs_to :signature, DSPex.Cognitive.Signature
    has_many :executions, DSPex.Cognitive.Execution
  end
  
  actions do
    defaults [:read]
    
    create :register do
      argument :module_spec, :map, allow_nil?: false
      
      change DSPex.Changes.ValidateModule
      change DSPex.Changes.RegisterWithRouter
    end
    
    update :update_metrics do
      argument :new_metrics, :map
      
      change DSPex.Changes.MergeMetrics
    end
    
    # Manual action for execution
    action :execute, :map do
      argument :input, :map, allow_nil?: false
      argument :context, :map, default: %{}
      
      run DSPex.Actions.ExecuteModule
    end
  end
  
  state_machine do
    initial_states [:uninitialized]
    default_initial_state :uninitialized
    
    state :uninitialized
    state :initializing
    state :ready
    state :executing  
    state :error
    
    transition :initialize, from: :uninitialized, to: :initializing
    transition :mark_ready, from: :initializing, to: :ready
    transition :start_execution, from: :ready, to: :executing
    transition :complete_execution, from: :executing, to: :ready
    transition :fail, from: [:initializing, :executing], to: :error
    transition :reset, from: :error, to: :uninitialized
  end
end

3. Program Resource

defmodule DSPex.Cognitive.Program do
  use Ash.Resource,
    domain: DSPex.Cognitive,
    data_layer: AshPostgres.DataLayer,
    extensions: [DSPex.Extensions.Program]

  postgres do
    table "cognitive_programs"
    repo DSPex.Repo
  end

  attributes do
    uuid_primary_key :id
    
    attribute :name, :string, allow_nil?: false
    attribute :description, :string
    attribute :pipeline, {:array, :map}, default: []
    attribute :variables, :map, default: %{}
    attribute :optimization_history, {:array, :map}, default: []
    attribute :status, :atom,
      constraints: [one_of: [:draft, :active, :optimizing, :archived]],
      default: :draft
    
    timestamps()
  end
  
  relationships do
    has_many :modules, DSPex.Cognitive.Module
    has_many :executions, DSPex.Cognitive.Execution
    belongs_to :optimizer, DSPex.Cognitive.Optimizer
  end
  
  actions do
    defaults [:read, :update, :destroy]
    
    create :compose do
      argument :modules, {:array, :uuid}, allow_nil?: false
      argument :pipeline_spec, :map
      
      change DSPex.Changes.ComposePipeline
      change DSPex.Changes.ValidatePipeline
    end
    
    update :optimize do
      accept []
      
      change DSPex.Changes.StartOptimization
      change set_attribute(:status, :optimizing)
    end
    
    action :run, :map do
      argument :input, :map, allow_nil?: false
      argument :options, :map, default: %{}
      
      run DSPex.Actions.RunProgram
    end
  end
  
  policies do
    policy action_type(:read) do
      authorize_if expr(status != :archived)
    end
    
    policy action(:optimize) do
      authorize_if expr(status == :active)
    end
  end
end

4. Execution Resource

defmodule DSPex.Cognitive.Execution do
  use Ash.Resource,
    domain: DSPex.Cognitive,
    data_layer: AshPostgres.DataLayer

  postgres do
    table "cognitive_executions"
    repo DSPex.Repo
  end

  attributes do
    uuid_primary_key :id
    
    attribute :input, :map, allow_nil?: false
    attribute :output, :map
    attribute :trace, {:array, :map}, default: []
    attribute :metrics, :map, default: %{}
    attribute :duration_ms, :integer
    attribute :status, :atom,
      constraints: [one_of: [:pending, :running, :completed, :failed]],
      default: :pending
    attribute :error, :map
    
    timestamps()
  end
  
  relationships do
    belongs_to :program, DSPex.Cognitive.Program
    belongs_to :module, DSPex.Cognitive.Module
    belongs_to :parent_execution, DSPex.Cognitive.Execution
    has_many :child_executions, DSPex.Cognitive.Execution,
      destination_attribute: :parent_execution_id
  end
  
  calculations do
    calculate :success_rate, :float, DSPex.Calculations.SuccessRate
    calculate :avg_latency, :integer, DSPex.Calculations.AverageLatency
  end
end

Supporting Resources

5. Optimizer Resource

defmodule DSPex.Cognitive.Optimizer do
  use Ash.Resource,
    domain: DSPex.Cognitive,
    extensions: [DSPex.Extensions.Optimizer]

  attributes do
    uuid_primary_key :id
    
    attribute :name, :string, allow_nil?: false
    attribute :type, :atom,
      constraints: [one_of: [:simba, :beacon, :bootstrap, :mipro, :custom]]
    attribute :config, :map, default: %{}
    attribute :state, :map, default: %{}
  end
  
  actions do
    action :optimize, :map do
      argument :program_id, :uuid, allow_nil?: false
      argument :training_data, {:array, :map}
      argument :validation_data, {:array, :map}
      
      run DSPex.Actions.RunOptimizer
    end
  end
end

Custom Data Layer for Python Bridge

Hybrid Data Layer Implementation

defmodule DSPex.DataLayer.Hybrid do
  @behaviour Ash.DataLayer
  
  use Spark.Dsl.Extension,
    sections: [@data_layer],
    transformers: [DSPex.DataLayer.Transformers.SetupHybrid]
  
  @impl true
  def can?(resource, :read), do: true
  def can?(resource, :create), do: true
  def can?(resource, :update), do: true
  def can?(resource, :destroy), do: true
  def can?(resource, :sort), do: true
  def can?(resource, :filter), do: true
  def can?(resource, :limit), do: true
  def can?(resource, :offset), do: true
  def can?(resource, :multitenancy), do: false
  def can?(resource, :aggregate), do: true
  def can?(resource, :calculate), do: true
  def can?(_, _), do: false
  
  @impl true
  def run_query(query, resource, parent \\ nil) do
    case query.action.type do
      :read -> handle_read(query, resource)
      :create -> handle_create(query, resource)
      :update -> handle_update(query, resource)
      :destroy -> handle_destroy(query, resource)
    end
  end
  
  defp handle_read(query, resource) do
    # Route to appropriate backend based on implementation type
    case get_implementation_type(resource) do
      :native -> DSPex.DataLayer.Memory.run_query(query, resource)
      :python -> DSPex.DataLayer.Python.run_query(query, resource)
      :hybrid -> merge_results(query, resource)
    end
  end
  
  defp handle_create(query, resource) do
    # Store metadata in Elixir, delegate execution to Python if needed
    with {:ok, record} <- DSPex.DataLayer.Memory.run_query(query, resource),
         {:ok, _} <- maybe_register_with_python(record, resource) do
      {:ok, [record]}
    end
  end
  
  defp merge_results(query, resource) do
    # Merge results from multiple backends
    with {:ok, native_results} <- DSPex.DataLayer.Memory.run_query(query, resource),
         {:ok, python_results} <- DSPex.DataLayer.Python.run_query(query, resource) do
      merged = Enum.uniq_by(native_results ++ python_results, & &1.id)
      {:ok, merged}
    end
  end
end

Python Data Layer

defmodule DSPex.DataLayer.Python do
  @behaviour Ash.DataLayer
  
  alias DSPex.Python.Bridge
  
  @impl true
  def run_query(query, resource, _parent \\ nil) do
    request = build_python_request(query, resource)
    
    case Bridge.call_python(:query, request) do
      {:ok, results} -> 
        records = Enum.map(results, &to_ash_record(&1, resource))
        {:ok, records}
      {:error, reason} ->
        {:error, reason}
    end
  end
  
  defp build_python_request(query, resource) do
    %{
      resource: resource_to_python_name(resource),
      action: query.action.name,
      filters: compile_filters(query.filter),
      sorts: query.sort,
      limit: query.limit,
      offset: query.offset,
      arguments: query.arguments
    }
  end
  
  defp to_ash_record(python_data, resource) do
    struct(resource, normalize_data(python_data))
  end
end

Ash.Reactor for Cognitive Orchestration

Complex Pipeline Definition

defmodule DSPex.Reactors.CognitivePipeline do
  use Ash.Reactor
  
  input :query
  input :context
  
  # Step 1: Parse and validate input
  ash_step :parse_input do
    resource DSPex.Cognitive.Signature
    action :parse
    
    inputs %{
      spec: "query: str -> parsed_query: dict"
    }
  end
  
  # Step 2: Parallel retrieval from multiple sources
  group :retrieval, type: :async do
    ash_step :vector_search do
      resource DSPex.Cognitive.Module
      action :execute
      
      inputs %{
        input: result(:parse_input),
        module_name: "vector_retriever"
      }
    end
    
    ash_step :keyword_search do
      resource DSPex.Cognitive.Module  
      action :execute
      
      inputs %{
        input: result(:parse_input),
        module_name: "keyword_retriever"
      }
    end
    
    python_step :colbert_search do
      module "dspy.ColBERTv2"
      
      inputs %{
        query: result(:parse_input),
        k: 10
      }
    end
  end
  
  # Step 3: Merge and rank results
  step :merge_results do
    run fn inputs, _context ->
      results = Map.values(inputs.retrieval)
      {:ok, DSPex.Utils.merge_and_rank(results)}
    end
  end
  
  # Step 4: Generate response
  ash_step :generate_response do
    resource DSPex.Cognitive.Module
    action :execute
    
    inputs %{
      module_name: "chain_of_thought",
      input: %{
        query: input(:query),
        context: result(:merge_results)
      }
    }
    
    # Compensation if this fails
    compensate :fallback_response
  end
  
  # Fallback compensation
  step :fallback_response do
    run fn _inputs, _context ->
      {:ok, %{response: "I apologize, but I'm unable to process this request at the moment."}}
    end
  end
  
  return :generate_response
end

Usage in Programs

defmodule DSPex.Actions.RunProgram do
  use Ash.Resource.ManualAction
  
  def run(input, opts, context) do
    program = context.resource
    
    # Build reactor from program pipeline
    reactor = build_reactor(program.pipeline)
    
    # Execute with monitoring
    case Reactor.run(reactor, input.arguments, context: build_context(opts)) do
      {:ok, result} ->
        # Record execution
        {:ok, execution} = DSPex.Cognitive.Execution.create(%{
          program_id: program.id,
          input: input.arguments.input,
          output: result,
          status: :completed,
          metrics: collect_metrics(reactor)
        })
        
        {:ok, result}
        
      {:error, reason} ->
        # Record failure
        DSPex.Cognitive.Execution.create(%{
          program_id: program.id,
          input: input.arguments.input,
          status: :failed,
          error: %{reason: inspect(reason)}
        })
        
        {:error, reason}
    end
  end
  
  defp build_reactor(pipeline_spec) do
    # Dynamic reactor construction from pipeline specification
    DSPex.Reactors.Builder.build(pipeline_spec)
  end
end

Extension System for DSPy Concepts

Signature Extension

defmodule DSPex.Extensions.Signature do
  use Spark.Dsl.Extension,
    sections: [
      %Spark.Dsl.Section{
        name: :signature,
        describe: "Configure DSPy signature behavior",
        schema: [
          parser: [
            type: :atom,
            default: DSPex.Signature.Parser.Enhanced,
            doc: "Parser module for signature specs"
          ],
          strict_mode: [
            type: :boolean,
            default: true,
            doc: "Enforce strict type checking"
          ],
          ml_types: [
            type: {:list, :atom},
            default: [:embedding, :probability, :tensor],
            doc: "Additional ML-specific types to support"
          ]
        ],
        entities: [
          %Spark.Dsl.Entity{
            name: :field_type,
            describe: "Define custom field types",
            target: DSPex.Extensions.Signature.FieldType,
            schema: [
              name: [type: :atom, required: true],
              validator: [type: :mfa, required: true],
              converter: [type: :mfa, required: true]
            ]
          }
        ]
      }
    ],
    transformers: [
      DSPex.Extensions.Signature.Transformer
    ]
  
  def parser(resource) do
    Spark.Dsl.Extension.get_opt(resource, [:signature, :parser], DSPex.Signature.Parser.Enhanced)
  end
  
  def field_types(resource) do
    resource
    |> Spark.Dsl.Extension.get_entities([:signature, :field_type])
    |> Enum.map(&{&1.name, &1})
    |> Map.new()
  end
end

Module Extension

defmodule DSPex.Extensions.CognitiveModule do
  use Spark.Dsl.Extension,
    sections: [
      %Spark.Dsl.Section{
        name: :cognitive,
        describe: "Configure cognitive module behavior",
        schema: [
          python_module: [
            type: :string,
            doc: "Python DSPy module name"
          ],
          native_module: [
            type: :atom,
            doc: "Native Elixir implementation module"
          ],
          routing_strategy: [
            type: {:in, [:native_first, :python_first, :performance_based]},
            default: :performance_based,
            doc: "How to route between implementations"
          ],
          cache_ttl: [
            type: :pos_integer,
            default: 300,
            doc: "Cache TTL in seconds"
          ]
        ],
        entities: [
          %Spark.Dsl.Entity{
            name: :optimization_hint,
            describe: "Hints for optimizer",
            target: DSPex.Extensions.CognitiveModule.OptimizationHint,
            schema: [
              parameter: [type: :string, required: true],
              range: [type: {:or, [{:tuple, [:float, :float]}, {:list, :any}]}],
              strategy: [type: :atom]
            ]
          }
        ]
      }
    ]
end

Production Features Integration

1. Monitoring and Telemetry

defmodule DSPex.Telemetry do
  use Supervisor
  
  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end
  
  def init(_arg) do
    children = [
      {Telemetry.Metrics.ConsoleReporter, metrics: metrics()},
      {DSPex.Telemetry.LLMReporter, []}
    ]
    
    Supervisor.init(children, strategy: :one_for_one)
  end
  
  def metrics do
    [
      # Execution metrics
      counter("dspex.execution.count"),
      summary("dspex.execution.duration", unit: {:native, :millisecond}),
      distribution("dspex.execution.token_count"),
      
      # Module metrics
      counter("dspex.module.execute.count", tags: [:module_name, :implementation]),
      summary("dspex.module.execute.duration", tags: [:module_name]),
      
      # Python bridge metrics
      counter("dspex.python.call.count"),
      summary("dspex.python.call.duration"),
      counter("dspex.python.error.count"),
      
      # Cache metrics
      counter("dspex.cache.hit"),
      counter("dspex.cache.miss"),
      
      # LLM metrics
      counter("dspex.llm.request.count", tags: [:provider]),
      summary("dspex.llm.request.duration", tags: [:provider]),
      distribution("dspex.llm.request.tokens", tags: [:provider, :type])
    ]
  end
end

2. Authorization Policies

defmodule DSPex.Policies do
  defmodule AdminOnly do
    use Ash.Policy.SimpleCheck
    
    def match?(_actor, %{action: %{type: type}}, _context) 
        when type in [:create, :update, :destroy] do
      {:ok, false} # Override in production
    end
    
    def match?(_, _, _), do: {:ok, true}
  end
  
  defmodule RateLimiter do
    use Ash.Policy.SimpleCheck
    
    def match?(actor, %{resource: resource, action: action}, _context) do
      key = "#{actor.id}:#{resource}:#{action.name}"
      
      case Hammer.check_rate(key, 60_000, 100) do
        {:allow, _count} -> {:ok, true}
        {:deny, _limit} -> {:ok, false}
      end
    end
  end
end

3. Error Handling and Recovery

defmodule DSPex.ErrorHandler do
  def handle_error(%Ash.Error.Invalid{} = error, context) do
    # Log validation errors
    Logger.warning("Validation error in #{context}", error: error)
    
    # Return user-friendly error
    {:error, format_validation_error(error)}
  end
  
  def handle_error(%DSPex.Python.BridgeError{} = error, context) do
    # Log Python bridge errors
    Logger.error("Python bridge error in #{context}", error: error)
    
    # Attempt fallback to native implementation
    case fallback_to_native(context) do
      {:ok, result} -> {:ok, result}
      _ -> {:error, "Service temporarily unavailable"}
    end
  end
  
  def handle_error(error, context) do
    # Log unexpected errors
    Logger.error("Unexpected error in #{context}", error: error)
    
    # Report to error tracking
    Sentry.capture_exception(error, extra: context)
    
    {:error, "An unexpected error occurred"}
  end
end

4. Background Jobs with Oban

defmodule DSPex.Workers.OptimizationWorker do
  use Oban.Worker, queue: :optimization, max_attempts: 3
  
  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"program_id" => program_id, "config" => config}}) do
    with {:ok, program} <- DSPex.Cognitive.Program.get(program_id),
         {:ok, optimizer} <- DSPex.Cognitive.Optimizer.get(program.optimizer_id),
         {:ok, results} <- run_optimization(program, optimizer, config) do
      
      # Update program with results
      DSPex.Cognitive.Program.update(program, %{
        optimization_history: [results | program.optimization_history],
        variables: merge_optimized_variables(program.variables, results),
        status: :active
      })
    else
      error ->
        Logger.error("Optimization failed for program #{program_id}", error: error)
        {:error, error}
    end
  end
  
  defp run_optimization(program, optimizer, config) do
    # Long-running optimization process
    DSPex.Optimizers.run(optimizer.type, program, config)
  end
end

Implementation Roadmap

Phase 1: Foundation (Week 1)

  1. Set up Ash in DSPex project
  2. Define core resources (Signature, Module, Program)
  3. Implement memory data layer
  4. Basic CRUD operations

Phase 2: Python Integration (Week 2)

  1. Implement hybrid data layer
  2. Create Python bridge data layer
  3. Manual actions for module execution
  4. Integration tests

Phase 3: Orchestration (Week 3)

  1. Implement Ash.Reactor pipelines
  2. State machine for execution tracking
  3. Error handling and compensation
  4. Performance monitoring

Phase 4: Production Features (Week 4)

  1. Add authorization policies
  2. Implement caching layer
  3. Background job processing
  4. GraphQL/REST API generation

Phase 5: Advanced Features (Week 5+)

  1. Custom DSL extensions
  2. Optimizer integration
  3. Advanced telemetry
  4. Distributed execution

Code Examples

Using DSPex with Ash

# Create a new signature
{:ok, signature} = DSPex.Cognitive.Signature.parse(%{
  spec: "question: str, context: list[str] -> answer: str, confidence: float"
})

# Register a new module
{:ok, module} = DSPex.Cognitive.Module.register(%{
  module_spec: %{
    name: "rag_qa",
    type: :predict,
    signature_id: signature.id,
    implementation: :hybrid,
    config: %{
      retriever: "colbert_v2",
      generator: "gpt4"
    }
  }
})

# Create a program
{:ok, program} = DSPex.Cognitive.Program.compose(%{
  modules: [module.id],
  pipeline_spec: %{
    steps: [
      %{type: :retrieve, module: "colbert_v2", k: 10},
      %{type: :predict, module: "rag_qa"}
    ]
  }
})

# Run the program
{:ok, result} = DSPex.Cognitive.Program.run(program, %{
  input: %{
    question: "What is Ash Framework?",
    context: []
  }
})

# Query executions
executions = DSPex.Cognitive.Execution
  |> Ash.Query.filter(program_id == ^program.id)
  |> Ash.Query.filter(status == :completed)
  |> Ash.Query.sort(inserted_at: :desc)
  |> Ash.Query.limit(10)
  |> DSPex.Cognitive.read!()

GraphQL API (Auto-generated)

query GetProgram($id: ID!) {
  getProgram(id: $id) {
    id
    name
    status
    modules {
      id
      name
      type
      performanceMetrics
    }
    executions(first: 10, filter: { status: { eq: COMPLETED } }) {
      edges {
        node {
          id
          input
          output
          duration
          metrics
        }
      }
    }
  }
}

mutation RunProgram($programId: ID!, $input: JSON!) {
  runProgram(programId: $programId, input: $input) {
    result {
      output
      trace
      metrics
    }
    errors {
      field
      message
    }
  }
}

Performance Implications

Benefits

  1. Compile-time Optimizations: Ash transformers optimize at compile time
  2. Efficient Queries: Ash query engine optimizes across data layers
  3. Caching: Built-in support for result caching
  4. Batch Operations: Automatic batching of operations
  5. Connection Pooling: Managed by data layers

Considerations

  1. Memory Overhead: Ash metadata adds some memory overhead
  2. Compilation Time: More complex compile-time due to transformers
  3. Learning Curve: Developers need to understand Ash concepts

Benchmarks (Projected)

Operation                  | Without Ash | With Ash | Difference
--------------------------|-------------|----------|------------
Simple Module Execute     | 50ms        | 55ms     | +10%
Complex Pipeline          | 500ms       | 480ms    | -4%
Batch Operations (100)    | 5000ms      | 2000ms   | -60%
With Caching (2nd call)   | 50ms        | 5ms      | -90%
GraphQL Query             | N/A         | 10ms     | N/A
Authorization Check       | 5ms         | 1ms      | -80%

Migration Strategy

From Current DSPex to Ash-based DSPex

  1. Parallel Implementation: Build Ash layer alongside existing code
  2. Gradual Migration: Migrate one module at a time
  3. Compatibility Layer: Maintain backward compatibility
  4. Feature Parity: Ensure all features work before switching
  5. Performance Testing: Validate performance improvements
  6. Documentation: Update all documentation and examples

Migration Code Example

defmodule DSPex.Migration.V2Adapter do
  @moduledoc """
  Compatibility layer for migrating from DSPex v1 to v2 (Ash-based)
  """
  
  # Old API
  def create_signature(spec) do
    # Delegate to new Ash-based API
    case DSPex.Cognitive.Signature.parse(%{spec: spec}) do
      {:ok, signature} -> {:ok, signature.parsed_spec}
      error -> error
    end
  end
  
  # Old API
  def run_module(module_name, input, opts \\ []) do
    # Find module and execute via Ash
    with {:ok, module} <- find_module(module_name),
         {:ok, result} <- DSPex.Cognitive.Module.execute(module, %{
           input: input,
           context: Keyword.get(opts, :context, %{})
         }) do
      {:ok, result}
    end
  end
  
  defp find_module(name) do
    DSPex.Cognitive.Module
    |> Ash.Query.filter(name == ^name)
    |> Ash.Query.limit(1)
    |> DSPex.Cognitive.read()
    |> case do
      {:ok, [module]} -> {:ok, module}
      {:ok, []} -> {:error, :module_not_found}
      error -> error
    end
  end
end

Conclusion

By making Ash a first-class citizen in DSPex, we transform it from a simple DSPy bridge into a production-grade cognitive orchestration platform. This integration provides:

  1. Production Readiness: Built-in monitoring, security, and error handling
  2. Developer Experience: Clean APIs, automatic documentation, type safety
  3. Scalability: Leverages BEAM concurrency and Ash optimizations
  4. Maintainability: Clear separation of concerns, declarative design
  5. Extensibility: Easy to add new cognitive patterns and capabilities

The investment in Ash integration will pay dividends as DSPex evolves from a library into a platform for building cognitive applications. The declarative nature of Ash aligns perfectly with the cognitive orchestration vision, where variables become first-class optimization targets and every operation is observable and optimizable.

This is not just a technical integration—it’s a philosophical alignment that positions DSPex as the premier platform for cognitive orchestration in the Elixir ecosystem.