โ† Back to Dspex

README V3 POOLER

Documentation for README_V3_POOLER from the Dspex repository.

DSPex V3 Pool Architecture - Complete Documentation

🚀 Overview

The DSPex V3 Pool represents a major architectural advancement, delivering 1000x+ faster pool initialization through concurrent worker startup and intelligent resource management. This document provides comprehensive coverage of the V3 pool implementation.

📈 Performance Breakthroughs

Before V3 (Sequential Initialization)

  • V2 Pool: ~16 seconds for 8 workers (2s × 8 sequential)
  • Bottleneck: Sequential Python process creation
  • Scalability: Linear degradation with worker count

After V3 (Concurrent Initialization)

  • V3 Pool: ~10-30ms for 8 workers (parallel startup)
  • Speedup: 1000x+ faster initialization
  • Throughput: 1300+ requests/second under load
  • Scalability: Constant initialization time regardless of worker count

๐Ÿ—๏ธ Architecture Components

Core Components

1. DSPex.Python.Pool - Main Pool Manager

  • Location: lib/dspex/python/pool.ex
  • Purpose: Concurrent worker management and request distribution
  • Features:
    • Parallel worker initialization using Task.async_stream
    • Queue-based load balancing
    • Automatic request queueing when all workers are busy
    • Real-time statistics and monitoring

2. DSPex.Python.Worker - Individual Python Workers

  • Location: lib/dspex/python/worker.ex
  • Purpose: Python process management and command execution
  • Features:
    • Port-based Python communication
    • Command execution with timeout handling
    • Worker state management

3. DSPex.Python.WorkerSupervisor - Worker Process Supervision

  • Location: lib/dspex/python/worker_supervisor.ex
  • Purpose: Dynamic worker process lifecycle management
  • Features:
    • Dynamic worker spawning
    • Process monitoring and restart
    • Resource cleanup
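
For illustration, a single worker can be spawned on demand through the supervisor. start_worker/1 is the same call the pool uses during concurrent initialization (see Advanced Features below); the worker id format here is only an example:

# Spawn one additional Python worker under the dynamic supervisor
worker_id = "python_worker_extra_#{:erlang.unique_integer([:positive])}"
{:ok, _pid} = DSPex.Python.WorkerSupervisor.start_worker(worker_id)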

4. DSPex.Python.Registry - Worker Discovery

  • Location: lib/dspex/python/registry.ex
  • Purpose: Worker PID and metadata tracking
  • Features:
    • Worker registration and lookup
    • Process monitoring
    • Metadata storage

5. DSPex.PythonBridge.SessionStore - Session Management

  • Location: lib/dspex/python_bridge/session_store.ex
  • Purpose: Session-based program and state management
  • Features:
    • ETS-backed session storage
    • Program CRUD operations (store_program/3, get_program/2, update_program/3)
    • TTL-based session expiration
    • Global program sharing

6. DSPex.Python.ProcessRegistry - Orphaned Process Management

  • Location: lib/dspex/python/process_registry.ex
  • Purpose: Cross-reference tracking of Python processes for intelligent cleanup
  • Features:
    • OS-level PID mapping (Worker ID ↔ Elixir PID ↔ Python PID)
    • Process fingerprinting for unique identification
    • Automatic cleanup of dead worker entries
    • 100% active worker protection during cleanup
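
Conceptually, each registry entry ties these three identifiers together. The map below is only an illustrative sketch of that mapping, not the module's actual record layout:

# Illustrative shape of one tracked worker entry (field names are assumptions)
entry = %{
  worker_id: "python_worker_1_1234",  # pool-assigned worker id
  elixir_pid: self(),                 # PID of the owning Elixir worker process (placeholder)
  python_pid: 48_231,                 # OS-level PID of the Python child process
  fingerprint: "dspex-worker-..."     # fingerprint used to verify ownership before cleanup
}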

📖 Complete Process Management Documentation →

🔧 Configuration

Basic Pool Configuration

# Application configuration
config :dspex, :pool_config, %{
  v2_enabled: false,    # Disable legacy V2 pool
  v3_enabled: true,     # Enable V3 pool
  pool_size: 8          # Number of concurrent Python workers
}

config :dspex, :pooling_enabled, true

Advanced Configuration

# Pool-specific settings
config :dspex, DSPex.Python.Pool,
  size: System.schedulers_online() * 2,  # Default pool size
  startup_timeout: 10_000,               # Worker startup timeout
  queue_timeout: 5_000,                  # Request queue timeout
  max_concurrency: 8                     # Concurrent worker initialization

# Worker configuration  
config :dspex, DSPex.Python.Worker,
  execution_timeout: 30_000,             # Command execution timeout
  restart_strategy: :temporary           # Worker restart behavior

# Session store configuration
config :dspex, DSPex.PythonBridge.SessionStore,
  table_name: :dspex_sessions,           # ETS table name
  cleanup_interval: 60_000,              # Session cleanup interval
  default_ttl: 3600                      # Session TTL in seconds

📚 API Reference

Pool Management

Starting the Pool

# Start with default configuration
{:ok, _pid} = DSPex.Python.Pool.start_link()

# Start with custom configuration
{:ok, _pid} = DSPex.Python.Pool.start_link(size: 12, name: MyPool)

Executing Commands

# Simple command execution
{:ok, result} = DSPex.Python.Pool.execute("ping", %{test: true})

# Command with custom timeout
{:ok, result} = DSPex.Python.Pool.execute("analyze", %{data: data}, timeout: 60_000)

# Anonymous execution (no session context)
{:ok, result} = DSPex.Python.Pool.execute("quick_task", %{input: "data"})

Session-Based Execution

# Execute with session context
session_id = "user_session_123"

# Create a DSPy program in session
{:ok, program} = DSPex.Python.Pool.execute_in_session(
  session_id, 
  "create_program", 
  %{
    id: "qa_program",
    signature: %{
      inputs: [%{name: "question", type: "str"}],
      outputs: [%{name: "answer", type: "str"}]
    },
    instructions: "Answer questions concisely"
  }
)

# Execute the program with session continuity
{:ok, response} = DSPex.Python.Pool.execute_in_session(
  session_id,
  "execute_program",
  %{
    program_id: "qa_program", 
    inputs: %{question: "What is DSPy?"}
  }
)

Session Management

Session Store Operations

# Create a new session
{:ok, session} = DSPex.PythonBridge.SessionStore.create_session("session_123")

# Get session data
{:ok, session} = DSPex.PythonBridge.SessionStore.get_session("session_123")

# Update session with custom data
{:ok, updated_session} = DSPex.PythonBridge.SessionStore.update_session(
  "session_123", 
  fn session -> 
    Map.put(session, :custom_data, %{user_id: 456})
  end
)

Program Management

# Store program in session
:ok = DSPex.PythonBridge.SessionStore.store_program(
  "session_123", 
  "my_program", 
  %{signature: %{}, instructions: "..."}
)

# Retrieve program from session
{:ok, program} = DSPex.PythonBridge.SessionStore.get_program("session_123", "my_program")

# Update program data
:ok = DSPex.PythonBridge.SessionStore.update_program(
  "session_123", 
  "my_program", 
  %{updated_field: "new_value"}
)

Global Program Sharing

# Store program globally (accessible to all workers)
:ok = DSPex.PythonBridge.SessionStore.store_global_program(
  "shared_qa_program", 
  %{signature: %{}, instructions: "Global Q&A program"}
)

# Access global program from any session
{:ok, program} = DSPex.PythonBridge.SessionStore.get_global_program("shared_qa_program")

Pool Statistics and Monitoring

# Get comprehensive pool statistics
stats = DSPex.Python.Pool.get_stats()
# Returns:
# %{
#   workers: 8,           # Total workers
#   available: 6,         # Available workers  
#   busy: 2,             # Busy workers
#   queued: 0,           # Queued requests
#   requests: 1234,      # Total requests processed
#   errors: 5,           # Total errors
#   queue_timeouts: 1    # Queue timeout count
# }

# List all worker IDs
worker_ids = DSPex.Python.Pool.list_workers()

# Get session store statistics
session_stats = DSPex.PythonBridge.SessionStore.get_stats()

🔄 Request Flow

1. Request Reception

Client Request → Pool.execute() → GenServer.call()

2. Worker Assignment

Pool Manager → checkout_worker() → Available Queue → Worker Assignment

3. Concurrent Execution

Task.start() → Worker.execute() → Python Process → Response

4. Response Handling

Worker Complete → GenServer.cast() → Client Reply → Worker Return to Pool

5. Queue Management

No Workers Available → Request Queue → Timeout Management → FIFO Processing
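
To make steps 4 and 5 concrete, a sketch of the completion path is shown below. The message shape, field names, and the dispatch_queued/1 helper are assumptions used only for illustration, not the pool's actual internals:

# Hypothetical completion handler: reply to the caller, return the worker,
# then let any queued request proceed (steps 4 and 5 above).
def handle_cast({:request_complete, worker_id, from, result}, state) do
  GenServer.reply(from, result)

  # Put the worker back into the available queue
  state = %{state | available: :queue.in(worker_id, state.available)}

  # dispatch_queued/1 (hypothetical) pops a queued request, if any, and
  # immediately checks the worker back out for it
  {:noreply, dispatch_queued(state)}
end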

💎 Advanced Features

Concurrent Worker Initialization

The V3 pool’s primary innovation is concurrent worker startup:

# V3 Pool concurrent initialization
defp start_workers_concurrently(count) do
  1..count
  |> Task.async_stream(
    fn i ->
      worker_id = "python_worker_#{i}_#{:erlang.unique_integer([:positive])}"

      case DSPex.Python.WorkerSupervisor.start_worker(worker_id) do
        {:ok, _pid} -> worker_id
        {:error, _reason} -> nil
      end
    end,
    timeout: @startup_timeout,
    max_concurrency: count,      # All workers start simultaneously
    on_timeout: :kill_task
  )
  |> Enum.map(fn
    {:ok, worker_id} -> worker_id    # Successful stream results arrive as {:ok, value}
    {:exit, _reason} -> nil          # Timed-out or crashed startups
  end)
  |> Enum.filter(&(&1 != nil))
end

Key Benefits:

  • Parallel Startup: All workers initialize simultaneously
  • Fault Tolerance: Failed workers don’t block others
  • Timeout Protection: Individual worker timeouts don’t affect pool
  • Resource Efficiency: Optimal CPU and I/O utilization
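
The effect is easy to reproduce in isolation. The toy snippet below is unrelated to DSPex internals; it simply shows eight 2-second tasks completing in roughly 2 seconds instead of 16 when run through Task.async_stream:

# Toy demonstration: 8 x 2s of work finishes in ~2s when run concurrently
{micros, _results} =
  :timer.tc(fn ->
    1..8
    |> Task.async_stream(fn _i -> Process.sleep(2_000) end,
      max_concurrency: 8,
      timeout: 10_000
    )
    |> Enum.to_list()
  end)

IO.puts("8 concurrent 2s tasks finished in #{div(micros, 1_000)}ms")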

Intelligent Queue Management

# Request queueing when no workers are available
# (inside the pool's handle_call for execute requests)
case checkout_worker(state) do
  {:ok, worker_id, new_state} ->
    # A worker is free: execute immediately
    execute_on_worker(worker_id, command, args, opts)
    {:noreply, new_state}

  {:error, :no_workers} ->
    # No worker free: queue the request and schedule a timeout
    request = {from, command, args, opts, System.monotonic_time()}
    new_queue = :queue.in(request, state.request_queue)
    Process.send_after(self(), {:queue_timeout, from}, @queue_timeout)
    {:noreply, %{state | request_queue: new_queue}}
end
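
The timeout scheduled above eventually arrives as a message to the pool. A minimal sketch of handling it (handler shape assumed, not taken from the repository) could be:

# Hypothetical handler for the {:queue_timeout, from} message scheduled above:
# if the request is still waiting, drop it from the queue and reply with an error.
def handle_info({:queue_timeout, from}, state) do
  new_queue =
    :queue.filter(fn {qfrom, _cmd, _args, _opts, _ts} -> qfrom != from end, state.request_queue)

  if :queue.len(new_queue) != :queue.len(state.request_queue) do
    GenServer.reply(from, {:error, :queue_timeout})
  end

  {:noreply, %{state | request_queue: new_queue}}
end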

Session Data Enhancement

The V3 pool automatically enhances execution arguments with session context:

defp enhance_args_with_session_data(args, session_id, command) do
  base_args = Map.put(args, :session_id, session_id)
  
  # For execute_program commands, fetch program data from SessionStore
  if command == "execute_program" do
    program_id = Map.get(args, :program_id)
    case DSPex.PythonBridge.SessionStore.get_program(session_id, program_id) do
      {:ok, program_data} -> Map.put(base_args, :program_data, program_data)
      {:error, _} -> base_args
    end
  else
    base_args
  end
end

🧪 Testing and Validation

Performance Testing

# Run V3 pool performance demo
elixir examples/pool_v3_demo.exs

# Run detailed input/output demo  
elixir examples/pool_v3_demo_detailed.exs

# Performance comparison
elixir examples/pool_comparison.exs

Load Testing

# Concurrent load test
tasks = for i <- 1..1000 do
  Task.async(fn ->
    DSPex.Python.Pool.execute("ping", %{id: i, data: "load_test"})
  end)
end

results = Task.await_many(tasks, 60_000)
success_rate = Enum.count(results, &match?({:ok, _}, &1)) / 1000

Integration Testing

# Full integration tests
TEST_MODE=full_integration mix test test/dspex/python/

# Pool-specific tests
mix test test/dspex/python/pool_test.exs
mix test test/dspex/python/worker_test.exs

🔧 Migration from V2 to V3

Configuration Updates

# Before (V2)
config :dspex, DSPex.PythonBridge.SessionPoolV2,
  pool_size: 4,
  worker_module: DSPex.PythonBridge.PoolWorkerV2

# After (V3)  
config :dspex, :pool_config, %{
  v2_enabled: false,
  v3_enabled: true,
  pool_size: 8
}

API Updates

# Before (V2)
DSPex.PythonBridge.SessionPoolV2.execute_in_session(
  session_id, :create_program, args
)

# After (V3)
DSPex.Python.Pool.execute_in_session(
  session_id, "create_program", args
)

Session Store Compatibility

The V3 pool maintains full compatibility with existing session data:

  • Session Structure: Unchanged
  • Program Storage: Enhanced with get_program/2 function
  • Global Programs: Backward compatible
  • TTL Management: Improved efficiency

🚀 Production Deployment

# Production config
config :dspex, :pool_config, %{
  v3_enabled: true,
  pool_size: System.schedulers_online() * 2  # Optimal for most workloads
}

config :dspex, DSPex.Python.Pool,
  startup_timeout: 30_000,    # Longer timeout for production
  queue_timeout: 10_000       # More generous queue timeout

config :dspex, DSPex.PythonBridge.SessionStore,
  cleanup_interval: 300_000,  # 5 minutes
  default_ttl: 7200          # 2 hours

Monitoring and Observability

# Add telemetry for pool metrics
:telemetry.attach_many(
  "dspex-pool-metrics",
  [
    [:dspex, :pool, :request, :start],
    [:dspex, :pool, :request, :stop],
    [:dspex, :pool, :worker, :checkout],
    [:dspex, :pool, :worker, :return]
  ],
  &MyApp.Telemetry.handle_event/4,
  nil
)
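
A matching handler module might look like the sketch below. MyApp.Telemetry is the placeholder name already used in the attach call; the measurement and metadata contents are assumptions:

defmodule MyApp.Telemetry do
  require Logger

  # Minimal handler for the events attached above; in production you would
  # forward these measurements to your metrics system instead of logging.
  def handle_event([:dspex, :pool, :request, :stop], measurements, metadata, _config) do
    Logger.debug("pool request finished: #{inspect(measurements)} #{inspect(metadata)}")
  end

  def handle_event(_event, _measurements, _metadata, _config), do: :ok
end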

# Regular health checks
defmodule PoolHealthCheck do
  def check_pool_health do
    stats = DSPex.Python.Pool.get_stats()
    
    cond do
      stats.available == 0 -> {:warning, "No available workers"}
      stats.requests > 0 and stats.errors / stats.requests > 0.05 -> {:error, "High error rate"}
      stats.queue_timeouts > 0 -> {:warning, "Queue timeouts detected"}
      true -> {:ok, "Pool healthy"}
    end
  end
end
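
One way to run this check on a schedule (the module name and interval below are illustrative, not part of DSPex):

# Hypothetical periodic runner for the health check above
defmodule PoolHealthMonitor do
  use GenServer
  require Logger

  @interval :timer.minutes(1)

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    schedule_check()
    {:ok, %{}}
  end

  @impl true
  def handle_info(:check, state) do
    case PoolHealthCheck.check_pool_health() do
      {:ok, _msg} -> :ok
      {level, msg} -> Logger.warning("pool health #{level}: #{msg}")
    end

    schedule_check()
    {:noreply, state}
  end

  defp schedule_check, do: Process.send_after(self(), :check, @interval)
end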

Scaling Considerations

  • Worker Count: Start with System.schedulers_online() * 2
  • Queue Timeouts: Monitor and adjust based on workload
  • Session TTL: Balance memory usage vs. session persistence
  • Memory Management: Monitor ETS table sizes
  • Python Process Resources: Consider Python memory usage per worker

๐Ÿ” Troubleshooting

Common Issues

1. Slow Pool Initialization

Problem: Pool startup taking longer than expected
Solution: Check Python environment, increase startup_timeout

2. Worker Startup Failures

Problem: Some workers fail to start
Solution: Verify Python dependencies, check GEMINI_API_KEY

3. Queue Timeouts

Problem: Requests timing out in queue
Solution: Increase pool_size or queue_timeout

4. Session Store Issues

Problem: Programs not found in sessions
Solution: Verify get_program/2 function, check session creation

Debug Commands

# Check worker status
iex> DSPex.Python.Pool.get_stats()

# List active workers  
iex> DSPex.Python.Pool.list_workers()

# Check session store
iex> DSPex.PythonBridge.SessionStore.get_stats()

# Monitor worker processes
iex> DSPex.Python.Registry.list_all_workers()

📊 Performance Benchmarks

Initialization Performance

  • V2 Sequential: 16,000ms (8 workers)
  • V3 Concurrent: 10-30ms (8 workers)
  • Improvement: 1000x+ faster

Execution Performance

  • Throughput: 1300+ requests/second
  • Latency: <10ms for simple operations
  • Concurrency: Handles 100+ concurrent requests efficiently

Memory Usage

  • Worker Memory: ~50MB per Python worker
  • Session Store: ~1KB per session in ETS
  • Pool Overhead: <1MB for pool management

🎯 Future Enhancements

Planned Features

  • Adaptive Pool Sizing: Dynamic worker scaling based on load
  • Worker Health Monitoring: Automatic unhealthy worker replacement
  • Advanced Load Balancing: Weighted round-robin, least-connections
  • Metrics Dashboard: Real-time pool performance visualization
  • Circuit Breaker Integration: Automatic failure protection

Research Areas

  • Worker Affinity: Session-to-worker binding for state optimization
  • Predictive Scaling: ML-based worker count optimization
  • Cross-Pool Load Balancing: Multiple pool coordination
  • Streaming Execution: Long-running Python process support

📋 Summary

The DSPex V3 Pool delivers revolutionary performance improvements through:

  • 🚀 1000x+ Faster Initialization: Concurrent worker startup
  • ⚡ High Throughput: 1300+ requests/second capacity
  • 🔄 Intelligent Queueing: Non-blocking request management
  • 📊 Session Continuity: Enhanced session and program management
  • 🔧 Production Ready: Comprehensive monitoring and fault tolerance

The V3 architecture positions DSPex as a production-grade solution for integrating Elixir’s concurrent capabilities with Python’s DSPy framework, enabling scalable LLM applications with enterprise-level performance and reliability.