V2 Pool Technical Design Series: Document 5 - Test Infrastructure Overhaul
Overview
This document details the comprehensive test infrastructure overhaul for Phase 4. It addresses test isolation, deterministic execution, performance benchmarking, and provides patterns for reliable concurrent testing of the pool system.
Test Architecture Principles
Core Principles
- Complete Isolation - Each test runs in its own supervision tree
- Deterministic Execution - Predictable timing and ordering
- Resource Management - Proper cleanup and resource limits
- Observability - Comprehensive logging and metrics
- Performance Tracking - Benchmark regression detection
Test Layers
graph TD
    subgraph "Test Layers"
        U[Unit Tests<br/>Worker/Module Level]
        I[Integration Tests<br/>Pool Operations]
        S[System Tests<br/>Full Stack]
        P[Performance Tests<br/>Load & Stress]
        C[Chaos Tests<br/>Failure Scenarios]
    end
    U --> I
    I --> S
    S --> P
    S --> C
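Layer selection is driven by the TEST_MODE environment variable (see the CI examples later in this document). A minimal test/test_helper.exs sketch of that wiring; the exact mode-to-tag mapping shown here is illustrative, not prescriptive:
# test/test_helper.exs (illustrative wiring; tag names follow the CI examples below)
test_mode = System.get_env("TEST_MODE", "mock_adapter")

excludes =
  case test_mode do
    "mock_adapter"     -> [:integration, :performance, :chaos]
    "layer_1"          -> [:integration, :performance, :chaos]
    "layer_2"          -> [:performance, :chaos]
    "layer_3"          -> [:performance, :chaos]
    "full_integration" -> []
    _                  -> []
  end

ExUnit.start(exclude: excludes)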
Test Isolation Framework
File: test/support/test_isolation.ex
(new file)
defmodule DSPex.Test.TestIsolation do
@moduledoc """
Provides complete test isolation with dedicated supervision trees.
"""
import ExUnit.Callbacks
require Logger
@doc """
Starts an isolated test environment with its own supervision tree.
"""
defmacro isolated_test(name, opts \\ [], do: block) do
quote do
      test unquote(name), var!(context) do
test_id = "test_#{System.unique_integer([:positive])}"
# Start isolated supervision tree
{:ok, supervisor} = DSPex.Test.TestIsolation.start_isolated_supervisor(
test_id,
unquote(opts)
)
# Ensure cleanup on exit
on_exit(fn ->
DSPex.Test.TestIsolation.cleanup_test_environment(test_id, supervisor)
end)
        # Add test environment to the caller's context (var! escapes macro
        # hygiene so the test block can read the merged context)
        var!(context) = Map.merge(var!(context), %{
          test_id: test_id,
          supervisor: supervisor,
          test_env: DSPex.Test.TestIsolation.build_test_env(test_id, unquote(opts))
        })
        _ = var!(context)
# Execute test block
unquote(block)
end
end
end
@doc """
Starts an isolated supervisor for a test.
"""
def start_isolated_supervisor(test_id, opts) do
children = build_supervision_tree(test_id, opts)
Supervisor.start_link(
children,
strategy: :one_for_one,
name: :"test_supervisor_#{test_id}"
)
end
@doc """
Builds test environment configuration.
"""
def build_test_env(test_id, opts) do
layer = Keyword.get(opts, :layer, :layer_1)
pool_config = Keyword.get(opts, :pool_config, %{})
%{
test_id: test_id,
layer: layer,
pool_name: :"test_pool_#{test_id}",
registry_name: :"test_registry_#{test_id}",
pool_config: Map.merge(default_pool_config(layer), pool_config),
adapters: configure_adapters(layer, test_id)
}
end
@doc """
Cleans up test environment.
"""
  def cleanup_test_environment(test_id, supervisor) do
    # Stop supervisor gracefully (it may already be down)
    if Process.alive?(supervisor), do: Supervisor.stop(supervisor, :shutdown)
# Clean up any remaining processes
cleanup_orphaned_processes(test_id)
# Clean up ETS tables
cleanup_ets_tables(test_id)
# Clean up files
cleanup_test_files(test_id)
Logger.debug("Test environment #{test_id} cleaned up")
end
defp build_supervision_tree(test_id, opts) do
layer = Keyword.get(opts, :layer, :layer_1)
base_children = [
# Test-specific registry
{Registry, keys: :unique, name: :"test_registry_#{test_id}"},
# Error recovery orchestrator
{DSPex.PythonBridge.ErrorRecoveryOrchestrator, name: :"error_orchestrator_#{test_id}"},
# Circuit breaker
{DSPex.PythonBridge.CircuitBreaker, name: :"circuit_breaker_#{test_id}"}
]
# Add layer-specific children
layer_children = case layer do
:layer_1 ->
# Mock only
[]
:layer_2 ->
# Bridge mock
[
{DSPex.Test.BridgeMock, name: :"bridge_mock_#{test_id}"}
]
:layer_3 ->
# Full integration
pool_config = Keyword.get(opts, :pool_config, %{})
[
{DSPex.PythonBridge.SessionPoolV2,
Keyword.merge([
name: :"pool_genserver_#{test_id}",
pool_name: :"test_pool_#{test_id}",
lazy: false, # Eager init for tests
pool_size: 2,
max_overflow: 0
], pool_config)}
]
end
base_children ++ layer_children
end
defp default_pool_config(:layer_1), do: %{timeout: 1_000}
defp default_pool_config(:layer_2), do: %{timeout: 5_000}
defp default_pool_config(:layer_3), do: %{timeout: 10_000, init_timeout: 15_000}
  defp configure_adapters(layer, _test_id) do
case layer do
:layer_1 ->
%{primary: DSPex.Adapters.Mock}
:layer_2 ->
%{primary: DSPex.Adapters.BridgeMock}
:layer_3 ->
%{primary: DSPex.Adapters.PythonPoolV2}
end
end
defp cleanup_orphaned_processes(test_id) do
# Find all processes with test_id in their name
Process.list()
|> Enum.filter(fn pid ->
case Process.info(pid, :registered_name) do
{:registered_name, name} when is_atom(name) ->
String.contains?(Atom.to_string(name), test_id)
_ ->
false
end
end)
|> Enum.each(&Process.exit(&1, :kill))
end
defp cleanup_ets_tables(test_id) do
:ets.all()
|> Enum.filter(fn table ->
case :ets.info(table, :name) do
name when is_atom(name) ->
String.contains?(Atom.to_string(name), test_id)
_ ->
false
end
end)
|> Enum.each(&:ets.delete/1)
end
defp cleanup_test_files(test_id) do
# Clean up any temporary files created during test
test_dir = Path.join(System.tmp_dir!(), "dspex_test_#{test_id}")
File.rm_rf(test_dir)
end
end
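In a test module the macro is a drop-in replacement for test/2. A minimal usage sketch (the module name and assertions are illustrative):
defmodule DSPex.Test.IsolationUsageTest do
  use ExUnit.Case, async: false
  import DSPex.Test.TestIsolation

  isolated_test "gets its own supervision tree", layer: :layer_1 do
    # The macro merges test_id, supervisor, and test_env into the context
    assert is_pid(context.supervisor)
    assert context.test_env.layer == :layer_1
    assert String.starts_with?(context.test_id, "test_")
  end
end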
Deterministic Test Helpers
File: test/support/deterministic_helpers.ex
(new file)
defmodule DSPex.Test.DeterministicHelpers do
@moduledoc """
Helpers for deterministic test execution.
"""
@doc """
Waits for a condition with timeout and polling.
"""
def wait_for(condition_fn, timeout \\ 5_000, poll_interval \\ 50) do
deadline = System.monotonic_time(:millisecond) + timeout
wait_loop(condition_fn, deadline, poll_interval)
end
defp wait_loop(condition_fn, deadline, poll_interval) do
if condition_fn.() do
:ok
else
now = System.monotonic_time(:millisecond)
if now < deadline do
Process.sleep(poll_interval)
wait_loop(condition_fn, deadline, poll_interval)
else
{:error, :timeout}
end
end
end
@doc """
Synchronizes multiple concurrent operations.
"""
def synchronize(operations) when is_list(operations) do
parent = self()
ref = make_ref()
# Start all operations
pids = Enum.map(operations, fn {name, fun} ->
spawn_link(fn ->
# Wait for go signal
receive do
{:go, ^ref} ->
result = fun.()
send(parent, {:result, ref, name, result})
end
end)
end)
    # Messages queue in each spawned mailbox, so this settle delay is
    # belt-and-braces rather than strictly required for correctness
    Process.sleep(10)
# Send go signal to all
Enum.each(pids, &send(&1, {:go, ref}))
# Collect results
results = for _ <- operations do
receive do
{:result, ^ref, name, result} -> {name, result}
after
5_000 -> {:error, :timeout}
end
end
Enum.into(results, %{})
end
@doc """
Ensures pool has minimum workers ready.
"""
def ensure_pool_ready(pool_name, min_workers \\ 1, timeout \\ 10_000) do
wait_for(
fn ->
case get_pool_stats(pool_name) do
{:ok, stats} ->
stats.ready_workers >= min_workers
_ ->
false
end
end,
timeout
)
end
@doc """
Simulates controlled failures.
"""
def inject_failure(type, target, opts \\ []) do
case type do
:port_crash ->
simulate_port_crash(target)
:timeout ->
simulate_timeout(target, Keyword.get(opts, :duration, 5_000))
:network_partition ->
simulate_network_partition(target, Keyword.get(opts, :duration, 1_000))
:resource_exhaustion ->
simulate_resource_exhaustion(target)
end
end
defp simulate_port_crash(worker_id) do
# Find worker process and kill its port
case find_worker_process(worker_id) do
{:ok, pid} ->
send(pid, {:port_crash, :simulated})
:ok
_ ->
{:error, :worker_not_found}
end
end
defp simulate_timeout(target, duration) do
# Block target process
spawn(fn ->
Process.sleep(duration)
send(target, {:unblock, :timeout_simulation})
end)
send(target, {:block, :timeout_simulation, duration})
end
  defp simulate_network_partition(_target, _duration) do
    # Placeholder: temporarily break communication with the target
    :ok
  end
  defp simulate_resource_exhaustion(_target) do
    # Placeholder: consume resources on the target
    :ok
  end
  defp find_worker_process(_worker_id) do
    # Implementation specific to pool; placeholder for now
    {:ok, self()}
  end
  defp get_pool_stats(_pool_name) do
    # Implementation specific to pool monitoring; placeholder for now
    {:ok, %{ready_workers: 2}}
  end
end
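A short sketch of how these helpers replace sleep-based assertions in a test body (the condition and operations shown are illustrative):
import DSPex.Test.DeterministicHelpers

# Poll for a condition instead of sleeping a fixed amount before asserting
:ok = wait_for(fn -> Process.whereis(:my_pool) != nil end, 2_000)

# Release two operations at (nearly) the same instant and collect both results
results =
  synchronize([
    {:first, fn -> {:checked_out, 1} end},
    {:second, fn -> {:checked_out, 2} end}
  ])

assert results.first == {:checked_out, 1}
assert results.second == {:checked_out, 2}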
Pool-Specific Test Helpers
File: test/support/pool_test_helpers.ex
(update existing)
defmodule DSPex.Test.PoolTestHelpers do
@moduledoc """
Enhanced helpers for pool-specific testing.
"""
import DSPex.Test.{TestIsolation, DeterministicHelpers}
@doc """
Starts a test pool with full isolation and monitoring.
"""
def start_isolated_pool(opts \\ []) do
    test_id = Keyword.get(opts, :test_id, "pool_test_#{System.unique_integer([:positive])}")
pool_opts = Keyword.merge([
name: :"test_pool_genserver_#{test_id}",
pool_name: :"test_pool_#{test_id}",
lazy: false,
pool_size: 2,
max_overflow: 0,
init_timeout: 10_000,
strategy: :lifo
], opts)
# Start with supervisor
{:ok, supervisor} = start_isolated_supervisor(test_id, layer: :layer_3, pool_config: pool_opts)
# Wait for pool to be ready
:ok = ensure_pool_ready(pool_opts[:pool_name], pool_opts[:pool_size])
%{
supervisor: supervisor,
pool_name: pool_opts[:pool_name],
genserver_name: pool_opts[:name],
test_id: test_id,
pool_opts: pool_opts
}
end
@doc """
Executes operations concurrently on the pool.
"""
def concurrent_pool_operations(pool_name, operations) do
operations
|> Enum.map(fn {name, {session_id, command, args}} ->
{name, fn ->
DSPex.PythonBridge.SessionPoolV2.execute_in_session(
session_id,
command,
args,
pool_name: pool_name
)
end}
end)
|> synchronize()
end
@doc """
Monitors pool health during test execution.
"""
  def with_pool_monitoring(pool_name, fun) do
    # Start monitoring
    monitor = start_pool_monitor(pool_name)

    try do
      # Execute the test body, then collect monitoring results exactly once
      result = fun.()
      {result, stop_pool_monitor(monitor)}
    rescue
      exception ->
        # Stop the monitor even when the test body raises
        stop_pool_monitor(monitor)
        reraise exception, __STACKTRACE__
    end
  end
end
@doc """
Simulates various pool failure scenarios.
"""
def simulate_pool_failure(pool_name, scenario, opts \\ []) do
case scenario do
:worker_cascade_failure ->
# Kill workers in sequence
simulate_cascade_failure(pool_name, opts)
:thundering_herd ->
# Many simultaneous requests
simulate_thundering_herd(pool_name, opts)
:slow_worker ->
# Make one worker slow
simulate_slow_worker(pool_name, opts)
:memory_leak ->
# Simulate memory growth
simulate_memory_leak(pool_name, opts)
end
end
  defp start_pool_monitor(pool_name) do
    parent = self()
    ref = make_ref()

    pid =
      spawn(fn ->
        stats =
          monitor_loop(pool_name, %{
            checkouts: 0,
            checkins: 0,
            errors: 0,
            max_queue_depth: 0,
            worker_restarts: 0
          })

        send(parent, {ref, :stats, stats})
      end)

    {pid, ref}
  end
defp monitor_loop(pool_name, stats) do
receive do
:stop ->
stats
{:pool_event, event, metadata} ->
updated_stats = update_stats(stats, event, metadata)
monitor_loop(pool_name, updated_stats)
after
100 ->
# Poll pool state
monitor_loop(pool_name, stats)
end
end
  defp stop_pool_monitor({pid, ref}) do
    # Ask the monitor loop to stop, then collect its final stats
    send(pid, :stop)

    receive do
      {^ref, :stats, stats} -> stats
    after
      1_000 -> %{}
    end
  end
defp update_stats(stats, event, _metadata) do
case event do
:checkout -> Map.update!(stats, :checkouts, &(&1 + 1))
:checkin -> Map.update!(stats, :checkins, &(&1 + 1))
:error -> Map.update!(stats, :errors, &(&1 + 1))
_ -> stats
end
end
  defp simulate_cascade_failure(_pool_name, opts) do
count = Keyword.get(opts, :worker_count, 2)
delay = Keyword.get(opts, :delay, 100)
for i <- 1..count do
inject_failure(:port_crash, {:worker_index, i - 1})
Process.sleep(delay)
end
end
defp simulate_thundering_herd(pool_name, opts) do
request_count = Keyword.get(opts, :requests, 100)
tasks = for i <- 1..request_count do
Task.async(fn ->
DSPex.PythonBridge.SessionPoolV2.execute_in_session(
"herd_session_#{i}",
:test_command,
%{index: i},
pool_name: pool_name
)
end)
end
Task.await_many(tasks, 30_000)
end
defp simulate_slow_worker(_pool_name, _opts) do
# Implementation
:ok
end
defp simulate_memory_leak(_pool_name, _opts) do
# Implementation
:ok
end
end
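The monitor loop above consumes {:pool_event, event, metadata} messages, but nothing in this module produces them. One way to wire them up is a :telemetry handler that forwards pool events to the monitor process; a sketch that could live in PoolTestHelpers, assuming the pool emits telemetry events (the event names here are hypothetical):
  # Hypothetical wiring; adjust the event names to whatever the pool emits
  def attach_pool_telemetry(monitor_pid) do
    events = [
      [:dspex, :pool, :checkout],
      [:dspex, :pool, :checkin],
      [:dspex, :pool, :error]
    ]

    :telemetry.attach_many(
      "pool-test-monitor-#{inspect(monitor_pid)}",
      events,
      # Forward each event to the monitor in the shape monitor_loop expects
      fn [_, _, event], _measurements, metadata, _config ->
        send(monitor_pid, {:pool_event, event, metadata})
      end,
      nil
    )
  end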
Test Scenarios
Unit Test Example
defmodule DSPex.PythonBridge.PoolWorkerV2Test do
use ExUnit.Case
import DSPex.Test.TestIsolation
alias DSPex.PythonBridge.PoolWorkerV2Enhanced
describe "worker lifecycle" do
isolated_test "initializes worker correctly", layer: :layer_1 do
      # Stand-in for a real Port; unit tests never open one
      _port = make_ref()
      # Test init_worker
      assert {:ok, worker, _pool_state} = PoolWorkerV2Enhanced.init_worker(%{})
assert worker.state_machine.state == :ready
assert worker.health_check_failures == 0
end
isolated_test "handles checkout state transitions", layer: :layer_1 do
worker = build_test_worker()
# Test checkout
      {:ok, _client_state, updated_worker, _pool_state} =
PoolWorkerV2Enhanced.handle_checkout(
{:session, "test_session"},
{self(), make_ref()},
worker,
%{}
)
assert updated_worker.state_machine.state == :busy
assert updated_worker.current_session == "test_session"
end
end
defp build_test_worker do
%PoolWorkerV2Enhanced{
port: make_ref(),
worker_id: "test_worker",
state_machine: DSPex.PythonBridge.WorkerStateMachine.new("test_worker"),
health_check_failures: 0,
last_health_check: System.monotonic_time(:millisecond),
config: %{}
}
end
end
Integration Test Example
defmodule DSPex.PythonBridge.PoolIntegrationTest do
use ExUnit.Case
  import DSPex.Test.{TestIsolation, PoolTestHelpers, DeterministicHelpers}
describe "pool operations" do
isolated_test "handles concurrent sessions", layer: :layer_3 do
pool_info = start_isolated_pool(pool_size: 3)
# Execute concurrent operations
results = concurrent_pool_operations(
pool_info.pool_name,
[
{:op1, {"session_1", :create_program, %{signature: test_signature()}}},
{:op2, {"session_2", :create_program, %{signature: test_signature()}}},
{:op3, {"session_3", :create_program, %{signature: test_signature()}}}
]
)
assert {:ok, _} = results.op1
assert {:ok, _} = results.op2
assert {:ok, _} = results.op3
end
isolated_test "recovers from worker failures", layer: :layer_3 do
pool_info = start_isolated_pool(pool_size: 2)
# Monitor pool during failure simulation
{result, stats} = with_pool_monitoring(pool_info.pool_name, fn ->
# Create initial session
{:ok, program_id} = DSPex.PythonBridge.SessionPoolV2.execute_in_session(
"test_session",
:create_program,
%{signature: test_signature()},
pool_name: pool_info.pool_name
)
# Simulate worker failure
inject_failure(:port_crash, {:session, "test_session"})
# Try to use session again - should recover
DSPex.PythonBridge.SessionPoolV2.execute_in_session(
"test_session",
:execute,
%{program_id: program_id, inputs: %{}},
pool_name: pool_info.pool_name
)
end)
assert {:ok, _} = result
assert stats.worker_restarts > 0
end
end
defp test_signature do
%{
"inputs" => [%{"name" => "input", "type" => "string"}],
"outputs" => [%{"name" => "output", "type" => "string"}]
}
end
end
Performance Test Example
defmodule DSPex.PythonBridge.PoolPerformanceTest do
use ExUnit.Case
import DSPex.Test.{TestIsolation, PoolTestHelpers}
  @moduletag :performance
  @moduletag timeout: 300_000 # 5 minutes
describe "performance benchmarks" do
isolated_test "throughput under load", layer: :layer_3 do
pool_info = start_isolated_pool(
pool_size: 10,
max_overflow: 5
)
# Warm up pool
warmup_pool(pool_info.pool_name, 100)
# Benchmark
results = benchmark_pool_operations(pool_info.pool_name, %{
duration: 60_000, # 1 minute
concurrent_clients: 50,
operation_mix: [
{70, :read_operation},
{20, :write_operation},
{10, :complex_operation}
]
})
# Assert performance targets
assert results.avg_latency < 100 # ms
assert results.p99_latency < 500 # ms
assert results.throughput > 1000 # ops/sec
assert results.error_rate < 0.01 # 1%
end
isolated_test "latency distribution", layer: :layer_3 do
pool_info = start_isolated_pool(pool_size: 5)
latencies = measure_latency_distribution(
pool_info.pool_name,
1000 # operations
)
# Calculate percentiles
percentiles = calculate_percentiles(latencies, [50, 90, 95, 99, 99.9])
assert percentiles[50] < 50 # median
assert percentiles[90] < 100 # p90
assert percentiles[95] < 200 # p95
assert percentiles[99] < 500 # p99
end
end
defp warmup_pool(pool_name, operations) do
tasks = for i <- 1..operations do
Task.async(fn ->
DSPex.PythonBridge.SessionPoolV2.execute_in_session(
"warmup_#{i}",
:health_check,
%{},
pool_name: pool_name
)
end)
end
Task.await_many(tasks, 30_000)
end
defp benchmark_pool_operations(pool_name, config) do
start_time = System.monotonic_time(:millisecond)
deadline = start_time + config.duration
# Start concurrent clients
clients = for i <- 1..config.concurrent_clients do
start_benchmark_client(pool_name, i, deadline, config.operation_mix)
end
# Wait for completion
results = Task.await_many(clients, config.duration + 10_000)
    # Aggregate results, normalizing throughput by the configured duration
    aggregate_benchmark_results(results, config.duration)
end
defp start_benchmark_client(pool_name, client_id, deadline, operation_mix) do
Task.async(fn ->
run_client_operations(pool_name, client_id, deadline, operation_mix, %{
operations: 0,
errors: 0,
latencies: []
})
end)
end
defp run_client_operations(pool_name, client_id, deadline, operation_mix, stats) do
if System.monotonic_time(:millisecond) < deadline do
# Select operation based on mix
operation = select_operation(operation_mix)
# Execute and measure
start = System.monotonic_time(:microsecond)
result = execute_benchmark_operation(pool_name, client_id, operation)
latency = System.monotonic_time(:microsecond) - start
# Update stats
updated_stats = %{
operations: stats.operations + 1,
errors: stats.errors + if(match?({:error, _}, result), do: 1, else: 0),
latencies: [latency | stats.latencies]
}
run_client_operations(pool_name, client_id, deadline, operation_mix, updated_stats)
else
stats
end
end
defp select_operation(operation_mix) do
total_weight = Enum.reduce(operation_mix, 0, fn {weight, _}, sum -> sum + weight end)
random = :rand.uniform() * total_weight
Enum.reduce_while(operation_mix, random, fn {weight, op}, remaining ->
if remaining <= weight do
{:halt, op}
else
{:cont, remaining - weight}
end
end)
end
defp execute_benchmark_operation(pool_name, client_id, operation) do
session_id = "bench_#{client_id}"
case operation do
:read_operation ->
DSPex.PythonBridge.SessionPoolV2.execute_in_session(
session_id,
:list_programs,
%{},
pool_name: pool_name
)
:write_operation ->
DSPex.PythonBridge.SessionPoolV2.execute_in_session(
session_id,
:create_program,
%{signature: simple_signature()},
pool_name: pool_name
)
:complex_operation ->
DSPex.PythonBridge.SessionPoolV2.execute_in_session(
session_id,
:execute,
%{program_id: "test", inputs: %{data: generate_data()}},
pool_name: pool_name
)
end
end
  defp aggregate_benchmark_results(client_results, duration_ms) do
    all_latencies = Enum.flat_map(client_results, & &1.latencies)
    total_operations = Enum.sum(Enum.map(client_results, & &1.operations))
    total_errors = Enum.sum(Enum.map(client_results, & &1.errors))

    %{
      avg_latency: Enum.sum(all_latencies) / max(length(all_latencies), 1) / 1000, # us to ms
      p99_latency: calculate_percentile(all_latencies, 99) / 1000,
      throughput: total_operations / (duration_ms / 1000), # ops/sec over the run
      error_rate: total_errors / max(total_operations, 1)
    }
  end
end
defp calculate_percentiles(values, percentiles) do
sorted = Enum.sort(values)
count = length(sorted)
Enum.into(percentiles, %{}, fn p ->
index = round(count * p / 100) - 1
{p, Enum.at(sorted, max(0, index))}
end)
end
defp calculate_percentile(values, percentile) do
sorted = Enum.sort(values)
index = round(length(sorted) * percentile / 100) - 1
Enum.at(sorted, max(0, index))
end
defp simple_signature do
%{"inputs" => [], "outputs" => [%{"name" => "result", "type" => "string"}]}
end
defp generate_data do
:crypto.strong_rand_bytes(100) |> Base.encode64()
end
defp measure_latency_distribution(pool_name, operation_count) do
tasks = for i <- 1..operation_count do
Task.async(fn ->
start = System.monotonic_time(:microsecond)
DSPex.PythonBridge.SessionPoolV2.execute_in_session(
"latency_test_#{i}",
:health_check,
%{},
pool_name: pool_name
)
System.monotonic_time(:microsecond) - start
end)
end
Task.await_many(tasks, 60_000)
end
end
Chaos Test Example
defmodule DSPex.PythonBridge.PoolChaosTest do
use ExUnit.Case
import DSPex.Test.{TestIsolation, PoolTestHelpers}
  @moduletag :chaos
  @moduletag timeout: 120_000
describe "chaos engineering" do
isolated_test "survives cascading failures", layer: :layer_3 do
pool_info = start_isolated_pool(
pool_size: 5,
max_overflow: 2
)
# Start background load
load_task = Task.async(fn ->
generate_continuous_load(pool_info.pool_name, 60_000)
end)
# Inject failures
Process.sleep(5_000) # Let load stabilize
simulate_pool_failure(pool_info.pool_name, :worker_cascade_failure,
worker_count: 3,
delay: 1_000
)
# Continue load during recovery
Process.sleep(10_000)
# Stop load and check results
results = Task.await(load_task, 70_000)
# Should have recovered
assert results.success_rate > 0.95 # 95% success despite failures
assert results.recovered_count > 0 # Some operations recovered
end
isolated_test "handles resource exhaustion", layer: :layer_3 do
pool_info = start_isolated_pool(
pool_size: 3,
max_overflow: 1
)
# Exhaust pool
blocking_tasks = for i <- 1..4 do
Task.async(fn ->
DSPex.PythonBridge.SessionPoolV2.execute_in_session(
"blocking_#{i}",
:long_running_operation,
%{duration: 10_000},
pool_name: pool_info.pool_name
)
end)
end
# Try additional operations - should queue or fail gracefully
Process.sleep(100)
result = DSPex.PythonBridge.SessionPoolV2.execute_in_session(
"additional",
:health_check,
%{},
pool_name: pool_info.pool_name,
timeout: 1_000
)
      assert {:error, _} = result # Should time out or hit the pool's queue limit
# Wait for blocking tasks
Task.await_many(blocking_tasks, 15_000)
end
end
defp generate_continuous_load(pool_name, duration) do
deadline = System.monotonic_time(:millisecond) + duration
do_generate_load(pool_name, deadline, %{
total: 0,
success: 0,
errors: 0,
recovered_count: 0
})
end
defp do_generate_load(pool_name, deadline, stats) do
if System.monotonic_time(:millisecond) < deadline do
result = DSPex.PythonBridge.SessionPoolV2.execute_in_session(
"load_session_#{:rand.uniform(100)}",
:test_operation,
%{data: "test"},
pool_name: pool_name,
max_retries: 3
)
updated_stats = case result do
{:ok, _} ->
%{stats | total: stats.total + 1, success: stats.success + 1}
        # Errors that carry retry context are counted as recovered operations
        {:error, %{context: %{attempt: attempts}}} when attempts > 1 ->
%{stats |
total: stats.total + 1,
success: stats.success + 1,
recovered_count: stats.recovered_count + 1
}
{:error, _} ->
%{stats | total: stats.total + 1, errors: stats.errors + 1}
end
# Small delay between operations
Process.sleep(10)
do_generate_load(pool_name, deadline, updated_stats)
else
Map.put(stats, :success_rate, stats.success / max(stats.total, 1))
end
end
end
Test Configuration
File: config/test.exs
(updates)
config :dspex,
# Test-specific configuration
test_mode: String.to_atom(System.get_env("TEST_MODE", "mock_adapter")),
# Pool test configuration
test_pool_config: %{
layer_1: %{
pool_size: 1,
max_overflow: 0,
timeout: 1_000
},
layer_2: %{
pool_size: 2,
max_overflow: 1,
timeout: 5_000
},
layer_3: %{
pool_size: 3,
max_overflow: 2,
timeout: 10_000,
init_timeout: 15_000
}
},
# Test isolation
test_isolation: true,
cleanup_on_exit: true,
# Performance test thresholds
performance_thresholds: %{
avg_latency_ms: 100,
p99_latency_ms: 500,
min_throughput_ops: 1000,
max_error_rate: 0.01
}
# Logger configuration for tests
config :logger, :console,
level: :warn,
metadata: [:test_id, :pool_name, :worker_id]
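Because the thresholds live in config rather than in the tests, benchmark assertions can read them back instead of hard-coding numbers. A minimal sketch of such a helper (a hypothetical module, not part of the files above; keys match the config just shown):
defmodule DSPex.Test.PerformanceAssertions do
  @moduledoc "Asserts benchmark results against the configured thresholds."

  def assert_within_thresholds!(results) do
    t = Application.fetch_env!(:dspex, :performance_thresholds)

    if results.avg_latency > t.avg_latency_ms,
      do: raise("avg latency #{results.avg_latency}ms exceeds #{t.avg_latency_ms}ms")

    if results.p99_latency > t.p99_latency_ms,
      do: raise("p99 latency #{results.p99_latency}ms exceeds #{t.p99_latency_ms}ms")

    if results.throughput < t.min_throughput_ops,
      do: raise("throughput #{results.throughput} ops/s below #{t.min_throughput_ops}")

    if results.error_rate > t.max_error_rate,
      do: raise("error rate #{results.error_rate} exceeds #{t.max_error_rate}")

    :ok
  end
end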
CI/CD Integration
File: .github/workflows/test.yml
(example)
name: Test Suite

on: [push, pull_request]

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: erlef/setup-beam@v1
        with:
          elixir-version: '1.14'
          otp-version: '25'
      - name: Install dependencies
        run: mix deps.get
      - name: Run unit tests
        run: TEST_MODE=mock_adapter mix test --only unit

  integration-tests:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        layer: [layer_1, layer_2, layer_3]
    steps:
      - uses: actions/checkout@v2
      - uses: erlef/setup-beam@v1
        with:
          elixir-version: '1.14'
          otp-version: '25'
      - name: Install Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install Python dependencies
        run: pip install -r requirements.txt
      - name: Run integration tests
        run: TEST_MODE=${{ matrix.layer }} mix test --only integration

  performance-tests:
    runs-on: ubuntu-latest
    if: github.event_name == 'pull_request'
    steps:
      - uses: actions/checkout@v2
      - uses: erlef/setup-beam@v1
        with:
          elixir-version: '1.14'
          otp-version: '25'
      - name: Run performance tests
        run: TEST_MODE=full_integration mix test --only performance
      - name: Upload results
        uses: actions/upload-artifact@v2
        with:
          name: performance-results
          path: test/performance_results/
Test Execution Strategy
Local Development
# Run all tests for a layer
TEST_MODE=layer_1 mix test
# Run specific test file
TEST_MODE=layer_3 mix test test/pool_v2_test.exs
# Run with coverage
MIX_ENV=test mix coveralls
# Run performance tests
TEST_MODE=full_integration mix test --only performance
CI Environment
- Parallel execution by test layer
- Performance regression detection
- Automatic cleanup verification (see the sketch below)
- Resource usage monitoring
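One way to make cleanup verification automatic is a suite-level hook that flags any test-scoped processes still registered after the run; a sketch assuming the naming convention used by TestIsolation:
# test/test_helper.exs (illustrative)
ExUnit.after_suite(fn _results ->
  leaked =
    for pid <- Process.list(),
        {:registered_name, name} <- [Process.info(pid, :registered_name)],
        is_atom(name),
        String.starts_with?(Atom.to_string(name), "test_supervisor_"),
        do: name

  if leaked != [], do: IO.warn("Leaked test supervisors: #{inspect(leaked)}")
end)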
Next Steps
Proceed to Document 6: “Performance Optimization and Monitoring” for production optimization strategies.