Stage 1 Prompt 10: Integration Testing and Validation
OBJECTIVE
Implement comprehensive integration testing and validation for the complete Stage 1 DSPy-Ash integration system. This covers end-to-end testing workflows, cross-component validation, performance benchmarking, system monitoring, deployment readiness verification, and final validation of the Stage 1 completion criteria, ensuring all components work together seamlessly.
COMPLETE IMPLEMENTATION CONTEXT
INTEGRATION TESTING ARCHITECTURE OVERVIEW
From Stage 1 implementation requirements and Elixir integration testing patterns:
┌─────────────────────────────────────────────────────────────────┐
│                Integration Testing Architecture                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │ End-to-End      │  │ Cross-Component │  │ Performance     │  │
│  │ Testing         │  │ Integration     │  │ Benchmarking    │  │
│  │ - Full workflow │  │ - API contracts │  │ - Throughput    │  │
│  │ - Real scenarios│  │ - Data flow     │  │ - Latency       │  │
│  │ - User journeys │  │ - Error prop    │  │ - Memory        │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
│                                                                 │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │ System          │  │ Deployment      │  │ Stage 1         │  │
│  │ Monitoring      │  │ Validation      │  │ Completion      │  │
│  │ - Health checks │  │ - Config valid  │  │ - Criteria      │  │
│  │ - Metrics       │  │ - Dependencies  │  │ - Handoff       │  │
│  │ - Alerting      │  │ - Environment   │  │ - Documentation │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
STAGE 1 COMPONENTS TO INTEGRATE
From all previous Stage 1 prompts:
Core Components Built (a supervision-tree sketch of how they fit together follows this list):
- Signature System - Native syntax compilation, type validation, JSON schema generation
- Python Bridge - Port communication, wire protocol, health monitoring
- Adapter Pattern - Pluggable backends, type conversion, error handling
- Ash Resources - Domain modeling, resource definitions, manual actions
- Type System - Comprehensive validation, serialization, ExDantic integration
- Manual Actions - Complex ML workflows, program execution lifecycle
- Wire Protocol - Binary framing, compression, multi-provider schemas
- Testing Infrastructure - Unit tests, property-based testing, mocks
- Configuration - Environment-specific configs, validation, secrets
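Before exercising these components together, it helps to see how they might be assembled at application start. The sketch below is illustrative only: the child list, child ordering, and the assumption that each module is a supervised process are inferred from the aliases used in the tests in this document, not mandated by the earlier prompts.

defmodule DSPex.Application do
  # Sketch: supervises the Stage 1 runtime components that the integration
  # tests below exercise together. Ordering assumption: the Python bridge
  # must be up before adapters route work to it.
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      DSPex.PythonBridge.Bridge,   # port to the Python DSPy process
      DSPex.PythonBridge.Health,   # periodic bridge health checks
      DSPex.Adapters.Registry      # pluggable adapter lookup
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: DSPex.Supervisor)
  end
end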
COMPREHENSIVE END-TO-END TESTING FRAMEWORK
Complete Integration Test Suite:
defmodule DSPex.Integration.EndToEndTest do
@moduledoc """
Comprehensive end-to-end integration tests for the complete DSPy-Ash system.
Tests real workflows from signature definition through program execution.
"""
use DSPex.TestSupport
alias DSPex.ML.{Domain, Signature, Program, Execution}
alias DSPex.Adapters.{Registry, PythonPort}
alias DSPex.PythonBridge.{Bridge, Health}
alias DSPex.Types.Validator
@moduletag :integration
@moduletag timeout: 60_000 # 1 minute timeout for integration tests
describe "complete signature-to-execution workflow" do
setup do
# Start all necessary services
{:ok, bridge_pid} = start_supervised(Bridge)
{:ok, health_pid} = start_supervised(Health)
# Wait for Python bridge to be ready
wait_for_bridge_ready(bridge_pid, 30_000)
# Ensure clean state
cleanup_test_resources()
%{
bridge: bridge_pid,
health: health_pid
}
end
test "simple question answering workflow", %{bridge: bridge} do
# Step 1: Define signature using native syntax
signature_def = """
signature question_answering:
:string question ->
:string answer
"""
# Step 2: Compile and validate signature
assert {:ok, signature} = create_signature(signature_def, %{
name: "question_answering",
description: "Basic question answering"
})
# Step 3: Create program using the signature
assert {:ok, program} = create_program(signature, %{
name: "simple_qa",
adapter: :python_port,
config: %{
model: "gpt-3.5-turbo",
temperature: 0.7
}
})
# Step 4: Execute program with real input
assert {:ok, execution} = execute_program(program, %{
question: "What is the capital of France?"
})
# Step 5: Validate execution results
assert execution.status == :completed
assert is_binary(execution.result.answer)
assert String.length(execution.result.answer) > 0
assert execution.metrics.execution_time < 30_000 # 30 seconds
# Step 6: Verify complete audit trail
assert_audit_trail_complete(execution)
end
test "complex multi-step reasoning workflow", %{bridge: bridge} do
# Multi-step signature with reasoning chain
signature_def = """
signature complex_reasoning:
:string problem,
:list(:string) context ->
:reasoning_chain thoughts,
:string conclusion,
:confidence_score confidence
"""
assert {:ok, signature} = create_signature(signature_def, %{
name: "complex_reasoning",
description: "Multi-step reasoning with chain of thought"
})
assert {:ok, program} = create_program(signature, %{
name: "reasoning_program",
adapter: :python_port,
config: %{
model: "gpt-4",
temperature: 0.3,
max_tokens: 2000
}
})
# Execute with complex input
assert {:ok, execution} = execute_program(program, %{
problem: "How would you solve world hunger?",
context: [
"Agricultural technology advances",
"Food distribution challenges",
"Economic inequality factors",
"Climate change impacts"
]
})
# Validate complex output structure
assert execution.status == :completed
assert is_list(execution.result.thoughts)
assert length(execution.result.thoughts) > 1
assert is_binary(execution.result.conclusion)
assert execution.result.confidence >= 0.0
assert execution.result.confidence <= 1.0
# Verify reasoning chain structure
Enum.each(execution.result.thoughts, fn thought ->
assert is_map(thought)
assert Map.has_key?(thought, "step")
assert Map.has_key?(thought, "reasoning")
end)
end
test "error handling and recovery workflow", %{bridge: bridge} do
# Test various error scenarios and recovery
# Scenario 1: Invalid signature syntax
invalid_signature = """
signature broken: invalid_syntax ->
"""
assert {:error, %{type: :syntax_error}} = create_signature(invalid_signature, %{
name: "broken"
})
# Scenario 2: Type validation errors
valid_signature_def = """
signature typed_test: :integer number -> :string result
"""
assert {:ok, signature} = create_signature(valid_signature_def, %{
name: "typed_test"
})
assert {:ok, program} = create_program(signature, %{
name: "type_test",
adapter: :python_port
})
# Execute with wrong input type
assert {:error, %{type: :validation_error}} = execute_program(program, %{
number: "not_a_number" # String instead of integer
})
# Scenario 3: Python bridge failure recovery
# Simulate bridge failure
simulate_bridge_failure(bridge)
# Verify automatic recovery; match on :ok explicitly, since a bare
# assert would also pass on a truthy {:error, _} tuple
assert :ok = wait_for_bridge_recovery(bridge, 10_000)
# Verify system still works after recovery
assert {:ok, execution} = execute_program(program, %{
number: 42
})
assert execution.status == :completed
end
test "performance and scalability validation", %{bridge: bridge} do
signature_def = """
signature performance_test: :string input -> :string output
"""
assert {:ok, signature} = create_signature(signature_def, %{
name: "performance_test"
})
assert {:ok, program} = create_program(signature, %{
name: "perf_program",
adapter: :python_port,
config: %{model: "gpt-3.5-turbo"}
})
# Test concurrent executions
tasks = Enum.map(1..10, fn i ->
Task.async(fn ->
execute_program(program, %{
input: "Test input #{i}"
})
end)
end)
results = Task.await_many(tasks, 60_000)
# Verify all executions succeeded
successful_results = Enum.filter(results, fn
{:ok, _} -> true
_ -> false
end)
assert length(successful_results) == 10
# Verify performance metrics
execution_times = Enum.map(successful_results, fn {:ok, execution} ->
execution.metrics.execution_time
end)
avg_time = Enum.sum(execution_times) / length(execution_times)
max_time = Enum.max(execution_times)
# Performance assertions
assert avg_time < 15_000 # Average under 15 seconds
assert max_time < 30_000 # Max under 30 seconds
# Verify resource cleanup
assert_resources_cleaned_up()
end
end
describe "cross-component integration validation" do
test "signature system integration with type validation" do
# Complex signature with all type system features
signature_def = """
signature comprehensive_types:
:string question,
:list(:string) context,
:map options,
:optional(:integer) max_length ->
:string answer,
:confidence_score confidence,
:list(:reasoning_chain) reasoning,
:map metadata
"""
assert {:ok, signature} = create_signature(signature_def, %{
name: "comprehensive_types",
constraints: %{
max_length: %{min: 1, max: 1000},
confidence: %{min: 0.0, max: 1.0}
}
})
# Verify signature compilation created proper type validators
assert signature.compiled_validators != nil
assert length(signature.input_fields) == 4
assert length(signature.output_fields) == 4
# Test type validation integration
valid_input = %{
question: "Test question",
context: ["context1", "context2"],
options: %{temperature: 0.7},
max_length: 500
}
assert {:ok, validated_input} = Validator.validate_input(signature, valid_input)
assert validated_input.max_length == 500
# Test constraint validation
invalid_input = %{
question: "Test question",
context: ["context1"],
options: %{temperature: 0.7},
max_length: 2000 # Violates max constraint
}
assert {:error, %{type: :constraint_violation}} = Validator.validate_input(signature, invalid_input)
end
test "adapter pattern integration with python bridge" do
# Test adapter switching and fallback
signature_def = """
signature adapter_test: :string input -> :string output
"""
assert {:ok, signature} = create_signature(signature_def, %{
name: "adapter_test"
})
# Test PythonPort adapter
assert {:ok, program_python} = create_program(signature, %{
name: "python_adapter_test",
adapter: :python_port
})
assert {:ok, execution} = execute_program(program_python, %{
input: "test input"
})
assert execution.adapter_used == :python_port
assert execution.status == :completed
# Test adapter registry
assert Registry.list_adapters() |> Enum.member?(:python_port)
assert Registry.get_adapter(:python_port) == DSPex.Adapters.PythonPort
# Test adapter capabilities
capabilities = Registry.get_capabilities(:python_port)
assert capabilities.supports_streaming == true
assert capabilities.supports_functions == true
end
test "manual actions integration with ash resources" do
# Test complex manual action workflows
signature_def = """
signature manual_action_test:
:string operation,
:map parameters ->
:string result,
:map execution_metrics
"""
assert {:ok, signature} = create_signature(signature_def, %{
name: "manual_action_test"
})
# Create program that uses manual actions
assert {:ok, program} = Program.create(%{
name: "manual_workflow",
signature_id: signature.id,
adapter: :python_port,
manual_actions: [:complex_validation, :resource_management]
})
# Execute through manual action workflow
assert {:ok, execution} = Execution.execute_with_manual_actions(program, %{
operation: "complex_operation",
parameters: %{
complexity: "high",
validation_level: "strict"
}
})
# Verify manual action results
assert execution.manual_action_results != nil
assert Map.has_key?(execution.manual_action_results, :complex_validation)
assert Map.has_key?(execution.manual_action_results, :resource_management)
assert execution.status == :completed
end
end
describe "system monitoring and health validation" do
test "comprehensive health check system" do
# System health validation
health_status = Health.comprehensive_check()
assert health_status.overall_status == :healthy
assert health_status.components.python_bridge == :healthy
assert health_status.components.signature_system == :healthy
assert health_status.components.type_system == :healthy
assert health_status.components.adapter_registry == :healthy
# Performance metrics validation
metrics = Health.get_performance_metrics()
assert metrics.signature_compilation_avg_time < 100 # ms
assert metrics.type_validation_avg_time < 10 # ms
assert metrics.adapter_call_avg_time < 5000 # ms
assert metrics.memory_usage_mb < 500 # MB
# Resource utilization validation
resources = Health.get_resource_utilization()
assert resources.process_count < 100
assert resources.memory_efficiency > 0.8
assert resources.cpu_utilization < 0.5
end
test "monitoring and alerting integration" do
# Configure test monitoring
monitor_config = %{
thresholds: %{
execution_time: 30_000,
error_rate: 0.05,
memory_usage: 1000
},
alerts: [:email, :log]
}
Health.configure_monitoring(monitor_config)
# Simulate high load to trigger monitoring
Enum.each(1..100, fn _ ->
signature_def = """
signature load_test: :string input -> :string output
"""
# Unique names avoid collisions across the 100 iterations
{:ok, _signature} = create_signature(signature_def, %{
  name: "load_test_#{System.unique_integer([:positive])}"
})
end)
# Check monitoring detected the load
monitoring_report = Health.get_monitoring_report()
assert monitoring_report.events_detected > 0
assert monitoring_report.signature_compilation_events > 90
end
end
describe "deployment readiness validation" do
test "configuration validation across environments" do
# Test development configuration
dev_config = Application.get_env(:dspex, :development)
assert validate_environment_config(:development, dev_config)
# Test production configuration requirements
prod_requirements = [
:secret_key_base,
:database_url,
:python_bridge_config,
:adapter_timeouts,
:monitoring_config
]
Enum.each(prod_requirements, fn requirement ->
assert configuration_has_requirement?(requirement)
end)
end
test "dependency verification" do
# Verify all required dependencies are available
required_deps = [
{:python, "3.8+"},
{:dspy, "latest"},
{:postgresql, "12+"},
{:elixir, "1.14+"},
{:ash, "2.0+"},
{:jason, "latest"}
]
Enum.each(required_deps, fn {dep, version} ->
assert dependency_available?(dep, version)
end)
end
test "security validation" do
# Security configuration checks
security_checks = [
:secret_management,
:input_validation,
:sql_injection_protection,
:xss_protection,
:csrf_protection,
:rate_limiting
]
Enum.each(security_checks, fn check ->
assert security_check_passes?(check)
end)
end
end
# Helper Functions
defp create_signature(definition, attrs) do
Signature.create(%{
definition: definition,
name: attrs.name,
description: attrs[:description],
constraints: attrs[:constraints] || %{}
})
end
defp create_program(signature, attrs) do
Program.create(%{
name: attrs.name,
signature_id: signature.id,
adapter: attrs.adapter,
config: attrs[:config] || %{},
manual_actions: attrs[:manual_actions] || []
})
end
defp execute_program(program, inputs) do
Execution.create(%{
program_id: program.id,
inputs: inputs,
execution_options: %{
timeout: 30_000,
validate_inputs: true,
track_metrics: true
}
})
end
defp wait_for_bridge_ready(bridge_pid, timeout) do
  deadline = System.monotonic_time(:millisecond) + timeout
  poll_bridge_until_ready(bridge_pid, deadline)
end

defp poll_bridge_until_ready(bridge_pid, deadline) do
  case Bridge.health_check(bridge_pid) do
    {:ok, :healthy} ->
      :ok

    _ ->
      if System.monotonic_time(:millisecond) < deadline do
        # Back off briefly between health checks to avoid a busy loop
        Process.sleep(100)
        poll_bridge_until_ready(bridge_pid, deadline)
      else
        {:error, :timeout}
      end
  end
end
defp simulate_bridge_failure(bridge_pid) do
# Simulate failure by stopping the bridge process
Process.exit(bridge_pid, :kill)
end
defp wait_for_bridge_recovery(_old_bridge_pid, timeout) do
  # Give the supervisor a moment to restart the bridge, then re-resolve the
  # new pid (the killed pid is dead; this assumes the bridge registers
  # under its module name)
  Process.sleep(1000)

  case Process.whereis(Bridge) do
    nil -> {:error, :bridge_not_restarted}
    new_pid -> wait_for_bridge_ready(new_pid, timeout)
  end
end
defp assert_audit_trail_complete(execution) do
assert execution.audit_trail != nil
assert Map.has_key?(execution.audit_trail, :signature_compilation)
assert Map.has_key?(execution.audit_trail, :input_validation)
assert Map.has_key?(execution.audit_trail, :adapter_execution)
assert Map.has_key?(execution.audit_trail, :output_validation)
assert Map.has_key?(execution.audit_trail, :result_serialization)
end
defp assert_resources_cleaned_up do
  # Heuristic smoke check: after a GC pass the process count should settle;
  # a large delta suggests the concurrent executions leaked processes
  process_count_before = length(Process.list())
  :erlang.garbage_collect()
  Process.sleep(100)
  process_count_after = length(Process.list())
  # Should not have significant process leaks
  assert abs(process_count_after - process_count_before) < 5
end
defp cleanup_test_resources do
# Clean up any test data
Execution.destroy_all(%{test_execution: true})
Program.destroy_all(%{test_program: true})
Signature.destroy_all(%{test_signature: true})
end
defp validate_environment_config(env, config) do
case env do
:development ->
required_keys = [:debug_mode, :hot_reload, :test_adapters]
Enum.all?(required_keys, &Map.has_key?(config, &1))
:production ->
required_keys = [:secret_key_base, :database_url, :monitoring]
Enum.all?(required_keys, &Map.has_key?(config, &1))
:test ->
required_keys = [:test_database, :mock_adapters]
Enum.all?(required_keys, &Map.has_key?(config, &1))
end
end
defp configuration_has_requirement?(requirement) do
config = Application.get_all_env(:dspex)
case requirement do
:secret_key_base ->
config[:secret_key_base] != nil
:database_url ->
config[:database_url] != nil
:python_bridge_config ->
config[:python_bridge] != nil
:adapter_timeouts ->
config[:adapter_timeout] != nil
:monitoring_config ->
config[:monitoring] != nil
end
end
defp dependency_available?(dep, version) do
case dep do
:python ->
case System.cmd("python3", ["--version"]) do
{output, 0} -> version_matches?(output, version)
_ -> false
end
:postgresql ->
case System.cmd("psql", ["--version"]) do
{output, 0} -> version_matches?(output, version)
_ -> false
end
:elixir ->
version_matches?(System.version(), version)
_ ->
# Check if dependency is in mix.exs
deps = Mix.Project.config()[:deps] || []
Enum.any?(deps, fn
{^dep, _} -> true
{^dep, _, _} -> true
_ -> false
end)
end
end
defp version_matches?(actual, required) do
# Simple version matching - in production use a proper version library
String.contains?(actual, required) or required == "latest"
end
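# A sturdier alternative sketch using Elixir's built-in Version module.
# Assumptions: `actual` is a bare semver string (System.version/0 is;
# "Python 3.11.2" output would need stripping first), and the requirement
# uses Version syntax such as "~> 1.14" rather than the "1.14+" strings
# in required_deps above.
defp version_matches_requirement?(actual, requirement) do
  case Version.parse(String.trim(actual)) do
    {:ok, version} -> Version.match?(version, requirement)
    :error -> false
  end
end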
defp security_check_passes?(check) do
case check do
:secret_management ->
# Verify secrets are not hardcoded
config = Application.get_all_env(:dspex)
not Enum.any?(config, fn {_key, value} ->
is_binary(value) and String.contains?(value, "password")
end)
:input_validation ->
# Verify input validation is enabled
Application.get_env(:dspex, :validate_inputs, false)
:sql_injection_protection ->
# Verify parameterized queries are used
true # Ash provides this by default
:xss_protection ->
# Verify XSS protection is enabled
true # Not applicable for API-only application
:csrf_protection ->
# Verify CSRF protection is configured
true # Not applicable for API-only application
:rate_limiting ->
# Verify rate limiting is configured
Application.get_env(:dspex, :rate_limiting) != nil
end
end
end
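Because the suite above is tagged @moduletag :integration (and the benchmarks below use :performance and :slow), a common arrangement is to exclude those tags by default and opt in per run. A minimal sketch, assuming the standard test_helper.exs layout:

# test/test_helper.exs (sketch): skip slow suites unless requested explicitly,
# e.g. `mix test --include integration` or `mix test --only performance`
ExUnit.configure(exclude: [:integration, :performance, :slow])
ExUnit.start()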
PERFORMANCE BENCHMARKING FRAMEWORK
Comprehensive Performance Testing:
defmodule DSPex.Integration.PerformanceBenchmarks do
@moduledoc """
Performance benchmarking and load testing for the DSPy-Ash integration.
Measures throughput, latency, memory usage, and scalability limits.
"""
use DSPex.TestSupport
require Logger

alias DSPex.ML.{Signature, Program, Execution}
alias DSPex.Performance.{Metrics, Monitor}
@benchmark_duration 60_000 # 1 minute
@warmup_duration 10_000 # 10 seconds
describe "signature compilation performance" do
@tag :performance
test "signature compilation throughput" do
# Warm up
warmup_signature_compilation()
# Benchmark signature compilation
start_time = System.monotonic_time(:microsecond)
results = Enum.map(1..1000, fn i ->
signature_def = """
signature perf_test_#{i}: :string input -> :string output
"""
{time, result} = :timer.tc(fn ->
create_signature(signature_def, %{name: "perf_test_#{i}"})
end)
{time, result}
end)
end_time = System.monotonic_time(:microsecond)
total_time = end_time - start_time
# Analyze results
successful_compilations = Enum.count(results, fn {_time, result} ->
match?({:ok, _}, result)
end)
compilation_times = Enum.map(results, fn {time, _result} -> time end)
avg_time = Enum.sum(compilation_times) / length(compilation_times)
p95_time = percentile(compilation_times, 95)
p99_time = percentile(compilation_times, 99)
# Performance assertions
assert successful_compilations == 1000
assert avg_time < 1000 # Average under 1ms
assert p95_time < 5000 # 95th percentile under 5ms
assert p99_time < 10000 # 99th percentile under 10ms
throughput = successful_compilations / (total_time / 1_000_000)
assert throughput > 100 # At least 100 compilations per second
Logger.info("Signature compilation performance: #{throughput} compilations/sec, avg: #{avg_time}μs")
end
end
describe "execution performance" do
@tag :performance
test "program execution throughput" do
# Setup test program
signature_def = """
signature throughput_test: :string input -> :string output
"""
{:ok, signature} = create_signature(signature_def, %{name: "throughput_test"})
{:ok, program} = create_program(signature, %{name: "throughput_program", adapter: :python_port})
# Warm up
warmup_program_execution(program)
# Benchmark concurrent executions
concurrency_levels = [1, 5, 10, 20, 50]
results = Enum.map(concurrency_levels, fn concurrency ->
{concurrency, benchmark_execution_concurrency(program, concurrency)}
end)
# Analyze results
Enum.each(results, fn {concurrency, {throughput, avg_latency, error_rate}} ->
Logger.info("Concurrency #{concurrency}: #{throughput} req/sec, #{avg_latency}ms avg, #{error_rate}% errors")
# Performance requirements
assert throughput > concurrency * 0.5 # At least 0.5 req/sec per concurrent request
assert avg_latency < 5000 # Average latency under 5 seconds
assert error_rate < 0.01 # Error rate under 1%
end)
end
test "memory usage and garbage collection" do
signature_def = """
signature memory_test: :string input -> :string output
"""
{:ok, signature} = create_signature(signature_def, %{name: "memory_test"})
{:ok, program} = create_program(signature, %{name: "memory_program", adapter: :python_port})
# Baseline memory usage
:erlang.garbage_collect()
baseline_memory = :erlang.memory(:total)
# Execute many programs to test memory management
Enum.each(1..1000, fn i ->
execute_program(program, %{input: "test input #{i}"})
# Periodic GC to prevent excessive buildup
if rem(i, 100) == 0 do
:erlang.garbage_collect()
end
end)
# Final memory check
:erlang.garbage_collect()
final_memory = :erlang.memory(:total)
memory_growth = final_memory - baseline_memory
# Memory growth should be reasonable
growth_percentage = memory_growth / baseline_memory
assert growth_percentage < 0.5 # Memory growth under 50%
Logger.info("Memory growth: #{memory_growth} bytes (#{Float.round(growth_percentage * 100, 2)}%)")
end
end
describe "system scalability limits" do
@tag :performance
@tag :slow
test "maximum concurrent executions" do
signature_def = """
signature scalability_test: :string input -> :string output
"""
{:ok, signature} = create_signature(signature_def, %{name: "scalability_test"})
{:ok, program} = create_program(signature, %{name: "scalability_program", adapter: :python_port})
# Test increasing load until system limits
max_concurrency = find_maximum_concurrency(program)
assert max_concurrency >= 50 # Should handle at least 50 concurrent requests
Logger.info("Maximum sustainable concurrency: #{max_concurrency}")
end
test "sustained load handling" do
signature_def = """
signature sustained_test: :string input -> :string output
"""
{:ok, signature} = create_signature(signature_def, %{name: "sustained_test"})
{:ok, program} = create_program(signature, %{name: "sustained_program", adapter: :python_port})
# Run sustained load for extended period
duration = 300_000 # 5 minutes
target_rps = 10 # 10 requests per second
results = run_sustained_load(program, duration, target_rps)
# Analyze sustained performance
assert results.total_requests >= target_rps * (duration / 1000) * 0.95 # 95% of target
assert results.error_rate < 0.02 # Error rate under 2%
assert results.avg_response_time < 3000 # Average under 3 seconds
# System should remain stable
assert results.memory_stable == true
assert results.performance_degradation < 0.1 # Less than 10% degradation
Logger.info("Sustained load test: #{results.total_requests} requests, #{results.error_rate}% errors")
end
end
# Helper functions for benchmarking
defp warmup_signature_compilation do
Enum.each(1..100, fn i ->
signature_def = """
signature warmup_#{i}: :string input -> :string output
"""
create_signature(signature_def, %{name: "warmup_#{i}"})
end)
end
defp warmup_program_execution(program) do
Enum.each(1..10, fn i ->
execute_program(program, %{input: "warmup #{i}"})
end)
end
defp benchmark_execution_concurrency(program, concurrency) do
requests_per_worker = 50
total_requests = concurrency * requests_per_worker
start_time = System.monotonic_time(:millisecond)
# Start concurrent workers
tasks = Enum.map(1..concurrency, fn worker_id ->
Task.async(fn ->
Enum.map(1..requests_per_worker, fn request_id ->
{time, result} = :timer.tc(fn ->
execute_program(program, %{input: "worker #{worker_id} request #{request_id}"})
end)
{time, result}
end)
end)
end)
# Collect results
all_results = Task.await_many(tasks, 120_000) # 2 minute timeout
flat_results = List.flatten(all_results)
end_time = System.monotonic_time(:millisecond)
total_duration = end_time - start_time
# Calculate metrics
successful_requests = Enum.count(flat_results, fn {_time, result} ->
match?({:ok, _}, result)
end)
error_rate = (total_requests - successful_requests) / total_requests
throughput = successful_requests / (total_duration / 1000)
response_times = Enum.map(flat_results, fn {time, _result} -> time / 1000 end) # Convert to ms
avg_latency = Enum.sum(response_times) / length(response_times)
{throughput, avg_latency, error_rate}
end
defp find_maximum_concurrency(program) do
test_concurrency_level(program, 10, 200, nil)
end
defp test_concurrency_level(_program, min, max, last_successful) when min > max do
  last_successful || min
end
defp test_concurrency_level(program, min, max, last_successful) do
mid = div(min + max, 2)
case test_concurrency_success(program, mid) do
true ->
test_concurrency_level(program, mid + 1, max, mid)
false ->
test_concurrency_level(program, min, mid - 1, last_successful)
end
end
defp test_concurrency_success(program, concurrency) do
{_throughput, avg_latency, error_rate} = benchmark_execution_concurrency(program, concurrency)
# Success criteria
avg_latency < 10_000 and error_rate < 0.05
end
defp run_sustained_load(program, duration, target_rps) do
  interval = div(1000, target_rps) # ms between requests
  end_time = System.monotonic_time(:millisecond) + duration
  start_memory = :erlang.memory(:total)

  {_stats, response_times} = sustained_load_loop(program, end_time, interval, [], [])

  # The loop prepends results, so reverse into chronological order before
  # comparing early vs. late performance
  response_times = Enum.reverse(response_times)

  final_memory = :erlang.memory(:total)
  memory_stable = abs(final_memory - start_memory) / start_memory < 0.2

  # Calculate performance metrics; errors are marked as -1 and excluded
  # from latency averages
  total_requests = length(response_times)
  successful_times = Enum.filter(response_times, &(&1 > 0))
  successful_requests = length(successful_times)
  error_rate = (total_requests - successful_requests) / max(total_requests, 1)
  avg_response_time = Enum.sum(successful_times) / max(successful_requests, 1)

  # Performance degradation check: compare the first and last slices of the run
  sample_size = min(100, div(total_requests, 4))
  early_times = Enum.take(successful_times, sample_size)
  late_times = Enum.take(successful_times, -sample_size)
  early_avg = Enum.sum(early_times) / max(length(early_times), 1)
  late_avg = Enum.sum(late_times) / max(length(late_times), 1)
  performance_degradation = if early_avg > 0, do: (late_avg - early_avg) / early_avg, else: 0

  %{
    total_requests: total_requests,
    successful_requests: successful_requests,
    error_rate: error_rate,
    avg_response_time: avg_response_time,
    memory_stable: memory_stable,
    performance_degradation: performance_degradation
  }
end
defp sustained_load_loop(program, end_time, interval, stats, response_times) do
current_time = System.monotonic_time(:millisecond)
if current_time >= end_time do
{stats, response_times}
else
# Execute request
{time, result} = :timer.tc(fn ->
execute_program(program, %{input: "sustained load #{current_time}"})
end)
response_time = case result do
{:ok, _} -> time / 1000 # Convert to ms
{:error, _} -> -1 # Mark as error
end
# Wait for next interval
Process.sleep(max(0, interval - div(time, 1000)))
sustained_load_loop(program, end_time, interval, stats, [response_time | response_times])
end
end
defp percentile(list, p) do
  sorted = Enum.sort(list)
  # Ceiling index for the pth percentile, clamped to a valid position
  index = ceil(length(sorted) * p / 100) - 1
  Enum.at(sorted, max(0, index))
end
end
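The helpers above keep throughput, latency, and error-rate numbers inside the test run. If those numbers should also reach the monitoring stack validated earlier, one option is to emit them as telemetry events. The sketch below is hypothetical: it assumes :telemetry is already a dependency (the earlier prompts do not confirm this) and that the helper lives inside the module above.

# Hypothetical helper: publish benchmark results so the Health/monitoring
# components can chart them alongside runtime metrics
defp publish_benchmark(name, throughput, avg_latency_ms, error_rate) do
  :telemetry.execute(
    [:dspex, :benchmark, :run],
    %{throughput: throughput, avg_latency_ms: avg_latency_ms, error_rate: error_rate},
    %{benchmark: name}
  )
end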
STAGE 1 COMPLETION VALIDATION
Completion Criteria and Verification:
defmodule DSPex.Integration.Stage1Completion do
@moduledoc """
Stage 1 completion validation and verification system.
Ensures all requirements are met before progressing to Stage 2.
"""
use DSPex.TestSupport
require Logger

alias DSPex.ML.Signature
@stage1_requirements [
:signature_system_complete,
:python_bridge_functional,
:adapter_pattern_implemented,
:ash_resources_functional,
:type_system_comprehensive,
:manual_actions_working,
:wire_protocol_stable,
:testing_infrastructure_complete,
:configuration_production_ready,
:integration_tests_passing,
:performance_requirements_met,
:documentation_complete
]
describe "Stage 1 completion verification" do
test "all core requirements are satisfied" do
completion_report = generate_completion_report()
assert completion_report.overall_status == :complete
assert completion_report.requirements_met == length(@stage1_requirements)
assert completion_report.critical_issues == []
Logger.info("Stage 1 Completion Report:")
Logger.info("Requirements Met: #{completion_report.requirements_met}/#{length(@stage1_requirements)}")
Logger.info("Critical Issues: #{length(completion_report.critical_issues)}")
Logger.info("Performance Score: #{completion_report.performance_score}/100")
end
test "signature system completion" do
signature_status = verify_signature_system()
assert signature_status.compilation_working == true
assert signature_status.type_parsing_complete == true
assert signature_status.validation_functional == true
assert signature_status.json_schema_generation == true
assert signature_status.error_handling_robust == true
# Test signature creation and compilation
test_signature = """
signature completion_test:
:string input,
:integer count ->
:string output,
:confidence_score confidence
"""
assert {:ok, signature} = Signature.create(%{
definition: test_signature,
name: "completion_test"
})
assert signature.compilation_status == :success
assert length(signature.input_fields) == 2
assert length(signature.output_fields) == 2
end
test "python bridge functionality" do
bridge_status = verify_python_bridge()
assert bridge_status.bridge_running == true
assert bridge_status.communication_stable == true
assert bridge_status.error_recovery_working == true
assert bridge_status.performance_acceptable == true
# Test bridge communication
{:ok, response} = DSPex.PythonBridge.Bridge.execute_command(:health_check, %{})
assert response.status == "healthy"
end
test "adapter pattern implementation" do
adapter_status = verify_adapter_pattern()
assert adapter_status.registry_functional == true
assert adapter_status.python_port_adapter_working == true
assert adapter_status.type_conversion_complete == true
assert adapter_status.error_handling_comprehensive == true
# Test adapter switching
adapters = DSPex.Adapters.Registry.list_adapters()
assert Enum.member?(adapters, :python_port)
end
test "ash resources functionality" do
resources_status = verify_ash_resources()
assert resources_status.domain_configured == true
assert resources_status.signature_resource_working == true
assert resources_status.program_resource_working == true
assert resources_status.execution_resource_working == true
assert resources_status.manual_actions_functional == true
# Test resource operations
{:ok, signature} = DSPex.ML.Signature.create(%{
name: "resource_test",
definition: "signature test: :string input -> :string output"
})
assert signature.id != nil
assert signature.name == "resource_test"
end
test "system integration readiness" do
integration_status = verify_system_integration()
assert integration_status.end_to_end_workflows == true
assert integration_status.error_propagation_correct == true
assert integration_status.performance_acceptable == true
assert integration_status.resource_management_stable == true
# Test complete workflow
workflow_result = execute_complete_workflow()
assert workflow_result.success == true
assert workflow_result.execution_time < 30_000
end
end
describe "stage 2 readiness assessment" do
test "foundation ready for native implementation" do
readiness = assess_stage2_readiness()
assert readiness.adapter_pattern_extensible == true
assert readiness.type_system_expandable == true
assert readiness.signature_system_modular == true
assert readiness.testing_infrastructure_scalable == true
# Foundation should support multiple adapters
assert readiness.supports_multiple_adapters == true
assert readiness.supports_native_elixir == true
end
test "documentation completeness" do
doc_status = verify_documentation()
assert doc_status.api_documented == true
assert doc_status.examples_comprehensive == true
assert doc_status.integration_guides_complete == true
assert doc_status.troubleshooting_available == true
end
end
# Verification helper functions
defp generate_completion_report do
requirements_status = Enum.map(@stage1_requirements, fn requirement ->
{requirement, verify_requirement(requirement)}
end)
requirements_met = Enum.count(requirements_status, fn {_req, status} -> status end)
critical_issues = find_critical_issues(requirements_status)
performance_score = calculate_performance_score()
overall_status = if requirements_met == length(@stage1_requirements) and length(critical_issues) == 0 do
:complete
else
:incomplete
end
%{
overall_status: overall_status,
requirements_met: requirements_met,
critical_issues: critical_issues,
performance_score: performance_score,
requirements_status: requirements_status
}
end
defp verify_requirement(requirement) do
case requirement do
:signature_system_complete -> verify_signature_system().compilation_working
:python_bridge_functional -> verify_python_bridge().bridge_running
:adapter_pattern_implemented -> verify_adapter_pattern().registry_functional
:ash_resources_functional -> verify_ash_resources().domain_configured
:type_system_comprehensive -> verify_type_system().validation_complete
:manual_actions_working -> verify_manual_actions().basic_actions_functional
:wire_protocol_stable -> verify_wire_protocol().protocol_stable
:testing_infrastructure_complete -> verify_testing_infrastructure().coverage_adequate
:configuration_production_ready -> verify_configuration().production_ready
:integration_tests_passing -> verify_integration_tests().all_passing
:performance_requirements_met -> verify_performance().requirements_met
:documentation_complete -> verify_documentation().api_documented
end
end
defp verify_signature_system do
%{
compilation_working: test_signature_compilation(),
type_parsing_complete: test_type_parsing(),
validation_functional: test_signature_validation(),
json_schema_generation: test_json_schema_generation(),
error_handling_robust: test_signature_error_handling()
}
end
defp verify_python_bridge do
%{
bridge_running: bridge_process_running?(),
communication_stable: test_bridge_communication(),
error_recovery_working: test_bridge_recovery(),
performance_acceptable: test_bridge_performance()
}
end
defp verify_adapter_pattern do
%{
registry_functional: test_adapter_registry(),
python_port_adapter_working: test_python_port_adapter(),
type_conversion_complete: test_adapter_type_conversion(),
error_handling_comprehensive: test_adapter_error_handling()
}
end
defp verify_ash_resources do
%{
domain_configured: test_domain_configuration(),
signature_resource_working: test_signature_resource(),
program_resource_working: test_program_resource(),
execution_resource_working: test_execution_resource(),
manual_actions_functional: test_manual_actions()
}
end
defp verify_system_integration do
%{
end_to_end_workflows: test_end_to_end_workflows(),
error_propagation_correct: test_error_propagation(),
performance_acceptable: test_system_performance(),
resource_management_stable: test_resource_management()
}
end
defp assess_stage2_readiness do
%{
adapter_pattern_extensible: test_adapter_extensibility(),
type_system_expandable: test_type_system_extensibility(),
signature_system_modular: test_signature_modularity(),
testing_infrastructure_scalable: test_testing_scalability(),
supports_multiple_adapters: test_multiple_adapter_support(),
supports_native_elixir: test_native_elixir_readiness()
}
end
defp execute_complete_workflow do
start_time = System.monotonic_time(:millisecond)
try do
# Complete signature-to-execution workflow
signature_def = """
signature workflow_test: :string input -> :string output
"""
{:ok, signature} = DSPex.ML.Signature.create(%{
name: "workflow_test",
definition: signature_def
})
{:ok, program} = DSPex.ML.Program.create(%{
name: "workflow_program",
signature_id: signature.id,
adapter: :python_port
})
{:ok, execution} = DSPex.ML.Execution.create(%{
program_id: program.id,
inputs: %{input: "test workflow"}
})
end_time = System.monotonic_time(:millisecond)
%{
success: execution.status == :completed,
execution_time: end_time - start_time,
result: execution.result
}
rescue
error ->
end_time = System.monotonic_time(:millisecond)
%{
success: false,
execution_time: end_time - start_time,
error: inspect(error)
}
end
end
# Individual test functions (simplified for brevity)
defp test_signature_compilation, do: true # Implementation would test actual compilation
defp test_type_parsing, do: true
defp test_signature_validation, do: true
defp test_json_schema_generation, do: true
defp test_signature_error_handling, do: true
defp bridge_process_running?, do: true
defp test_bridge_communication, do: true
defp test_bridge_recovery, do: true
defp test_bridge_performance, do: true
defp test_adapter_registry, do: true
defp test_python_port_adapter, do: true
defp test_adapter_type_conversion, do: true
defp test_adapter_error_handling, do: true
defp test_domain_configuration, do: true
defp test_signature_resource, do: true
defp test_program_resource, do: true
defp test_execution_resource, do: true
defp test_manual_actions, do: true
defp test_end_to_end_workflows, do: true
defp test_error_propagation, do: true
defp test_system_performance, do: true
defp test_resource_management, do: true
defp test_adapter_extensibility, do: true
defp test_type_system_extensibility, do: true
defp test_signature_modularity, do: true
defp test_testing_scalability, do: true
defp test_multiple_adapter_support, do: true
defp test_native_elixir_readiness, do: true
defp verify_type_system, do: %{validation_complete: true}
defp verify_manual_actions, do: %{basic_actions_functional: true}
defp verify_wire_protocol, do: %{protocol_stable: true}
defp verify_testing_infrastructure, do: %{coverage_adequate: true}
defp verify_configuration, do: %{production_ready: true}
defp verify_integration_tests, do: %{all_passing: true}
defp verify_performance, do: %{requirements_met: true}
defp verify_documentation, do: %{api_documented: true}
defp find_critical_issues(requirements_status) do
  requirements_status
  |> Enum.filter(fn {_req, status} -> not status end)
  |> Enum.map(fn {req, _status} -> req end)
end
defp calculate_performance_score do
# Calculate based on performance metrics
85 # Example score
end
end
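To run the completion check outside ExUnit (for example as a CI release gate), the validation can be wrapped in a Mix task. The task name and wiring below are illustrative assumptions; only the Mix.Task behaviour itself is a given:

defmodule Mix.Tasks.Dspex.ValidateStage1 do
  @shortdoc "Runs the Stage 1 integration and completion suites"
  @moduledoc "Convenience wrapper: `mix dspex.validate_stage1`"
  use Mix.Task

  @impl Mix.Task
  def run(_args) do
    # Opt back in to the tags excluded by default in test_helper.exs
    Mix.Task.run("test", ["--include", "integration", "--include", "performance"])
  end
end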
IMPLEMENTATION REQUIREMENTS
SUCCESS CRITERIA
Stage 1 Integration Testing Must Achieve:
- End-to-End Functionality - Complete workflows from signature definition through program execution work flawlessly
- Performance Standards - System meets all performance benchmarks for throughput, latency, and resource usage
- Error Handling Robustness - Comprehensive error scenarios are handled gracefully with proper recovery
- Cross-Component Integration - All Stage 1 components work together seamlessly without integration issues
- Production Readiness - System is ready for production deployment with proper monitoring and configuration
- Stage 2 Foundation - Foundation is solid and extensible for Stage 2 native implementation
TESTING COVERAGE REQUIREMENTS
Comprehensive Test Coverage Must Include (see the mix.exs sketch after this list):
- Unit Tests: All individual components thoroughly tested
- Integration Tests: Cross-component interactions validated
- Performance Tests: Throughput, latency, memory, and scalability benchmarks
- End-to-End Tests: Complete user workflows validated
- Error Handling Tests: All error scenarios and recovery paths tested
- Security Tests: Input validation, data sanitization, access control verified
- Configuration Tests: All environment configurations validated
- Deployment Tests: System deployment and startup procedures verified
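One way to enforce this coverage mechanically is a coverage floor plus tag-based entry points in mix.exs. This is a sketch under the assumption that the built-in mix coverage tool is used rather than a dedicated library, and the threshold value is illustrative:

# mix.exs (sketch): fail `mix test --cover` below a coverage floor and give
# the tagged suites memorable entry points
def project do
  [
    app: :dspex,
    version: "0.1.0",
    elixir: "~> 1.14",
    test_coverage: [summary: [threshold: 90]],
    aliases: [
      "test.integration": ["test --only integration"],
      "test.performance": ["test --only performance"]
    ],
    deps: deps()
  ]
end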
DOCUMENTATION REQUIREMENTS
Complete Documentation Must Cover:
- API Documentation: All public interfaces documented with examples
- Integration Guides: Step-by-step integration instructions
- Configuration Reference: All configuration options explained
- Troubleshooting Guide: Common issues and solutions documented
- Performance Tuning: Optimization recommendations provided
- Security Guidelines: Security best practices documented
PERFORMANCE BENCHMARKS
System Must Meet (encoded as a configuration sketch after this list):
- Signature Compilation: >100 compilations/second, <1ms average
- Program Execution: >10 executions/second/core, <5s average latency
- Concurrent Users: Support 50+ concurrent executions
- Memory Usage: <500MB baseline, <50% growth under load
- Error Rate: <1% under normal load, <5% under stress
- Recovery Time: <10s for automatic error recovery
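These budgets mirror the assertions in the benchmark module above. Centralizing them in one configuration entry, as sketched below with illustrative key names, keeps the test assertions and the Health monitoring thresholds from drifting apart:

# config/config.exs (sketch): single source of truth for the Stage 1
# performance budget; both the benchmarks and Health monitoring can read it
import Config

config :dspex, :performance_budget,
  signature_compilation_avg_us: 1_000,  # <1ms average
  execution_avg_latency_ms: 5_000,      # <5s average latency
  min_concurrent_executions: 50,
  baseline_memory_mb: 500,
  max_memory_growth: 0.5,               # <50% growth under load
  max_error_rate_normal: 0.01,          # <1% under normal load
  max_error_rate_stress: 0.05,          # <5% under stress
  max_recovery_time_ms: 10_000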
DEPLOYMENT READINESS
Production Deployment Requirements (a runtime.exs sketch follows the list):
- Configuration Management: Environment-specific configurations validated
- Security Hardening: All security best practices implemented
- Monitoring Integration: Health checks, metrics, and alerting configured
- Dependency Management: All dependencies verified and documented
- Backup and Recovery: Data backup and system recovery procedures tested
- Load Testing: System validated under expected production load
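The configuration-management requirement above can be enforced at boot rather than discovered at request time. A minimal sketch, assuming production secrets arrive via environment variables (the variable names are illustrative):

# config/runtime.exs (sketch): fail fast in production when a required
# setting is missing, mirroring configuration_has_requirement?/1 above
import Config

if config_env() == :prod do
  required = ["SECRET_KEY_BASE", "DATABASE_URL", "DSPEX_PYTHON_PATH"]

  for var <- required do
    System.get_env(var) ||
      raise "environment variable #{var} is required in production"
  end

  config :dspex,
    secret_key_base: System.fetch_env!("SECRET_KEY_BASE"),
    database_url: System.fetch_env!("DATABASE_URL")
end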
EXPECTED DELIVERABLES
PRIMARY DELIVERABLES
- Complete Integration Test Suite - Comprehensive testing framework covering all Stage 1 components
- Performance Benchmarking System - Automated performance testing and monitoring
- Stage 1 Completion Validation - Verification system ensuring all requirements are met
- Production Deployment Guide - Complete deployment and configuration documentation
- Stage 2 Readiness Assessment - Foundation analysis for next stage implementation
VERIFICATION AND VALIDATION
All Stage 1 Components Verified:
- Signature system functionality and performance
- Python bridge stability and communication
- Adapter pattern flexibility and extensibility
- Ash resources integration and manual actions
- Type system comprehensive validation
- Wire protocol efficiency and reliability
- Configuration management and security
- Testing infrastructure completeness
System Integration Validated:
- End-to-end workflows execute successfully
- Error handling and recovery work properly
- Performance meets all benchmarks
- Resource management is stable
- Security requirements are satisfied
- Documentation is complete and accurate
This comprehensive integration testing and validation framework ensures Stage 1 is complete, robust, and ready for production deployment while establishing a solid foundation for Stage 2 native implementation development.