Comprehensive Testing Strategy for DSPex Python Bridge Pool System
Overview
This document outlines a robust testing strategy for the NimblePool-based Python bridge implementation. The strategy covers unit tests, integration tests, performance tests, and chaos engineering to ensure production readiness.
Testing Philosophy
- Isolation: Tests should be independent and not affect each other
- Determinism: Tests should produce consistent results
- Coverage: Test both happy paths and edge cases
- Performance: Tests should run efficiently
- Observability: Tests should provide clear failure diagnostics
Test Architecture Layers
Layer 1: Unit Tests (Fast, Isolated)
- Mock all external dependencies
- Test individual functions and modules
- Sub-second execution time
- Run on every commit
Layer 2: Component Tests (Integration)
- Test component interactions
- Use test doubles for Python processes
- Focus on protocol correctness
- Run on every push
Layer 3: System Tests (Full Integration)
- Real Python processes
- End-to-end scenarios
- Performance validation
- Run on CI/CD pipeline
Layer 4: Stress & Chaos Tests
- Load testing
- Fault injection
- Resource exhaustion
- Run nightly or on-demand
Component Testing Strategy
1. PoolWorker Tests
Unit Tests
defmodule DSPex.PythonBridge.PoolWorkerTest do
  @moduledoc """
  Unit tests for `DSPex.PythonBridge.PoolWorker`.

  The Python side is replaced by test doubles: `MockPort` stands in for the
  worker port and `with_mock/3` stubs `Port.open/2`.
  """

  # `with_mock/3` swaps the mocked module globally while its block runs, so
  # this module must not run concurrently with other test modules.
  use ExUnit.Case, async: false

  import Mock

  alias DSPex.PythonBridge.PoolWorker

  describe "init_worker/1" do
    test "initializes worker with correct state" do
      pool_state = %{worker_id: "test_1"}

      assert {:ok, worker_state, _updated_pool} = PoolWorker.init_worker(pool_state)

      assert worker_state.worker_id == "test_1"
      assert worker_state.status == :ready
      assert is_port(worker_state.port)
    end

    test "handles initialization failure gracefully" do
      # Mock port spawn failure
      with_mock Port, open: fn _, _ -> {:error, :enoent} end do
        pool_state = %{worker_id: "test_fail"}

        assert {:error, :worker_init_failed} = PoolWorker.init_worker(pool_state)
      end
    end
  end

  describe "handle_checkout/4" do
    test "binds worker to session on checkout" do
      worker_state = %{session_id: nil, status: :ready}
      from = {:session, "user_123"}

      assert {:ok, _, updated_state, _} =
               PoolWorker.handle_checkout(:checkout, from, worker_state, %{})

      assert updated_state.session_id == "user_123"
      assert updated_state.status == :busy
    end

    test "maintains session affinity" do
      worker_state = %{session_id: "user_123", status: :ready}
      from = {:session, "user_123"}

      assert {:ok, _, updated_state, _} =
               PoolWorker.handle_checkout(:checkout, from, worker_state, %{})

      assert updated_state.session_id == "user_123"
    end

    test "rejects checkout for different session when bound" do
      worker_state = %{session_id: "user_123", status: :busy}
      from = {:session, "user_456"}

      # Asserted (rather than bare-matched) so a failure prints the actual value.
      assert {:error, :session_mismatch} =
               PoolWorker.handle_checkout(:checkout, from, worker_state, %{})
    end
  end

  describe "send_command/4" do
    setup do
      # Create a mock port
      {:ok, port} = MockPort.start_link()
      worker_state = %{port: port, session_id: "test", request_id: 0}
      {:ok, worker_state: worker_state}
    end

    test "sends command and receives response", %{worker_state: worker_state} do
      MockPort.expect_command(worker_state.port, "create_program", %{"id" => "prog_1"})

      assert {:ok, response, _} =
               PoolWorker.send_command(worker_state, :create_program, %{}, 5000)

      assert response["result"]["id"] == "prog_1"
    end

    test "handles timeout correctly", %{worker_state: worker_state} do
      MockPort.simulate_timeout(worker_state.port)

      assert {:error, :timeout} = PoolWorker.send_command(worker_state, :slow_op, %{}, 100)
    end

    test "correlates requests and responses", %{worker_state: worker_state} do
      # Send multiple concurrent requests
      tasks =
        for i <- 1..10 do
          Task.async(fn ->
            PoolWorker.send_command(worker_state, :echo, %{value: i}, 5000)
          end)
        end

      results = Task.await_many(tasks)
      values = Enum.map(results, fn {:ok, resp, _} -> resp["result"]["value"] end)

      # Every request must come back exactly once, whatever the arrival order.
      assert Enum.sort(values) == Enum.to_list(1..10)
    end
  end

  describe "health_check/1" do
    test "reports healthy for responsive worker" do
      worker_state = %{port: MockPort.healthy(), last_health_check: 0}

      assert {:ok, :healthy, updated_state} = PoolWorker.health_check(worker_state)
      assert updated_state.last_health_check > 0
    end

    test "reports unhealthy for unresponsive worker" do
      worker_state = %{port: MockPort.unhealthy(), last_health_check: 0}

      assert {:ok, :unhealthy, updated_state} = PoolWorker.health_check(worker_state)
      assert updated_state.health_failures > 0
    end
  end
end
Integration Tests
defmodule DSPex.PythonBridge.PoolWorkerIntegrationTest do
  @moduledoc """
  Integration test that drives `DSPex.PythonBridge.PoolWorker` against a real
  Python process: create a program, execute it, then delete it.
  """

  use ExUnit.Case

  # Without this alias `PoolWorker` would resolve to a top-level module.
  alias DSPex.PythonBridge.PoolWorker

  @moduletag :integration
  @moduletag timeout: 30_000

  setup do
    # Start real Python process
    {:ok, worker_state, _} = PoolWorker.init_worker(%{worker_id: "integration_test"})

    on_exit(fn ->
      PoolWorker.terminate_worker(:shutdown, worker_state, %{})
    end)

    {:ok, worker: worker_state}
  end

  test "full lifecycle with real Python process", %{worker: worker} do
    # Create program
    assert {:ok, create_resp, worker} =
             PoolWorker.send_command(
               worker,
               :create_program,
               %{"signature" => %{"inputs" => ["query"], "outputs" => ["answer"]}},
               10_000
             )

    program_id = create_resp["result"]["program_id"]
    assert is_binary(program_id)

    # Execute program; the worker state returned by each call feeds the next.
    assert {:ok, exec_resp, worker} =
             PoolWorker.send_command(
               worker,
               :execute_program,
               %{"program_id" => program_id, "inputs" => %{"query" => "test"}},
               10_000
             )

    assert exec_resp["result"]["answer"] != nil

    # Cleanup
    assert {:ok, _, _} =
             PoolWorker.send_command(
               worker,
               :delete_program,
               %{"program_id" => program_id},
               5_000
             )
  end
end
2. SessionPool Tests
Unit Tests
defmodule DSPex.PythonBridge.SessionPoolTest do
  @moduledoc """
  Unit tests for `DSPex.PythonBridge.SessionPool`: session tracking, metrics,
  and behaviour when the pool is exhausted.
  """

  use ExUnit.Case

  alias DSPex.PythonBridge.SessionPool

  describe "session management" do
    setup do
      {:ok, _} = SessionPool.start_link(name: :test_pool, pool_size: 2)
      :ok
    end

    test "tracks sessions correctly" do
      assert :ok = GenServer.call(:test_pool, {:track_session, "session_1"})
      assert :ok = GenServer.call(:test_pool, {:track_session, "session_2"})

      sessions = GenServer.call(:test_pool, :get_sessions)

      assert map_size(sessions) == 2
      assert Map.has_key?(sessions, "session_1")
      assert Map.has_key?(sessions, "session_2")
    end

    test "ends sessions and cleans up" do
      GenServer.call(:test_pool, {:track_session, "temp_session"})

      assert :ok = GenServer.call(:test_pool, {:end_session, "temp_session"})

      sessions = GenServer.call(:test_pool, :get_sessions)
      refute Map.has_key?(sessions, "temp_session")
    end

    test "handles non-existent session end" do
      assert {:error, :session_not_found} =
               GenServer.call(:test_pool, {:end_session, "ghost_session"})
    end
  end

  describe "metrics tracking" do
    test "updates metrics on operations" do
      {:ok, _pool} = SessionPool.start_link(pool_size: 1)

      initial_status = SessionPool.get_pool_status()
      assert initial_status.metrics.total_operations == 0

      # Perform operations
      SessionPool.execute_in_session("metric_test", :ping, %{})
      SessionPool.execute_in_session("metric_test", :ping, %{})

      updated_status = SessionPool.get_pool_status()
      assert updated_status.metrics.total_operations >= 2
    end
  end

  describe "pool overflow handling" do
    test "queues requests when pool is exhausted" do
      {:ok, _} =
        SessionPool.start_link(
          name: :small_pool,
          pool_size: 1,
          overflow: 0
        )

      # Occupy the single worker
      task1 =
        Task.async(fn ->
          SessionPool.execute_in_session("session_1", :sleep, %{duration: 500})
        end)

      # This should queue
      task2 =
        Task.async(fn ->
          SessionPool.execute_in_session("session_2", :ping, %{})
        end)

      # Both should eventually complete
      assert {:ok, _} = Task.await(task1, 1000)
      assert {:ok, _} = Task.await(task2, 1000)
    end

    test "respects checkout timeout" do
      {:ok, _} =
        SessionPool.start_link(
          name: :timeout_pool,
          pool_size: 1,
          overflow: 0,
          checkout_timeout: 100
        )

      # Occupy the worker. The result is never read, so start a plain linked
      # task instead of an async one whose reply would sit unread in the
      # test's mailbox.
      # NOTE(review): nothing guarantees the blocker holds the worker before
      # the waiter below asks for it — confirm this is not a source of flakes.
      {:ok, _blocker} =
        Task.start_link(fn ->
          SessionPool.execute_in_session("blocker", :sleep, %{duration: 500})
        end)

      # This should timeout
      assert {:error, :pool_timeout} =
               SessionPool.execute_in_session("waiter", :ping, %{}, pool_timeout: 100)
    end
  end
end
Concurrent Operation Tests
defmodule DSPex.PythonBridge.SessionPoolConcurrencyTest do
  @moduledoc """
  Concurrency tests for `DSPex.PythonBridge.SessionPool`: many sessions running
  at once, and isolation of programs between sessions.
  """

  use ExUnit.Case

  # Without this alias `SessionPool` would resolve to a top-level module.
  alias DSPex.PythonBridge.SessionPool

  @moduletag :integration
  @moduletag timeout: 60_000

  test "handles concurrent sessions without interference" do
    {:ok, _} = SessionPool.start_link(pool_size: 4)

    # Create multiple concurrent sessions
    sessions = for i <- 1..10, do: "concurrent_#{i}"

    tasks =
      Enum.map(sessions, fn session_id ->
        Task.async(fn ->
          # Each session creates its own program
          {:ok, program_id} =
            SessionPool.execute_in_session(
              session_id,
              :create_program,
              %{signature: TestSignatures.simple()}
            )

          # Execute multiple times
          for j <- 1..5 do
            {:ok, result} =
              SessionPool.execute_in_session(
                session_id,
                :execute_program,
                %{program_id: program_id, inputs: %{value: j}}
              )

            # Assuming doubler program
            assert result["value"] == j * 2
          end

          # Cleanup
          SessionPool.end_session(session_id)
          :ok
        end)
      end)

    # All should complete successfully
    results = Task.await_many(tasks, 30_000)
    assert Enum.all?(results, &(&1 == :ok))
  end

  test "session isolation prevents cross-contamination" do
    {:ok, _} = SessionPool.start_link(pool_size: 2)

    # Session 1 creates a program
    {:ok, prog1} =
      SessionPool.execute_in_session(
        "user_1",
        :create_program,
        %{id: "prog_1", signature: TestSignatures.simple()}
      )

    # Session 2 creates a different program
    {:ok, prog2} =
      SessionPool.execute_in_session(
        "user_2",
        :create_program,
        %{id: "prog_2", signature: TestSignatures.complex()}
      )

    # Session 1 should not see Session 2's program
    {:ok, programs1} = SessionPool.execute_in_session("user_1", :list_programs, %{})
    assert prog1 in programs1
    refute prog2 in programs1

    # Session 2 should not see Session 1's program
    {:ok, programs2} = SessionPool.execute_in_session("user_2", :list_programs, %{})
    assert prog2 in programs2
    refute prog1 in programs2
  end
end
3. Adapter Tests
defmodule DSPex.Adapters.PythonPoolTest do
  @moduledoc """
  Tests for `DSPex.Adapters.PythonPool`: the adapter callback surface and the
  session-bound adapter returned by `session_adapter/1`.
  """

  # Defaults to async: false, which `with_mock/3` requires because it replaces
  # the mocked module globally.
  use ExUnit.Case

  import Mock

  alias DSPex.Adapters.PythonPool
  alias DSPex.PythonBridge.SessionPool

  describe "adapter interface compliance" do
    test "implements all required callbacks" do
      callbacks = [
        {:create_program, 1},
        {:execute_program, 2},
        {:execute_program, 3},
        {:list_programs, 0},
        {:delete_program, 1},
        {:get_program_info, 1},
        {:health_check, 0},
        {:get_stats, 0},
        {:supports_test_layer?, 1},
        {:get_test_capabilities, 0}
      ]

      # function_exported?/3 does not load the module itself; without this it
      # reports false for every callback of a not-yet-loaded module.
      assert Code.ensure_loaded?(PythonPool)

      for {func, arity} <- callbacks do
        assert function_exported?(PythonPool, func, arity),
               "Missing required function: #{func}/#{arity}"
      end
    end
  end

  describe "session adapter creation" do
    test "creates bound adapter instance" do
      adapter = PythonPool.session_adapter("test_user")

      assert is_map(adapter)
      assert adapter.session_id == "test_user"
      assert is_function(adapter.create_program, 1)
      assert is_function(adapter.execute_program, 3)
    end

    test "bound adapter maintains session context" do
      adapter = PythonPool.session_adapter("bound_test")

      with_mock SessionPool,
        execute_in_session: fn sid, _, _, _ ->
          send(self(), {:session_id, sid})
          {:ok, %{}}
        end do
        # The adapter stores anonymous functions in map fields, so they are
        # invoked with `.()`; `adapter.create_program(%{})` would be treated as
        # a remote call on a map and raise.
        adapter.create_program.(%{})

        assert_received {:session_id, "bound_test"}
      end
    end
  end
end
4. Property-Based Tests
defmodule DSPex.PythonBridge.PropertyTest do
  @moduledoc """
  Property-based tests for the pool: arbitrary session IDs and arbitrary
  sequences of checkout commands against a worker state.
  """

  use ExUnit.Case
  use ExUnitProperties

  # Without these aliases the short names would resolve to top-level modules.
  alias DSPex.PythonBridge.{PoolWorker, SessionPool}

  property "pool handles any valid session ID" do
    check all session_id <- string(:alphanumeric, min_length: 1) do
      # execute_in_session/3 returns a tagged tuple on success, so match
      # `{:ok, _}` rather than a bare `:ok`.
      assert {:ok, _} = SessionPool.execute_in_session(session_id, :ping, %{})
      assert :ok = SessionPool.end_session(session_id)
    end
  end

  property "worker state transitions are valid" do
    check all commands <- list_of(command_generator(), min_length: 1, max_length: 20) do
      worker_state = %{status: :ready, session_id: nil}

      # Rejected commands leave the state untouched; accepted ones replace it.
      final_state =
        Enum.reduce(commands, worker_state, fn cmd, state ->
          case PoolWorker.handle_checkout(cmd.type, cmd.from, state, %{}) do
            {:ok, _, new_state, _} -> new_state
            {:error, _} -> state
          end
        end)

      assert final_state.status in [:ready, :busy]
    end
  end

  # Generates `%{type: :checkout | :checkin, from: {:session, id}}` with a short
  # alphanumeric session id.
  defp command_generator do
    gen all type <- member_of([:checkout, :checkin]),
            session <- string(:alphanumeric, min_length: 1, max_length: 10) do
      %{type: type, from: {:session, session}}
    end
  end
end
Stress & Performance Testing
Load Testing
defmodule DSPex.PythonBridge.LoadTest do
  @moduledoc """
  Load test: runs a fixed load profile against the session pool and checks
  success rate and latency percentiles.

  Tagged `:load_test`, so it can be selected with `mix test --only load_test`.
  """

  use ExUnit.Case

  # Without this alias `SessionPool` would resolve to a top-level module.
  alias DSPex.PythonBridge.SessionPool

  @moduletag :load_test
  @moduletag timeout: :infinity

  test "handles specified load profile" do
    config = %{
      duration_seconds: 60,
      concurrent_users: 100,
      operations_per_second: 10,
      # Two workers per online scheduler.
      pool_size: System.schedulers_online() * 2
    }

    {:ok, _} = SessionPool.start_link(pool_size: config.pool_size)

    # NOTE(review): LoadRunner is not defined in this document — confirm its
    # module path and the shape of the result it returns.
    results = LoadRunner.run(config)

    assert results.success_rate > 0.99
    # milliseconds
    assert results.p95_latency < 100
    assert results.p99_latency < 500
    assert results.errors == []
  end
end
Chaos Engineering Tests
defmodule DSPex.PythonBridge.ChaosTest do
  @moduledoc """
  Chaos tests: inject worker crashes, state corruption, memory pressure and
  port latency, then check the pool keeps serving requests.

  Tagged `:chaos`, so it can be selected with `mix test --only chaos`.
  """

  use ExUnit.Case

  # Without this alias `SessionPool` would resolve to a top-level module.
  alias DSPex.PythonBridge.SessionPool

  @moduletag :chaos
  @moduletag timeout: 120_000

  describe "fault injection" do
    test "recovers from worker crashes" do
      {:ok, _} = SessionPool.start_link(pool_size: 4)

      # Start normal operations
      operation_task =
        Task.async(fn ->
          for i <- 1..100 do
            SessionPool.execute_in_session("chaos_#{i}", :ping, %{})
            Process.sleep(100)
          end
        end)

      # Inject faults
      Process.sleep(5_000)
      ChaosMonkey.kill_random_worker()

      Process.sleep(5_000)
      ChaosMonkey.corrupt_worker_state()

      # Operations should continue.
      # NOTE(review): this only proves the loop did not crash; the individual
      # execute_in_session results are discarded — confirm that is intended.
      assert Task.await(operation_task, 60_000)

      # Pool should be healthy
      assert {:ok, :healthy, _} = SessionPool.health_check()
    end

    test "handles memory pressure" do
      {:ok, _} = SessionPool.start_link(pool_size: 2)

      # Create memory pressure
      # Use 80% of available memory
      ChaosMonkey.consume_memory(0.8)

      # Pool should still function
      assert {:ok, _} = SessionPool.execute_in_session("memory_test", :ping, %{})
    end

    test "handles network delays" do
      {:ok, _} = SessionPool.start_link(pool_size: 2)

      # Inject network delays
      # 500ms delay
      ChaosMonkey.add_latency(:port_communication, 500)

      # Operations should complete, just slower
      assert {:ok, _} =
               SessionPool.execute_in_session(
                 "latency_test",
                 :ping,
                 %{},
                 timeout: 2000
               )
    end
  end
end
Test Helpers and Fixtures
Mock Port Implementation
defmodule MockPort do
  @moduledoc """
  GenServer-backed test double used in place of a worker port.

  Tests register expectations with `expect_command/3` or switch the mock into
  timeout mode with `simulate_timeout/1`.
  """

  use GenServer

  @doc """
  Starts a mock port process. `opts` is handed to the GenServer as its init
  argument and defaults to `[]`.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc """
  Registers `response` as the reply for `command` on the given mock.
  """
  def expect_command(port, command, response) do
    GenServer.call(port, {:expect, command, response})
  end

  @doc """
  Asks the mock to simulate a timeout.
  """
  def simulate_timeout(port) do
    GenServer.call(port, :simulate_timeout)
  end

  @doc """
  Starts a mock that answers `"ping"` with `%{"status" => "ok"}` and returns it.
  """
  def healthy do
    {:ok, port} = start_link()
    expect_command(port, "ping", %{"status" => "ok"})
    port
  end

  @doc """
  Starts a mock that has been put into timeout mode and returns it.
  """
  def unhealthy do
    {:ok, port} = start_link()
    simulate_timeout(port)
    port
  end

  # GenServer implementation...
end
Test Data Generators
defmodule TestDataGenerators do
  @moduledoc """
  Generators for throwaway test data: session ids, program configurations and
  batches of random operations.
  """

  @doc """
  Returns a session id of the form `"test_<random>_<unique>"`.
  """
  def session_id do
    "test_#{:rand.uniform(10000)}_#{System.unique_integer([:positive])}"
  end

  @doc """
  Returns a program config with a unique `:id` and a `:signature` picked at
  random from the simple, complex and chain test signatures.
  """
  def program_config do
    %{
      id: "test_program_#{System.unique_integer([:positive])}",
      signature:
        Enum.random([
          TestSignatures.simple(),
          TestSignatures.complex(),
          TestSignatures.chain()
        ])
    }
  end

  @doc """
  Returns a list of `count` operations, each with a random `:type` and
  random `:data`. A `count` of 0 yields an empty list.
  """
  def bulk_operations(count) do
    # The explicit `//1` step keeps the range empty for count == 0; a plain
    # `1..0` counts downwards and would produce two operations.
    for _ <- 1..count//1 do
      %{
        type: Enum.random([:create, :execute, :list, :delete]),
        # NOTE(review): random_operation_data/0 is not defined in this
        # snippet — confirm where it is meant to come from.
        data: random_operation_data()
      }
    end
  end
end
Performance Measurement
defmodule PerformanceHelpers do
  @moduledoc """
  Timing helpers for performance tests.
  """

  @doc """
  Runs `fun` and returns `{result, duration}`, where `duration` is the elapsed
  monotonic time in microseconds.
  """
  def measure_operation(fun) do
    start = System.monotonic_time(:microsecond)
    result = fun.()
    duration = System.monotonic_time(:microsecond) - start
    {result, duration}
  end

  @doc """
  Benchmarks `config.operation` (a zero-arity function).

  `config` is a map with:

    * `:operation` - required, the function to measure
    * `:warmup` - unmeasured warm-up calls (default 100; 0 skips warm-up)
    * `:iterations` - measured calls (default 1000; must be at least 1)

  Returns a map with `:min`, `:max`, `:mean`, `:median`, `:p95` and `:p99`,
  all in microseconds. Raises `ArgumentError` when `:iterations` is not a
  positive integer.
  """
  def benchmark_pool(config) do
    warmup_iterations = config[:warmup] || 100
    test_iterations = config[:iterations] || 1000
    operation = config.operation

    if not (is_integer(test_iterations) and test_iterations > 0) do
      raise ArgumentError,
            "iterations must be a positive integer, got: #{inspect(test_iterations)}"
    end

    # Warmup. The `//1` step makes the range empty for 0; a plain `1..0`
    # counts downwards and would run the operation twice.
    for _ <- 1..warmup_iterations//1, do: operation.()

    # Measure
    timings =
      for _ <- 1..test_iterations//1 do
        {_result, duration} = measure_operation(operation)
        duration
      end

    # Sort once and share the sorted list with every order statistic.
    sorted = Enum.sort(timings)

    %{
      min: hd(sorted),
      max: List.last(sorted),
      mean: mean(sorted),
      median: median(sorted),
      p95: percentile(sorted, 95),
      p99: percentile(sorted, 99)
    }
  end

  # Arithmetic mean of a non-empty list.
  defp mean(values), do: Enum.sum(values) / length(values)

  # Median of a non-empty sorted list; averages the two middle values when the
  # list has an even number of elements.
  defp median(sorted) do
    count = length(sorted)
    middle = div(count, 2)

    if rem(count, 2) == 1 do
      Enum.at(sorted, middle)
    else
      (Enum.at(sorted, middle - 1) + Enum.at(sorted, middle)) / 2
    end
  end

  # Nearest-rank percentile of a non-empty sorted list for an integer
  # `p` in 1..100. Integer ceiling division avoids float rounding in the rank.
  defp percentile(sorted, p) do
    rank = div(p * length(sorted) + 99, 100)
    Enum.at(sorted, max(rank - 1, 0))
  end
end
Test Isolation Strategies
1. Process Isolation
defmodule IsolatedTest do
  @moduledoc """
  Example of process isolation: every test gets its own supervision tree,
  exposed in the test context as `:supervisor`.
  """

  use ExUnit.Case

  setup do
    # Start isolated supervision tree
    {:ok, sup} = IsolatedSupervisor.start_link()

    on_exit(fn ->
      # on_exit callbacks run in a separate process after the test process has
      # exited. The supervisor is linked to the test process, so it may already
      # be gone by now; stopping a dead process exits, which is not a cleanup
      # failure here.
      try do
        Supervisor.stop(sup)
      catch
        :exit, _reason -> :ok
      end
    end)

    {:ok, supervisor: sup}
  end
end
2. Port Isolation
defmodule PortIsolation do
  @moduledoc """
  Helper that hands a test a uniquely named port id and cleans it up afterwards.
  """

  @doc """
  Calls `fun` with a fresh `"test_port_<n>"` id and returns its result.

  `cleanup_port/1` is invoked with the same id once `fun` has finished, whether
  it returned or raised.
  """
  def with_isolated_port(fun) do
    # A positive unique integer keeps port names distinct across tests.
    unique_suffix = [:positive] |> System.unique_integer() |> Integer.to_string()
    isolated_port_id = "test_port_" <> unique_suffix

    try do
      fun.(isolated_port_id)
    after
      # Runs on success and on failure alike.
      cleanup_port(isolated_port_id)
    end
  end
end
3. Configuration Isolation
defmodule ConfigIsolation do
  @moduledoc """
  Helper for running code under a temporary `:dspex` application environment.
  """

  @doc """
  Applies every `{key, value}` pair in `config` to the `:dspex` application
  environment, calls `fun`, and returns its result.

  The environment is restored afterwards, whether `fun` returned or raised:
  keys that existed before get their previous values back, and keys that did
  not exist before are deleted.
  """
  def with_config(config, fun) do
    original = Application.get_all_env(:dspex)

    try do
      Enum.each(config, fn {key, value} ->
        Application.put_env(:dspex, key, value)
      end)

      fun.()
    after
      # Remove keys that were not set before the call; re-putting the original
      # pairs alone would leave them behind for later tests.
      for {key, _value} <- Application.get_all_env(:dspex),
          not Keyword.has_key?(original, key) do
        Application.delete_env(:dspex, key)
      end

      # Restore original config
      Enum.each(original, fn {key, value} ->
        Application.put_env(:dspex, key, value)
      end)
    end
  end
end
Continuous Integration Setup
Test Matrix
test:
  strategy:
    # Every combination of these values runs as its own job.
    matrix:
      elixir: ['1.17', '1.18']
      otp: ['26', '27']
      python: ['3.11', '3.12']
      pool_size: [1, 4, 16]
  steps:
    - name: Unit Tests
      run: mix test.fast
    - name: Integration Tests
      run: mix test.protocol
    - name: Full System Tests
      run: mix test.integration
    # Load tests only run for the largest pool size in the matrix.
    - name: Load Tests
      if: matrix.pool_size == 16
      run: mix test --only load_test
    # Chaos tests only run when the workflow was triggered by a schedule.
    - name: Chaos Tests
      if: github.event_name == 'schedule'
      run: mix test --only chaos
Performance Benchmarks
defmodule Benchmarks do
  @moduledoc """
  Benchee scenarios for the session pool: a single reused session, ten
  concurrent sessions, and creating then ending a fresh session.
  """

  # Without this alias `SessionPool` would resolve to a top-level module.
  alias DSPex.PythonBridge.SessionPool

  @doc """
  Runs all pool benchmark scenarios through `Benchee.run/1`.
  """
  def run do
    Benchee.run(%{
      "single_session" => fn ->
        SessionPool.execute_in_session("bench", :ping, %{})
      end,
      "concurrent_sessions" => fn ->
        tasks =
          for i <- 1..10 do
            Task.async(fn ->
              SessionPool.execute_in_session("bench_#{i}", :ping, %{})
            end)
          end

        Task.await_many(tasks)
      end,
      "session_creation" => fn ->
        # [:positive] keeps a minus sign out of the session id, matching how
        # the other helpers in this document build unique names.
        session = "bench_#{System.unique_integer([:positive])}"
        SessionPool.execute_in_session(session, :ping, %{})
        SessionPool.end_session(session)
      end
    })
  end
end
Test Execution Strategy
Development
# Fast feedback loop (the task the CI "Unit Tests" step runs)
mix test.fast
# Before commit (the task the CI "Integration Tests" step runs)
mix test.protocol
CI/CD Pipeline
# PR validation
mix check.ci
# Nightly: test.all, then only the modules tagged :load_test and :chaos
mix test.all
mix test --only load_test
mix test --only chaos
Release Testing
# Full regression
mix test.all --cover
# Performance validation
mix run benchmarks.exs
# Stress testing
# `mix test --timeout` only accepts an integer number of milliseconds, so
# `--timeout infinity` is rejected. `--trace` disables test timeouts (and
# runs with --max-cases 1).
# NOTE(review): no module in this document carries a :stress tag — confirm
# the tag name (the load and chaos modules use :load_test and :chaos).
mix test --only stress --trace
Summary
This comprehensive testing strategy ensures:
- Unit Test Coverage: Every component tested in isolation
- Integration Validation: Components work together correctly
- Concurrency Safety: Concurrent sessions are exercised to surface race conditions and deadlocks
- Performance Goals: Meet latency and throughput requirements
- Fault Tolerance: System recovers from failures
- Production Readiness: Confidence in deployment
The strategy emphasizes automated testing at multiple levels with clear isolation boundaries and comprehensive coverage of both happy paths and edge cases.