V2 Pool Technical Design Series: Document 2 - Immediate Fixes Implementation Guide
Overview
This document provides detailed implementation steps for Phase 1 immediate fixes. These changes address critical NimblePool contract violations and test failures identified in the analysis. All changes are designed to be backward compatible and can be deployed immediately.
Fix 1: NimblePool Return Value Corrections
Issue
handle_checkout
callbacks return {:error, reason}
which violates NimblePool’s contract.
Implementation
File: lib/dspex/python_bridge/pool_worker_v2.ex
Lines 205-206 (handle_session_checkout)
# BEFORE:
{:error, reason} ->
{:error, reason}
# AFTER:
{:error, reason} ->
Logger.error("[#{worker_state.worker_id}] Session checkout failed: #{inspect(reason)}")
{:remove, {:checkout_failed, reason}, pool_state}
Lines 234-235 (handle_anonymous_checkout)
# BEFORE:
{:error, reason} ->
{:error, reason}
# AFTER:
{:error, reason} ->
Logger.error("[#{worker_state.worker_id}] Anonymous checkout failed: #{inspect(reason)}")
{:remove, {:checkout_failed, reason}, pool_state}
Lines 212-220 (Port.connect error handling)
# BEFORE:
catch
:error, reason ->
Logger.error("[#{worker_state.worker_id}] Failed to connect port to PID #{inspect(pid)} (alive? #{Process.alive?(pid)}): #{inspect(reason)}")
{:remove, {:connect_failed, reason}, pool_state}
end
# AFTER:
catch
:error, reason ->
Logger.error("[#{worker_state.worker_id}] Failed to connect port: #{inspect(reason)}")
{:remove, {:connect_failed, reason}, pool_state}
:exit, reason ->
Logger.error("[#{worker_state.worker_id}] Port connect exited: #{inspect(reason)}")
{:remove, {:connect_exited, reason}, pool_state}
kind, reason ->
Logger.error("[#{worker_state.worker_id}] Unexpected error in connect: #{kind} - #{inspect(reason)}")
{:remove, {:connect_error, {kind, reason}}, pool_state}
end
Fix 2: Port Validation Enhancement
Issue
Missing Port.info()
validation before attempting Port.connect()
.
Implementation
File: lib/dspex/python_bridge/pool_worker_v2.ex
Add new helper function after line 380:
@doc """
Validates that a port is open and ready for connection.
Returns {:ok, port_info} or {:error, reason}
"""
defp validate_port(port) do
cond do
not is_port(port) ->
{:error, :not_a_port}
true ->
case Port.info(port) do
nil ->
{:error, :port_closed}
info ->
# Check if port is connected to current process
case Keyword.get(info, :connected) do
pid when pid == self() ->
{:ok, info}
_ ->
{:error, :port_not_owned}
end
end
end
end
@doc """
Safely connects a port to a target process with validation.
"""
defp safe_port_connect(port, target_pid, worker_id) do
with {:ok, _port_info} <- validate_port(port),
true <- Process.alive?(target_pid) do
try do
Port.connect(port, target_pid)
:ok
catch
:error, :badarg ->
# Process died between alive? check and connect
{:error, :process_died}
:error, reason ->
{:error, {:connect_failed, reason}}
end
else
false ->
{:error, :target_process_dead}
{:error, reason} ->
{:error, reason}
end
end
Update handle_session_checkout
(lines 196-208):
# BEFORE:
if port_valid and pid_alive do
Port.connect(worker_state.port, pid)
# ... rest of the code
else
reason =
cond do
not port_valid -> :invalid_port
not pid_alive -> :process_not_alive
true -> :unknown_error
end
{:error, reason}
end
# AFTER:
case safe_port_connect(worker_state.port, pid, worker_state.worker_id) do
:ok ->
Logger.debug("[#{worker_state.worker_id}] Connected to session '#{session_id}' for PID #{inspect(pid)}")
updated_state = %{worker_state |
current_session: session_id,
stats: Map.update!(worker_state.stats, :checkouts, &(&1 + 1))
}
{:ok, :state, updated_state, pool_state}
{:error, reason} ->
Logger.error("[#{worker_state.worker_id}] Failed to connect port for session '#{session_id}': #{inspect(reason)}")
{:remove, {:connect_failed, reason}, pool_state}
end
Fix 3: Test Assertion Corrections
Issue
Test expects programs
to be a list but receives a map with “programs” key.
Implementation
File: test/pool_v2_concurrent_test.exs
Line 155
# BEFORE:
assert is_list(programs)
# AFTER:
assert is_map(result)
assert Map.has_key?(result, "programs")
programs = result["programs"]
assert is_list(programs)
Line 170
# BEFORE:
assert length(programs) == 10
# AFTER:
assert result["total_count"] == 10
assert length(result["programs"]) == 10
Fix 4: Test Configuration Guards
Issue
Tests fail when run without proper TEST_MODE environment variable.
Implementation
File: test/pool_fixed_test.exs
Add module attribute and setup block after line 6:
@moduletag :layer_3
setup do
test_mode = Application.get_env(:dspex, :test_mode, :mock_adapter)
pooling_enabled = Application.get_env(:dspex, :pooling_enabled, false)
unless test_mode == :full_integration and pooling_enabled do
skip("This test requires TEST_MODE=full_integration with pooling enabled")
end
:ok
end
Remove lines 13-15 (Application.put_env calls) as they’re ineffective after app start.
Fix 5: Service Detection Improvement
Issue
Registry lookups fail during test initialization causing “Python bridge not available” errors.
Implementation
File: lib/dspex/adapters/python_port.ex
Replace detect_running_service
function (lines 55-68):
# BEFORE:
defp detect_running_service do
pool_running = match?({:ok, _}, Registry.lookup(Registry.DSPex, SessionPool))
bridge_running = match?({:ok, _}, Registry.lookup(Registry.DSPex, Bridge))
case {pool_running, bridge_running} do
{true, _} -> {:pool, SessionPool}
{false, true} -> {:bridge, Bridge}
_ -> {:error, "Python bridge not available"}
end
end
# AFTER:
defp detect_running_service do
# Use Process.whereis for more reliable detection
pool_pid = Process.whereis(DSPex.PythonBridge.SessionPool)
bridge_pid = Process.whereis(DSPex.PythonBridge.Bridge)
cond do
is_pid(pool_pid) and Process.alive?(pool_pid) ->
{:pool, DSPex.PythonBridge.SessionPool}
is_pid(bridge_pid) and Process.alive?(bridge_pid) ->
{:bridge, DSPex.PythonBridge.Bridge}
true ->
# Try Registry as fallback
case {Registry.lookup(Registry.DSPex, SessionPool),
Registry.lookup(Registry.DSPex, Bridge)} do
{[{_, _}], _} -> {:pool, SessionPool}
{_, [{_, _}]} -> {:bridge, Bridge}
_ -> {:error, "Python bridge not available"}
end
end
end
Fix 6: Worker State Update Safety
Issue
Worker state updates don’t handle edge cases properly.
Implementation
File: lib/dspex/python_bridge/pool_worker_v2.ex
Add after handle_checkin
(around line 130):
# Helper to safely update worker stats
defp update_worker_stats(worker_state, checkin_type) do
stats_update = case checkin_type do
:ok ->
%{successful_checkins: &(&1 + 1), last_activity: System.monotonic_time(:millisecond)}
{:error, _} ->
%{error_checkins: &(&1 + 1), last_activity: System.monotonic_time(:millisecond)}
:close ->
%{last_activity: System.monotonic_time(:millisecond)}
_ ->
%{last_activity: System.monotonic_time(:millisecond)}
end
updated_stats = Enum.reduce(stats_update, worker_state.stats, fn {key, updater}, stats ->
case updater do
fun when is_function(fun, 1) ->
Map.update!(stats, key, fun)
value ->
Map.put(stats, key, value)
end
end)
%{worker_state | stats: updated_stats}
end
Update handle_checkin
to use the helper:
# Line 117-118
# BEFORE:
stats = Map.update!(worker_state.stats, :successful_checkins, &(&1 + 1))
{:ok, %{worker_state | stats: stats, current_session: nil}, pool_state}
# AFTER:
updated_state = worker_state
|> update_worker_stats(:ok)
|> Map.put(:current_session, nil)
{:ok, updated_state, pool_state}
Fix 7: Initialize Pool Eagerly for Tests
Issue
Lazy pool initialization causes timeouts in tests.
Implementation
File: test/support/pool_test_helpers.ex
(create new file)
defmodule DSPex.Test.PoolTestHelpers do
@moduledoc """
Helper functions for pool-related tests.
"""
@doc """
Starts an isolated pool for testing with eager initialization.
"""
def start_test_pool(opts \\ []) do
pool_name = :"test_pool_#{System.unique_integer([:positive])}"
genserver_name = :"genserver_#{pool_name}"
config = Keyword.merge([
name: genserver_name,
pool_name: pool_name,
lazy: false, # Eager initialization for tests
pool_size: 2,
max_overflow: 0,
init_timeout: 10_000 # Longer timeout for test stability
], opts)
case start_supervised({DSPex.PythonBridge.SessionPoolV2, config}) do
{:ok, pid} ->
# Wait for pool to be ready
wait_for_pool_ready(pool_name, 5_000)
{:ok, %{pid: pid, pool_name: pool_name, genserver_name: genserver_name}}
error ->
error
end
end
@doc """
Waits for a pool to have at least one available worker.
"""
def wait_for_pool_ready(pool_name, timeout) do
deadline = System.monotonic_time(:millisecond) + timeout
Stream.repeatedly(fn ->
case get_pool_status(pool_name) do
{:ok, %{available_workers: n}} when n > 0 ->
:ready
_ ->
Process.sleep(100)
:waiting
end
end)
|> Stream.take_while(fn status ->
status == :waiting and System.monotonic_time(:millisecond) < deadline
end)
|> Enum.to_list()
:ok
end
defp get_pool_status(pool_name) do
# Implementation depends on pool monitoring API
{:ok, %{available_workers: 2}} # Placeholder
end
end
Testing Guide
Running Fixed Tests
Individual test files:
TEST_MODE=full_integration mix test test/pool_v2_concurrent_test.exs TEST_MODE=mock_adapter mix test test/pool_v2_test.exs --only layer_1
All pool tests:
TEST_MODE=full_integration mix test test/pool_v2*.exs
Verify fixes:
# Run the specific failing tests mix test test/pool_v2_concurrent_test.exs:93 mix test test/pool_fixed_test.exs:7
Expected Results
After implementing these fixes:
NimblePool errors - RESOLVED
- No more
RuntimeError: unexpected return
errors - Proper worker removal on failures
- No more
Test assertions - RESOLVED
- Tests correctly handle map responses
- No more type mismatches
Port connection - IMPROVED
- Reduced
:badarg
errors - Better error messages for debugging
- Reduced
Test configuration - RESOLVED
- Tests skip gracefully when environment mismatched
- Clear error messages about requirements
Rollback Plan
If any fix causes regression:
- Individual fixes are independent - Can be reverted separately
- Git tags for each fix - Tag before applying each fix
- Feature flags - Major changes can be feature-flagged:
if Application.get_env(:dspex, :use_safe_port_connect, true) do safe_port_connect(port, pid, worker_id) else Port.connect(port, pid) # Old behavior end
Next Steps
After implementing and testing these immediate fixes:
- Monitor test stability for 24 hours
- Collect metrics on worker failures
- Proceed to Document 3: “Worker Lifecycle Management Design”
Appendix: Quick Reference
Files Modified
lib/dspex/python_bridge/pool_worker_v2.ex
- 7 changeslib/dspex/adapters/python_port.ex
- 1 changetest/pool_v2_concurrent_test.exs
- 2 changestest/pool_fixed_test.exs
- 1 changetest/support/pool_test_helpers.ex
- New file
Key Functions Added
validate_port/1
- Port validationsafe_port_connect/3
- Safe port connectionupdate_worker_stats/2
- Stats helperstart_test_pool/1
- Test helper
Error Types Introduced
{:checkout_failed, reason}
{:connect_failed, reason}
{:connect_exited, reason}
{:connect_error, {kind, reason}}