

V2 Pool Technical Design Series: Document 2 - Immediate Fixes Implementation Guide

Overview

This document provides detailed implementation steps for Phase 1 immediate fixes. These changes address critical NimblePool contract violations and test failures identified in the analysis. All changes are designed to be backward compatible and can be deployed immediately.

Fix 1: NimblePool Return Value Corrections

Issue

The handle_checkout callbacks return {:error, reason}, which violates NimblePool's contract.
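
For reference, NimblePool's handle_checkout/4 typespec admits only the return shapes below (type variable names paraphrased); {:error, reason} is not among them, which is what surfaces as the RuntimeError: unexpected return failures:

@callback handle_checkout(command :: term, from :: {pid, reference}, worker_state, pool_state) ::
            {:ok, client_state, worker_state, pool_state}
            | {:remove, user_reason, pool_state}
            | {:skip, Exception.t(), pool_state}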

Implementation

File: lib/dspex/python_bridge/pool_worker_v2.ex

Lines 205-206 (handle_session_checkout)

# BEFORE:
{:error, reason} ->
  {:error, reason}

# AFTER:
{:error, reason} ->
  Logger.error("[#{worker_state.worker_id}] Session checkout failed: #{inspect(reason)}")
  {:remove, {:checkout_failed, reason}, pool_state}

Lines 234-235 (handle_anonymous_checkout)

# BEFORE:
{:error, reason} ->
  {:error, reason}

# AFTER:
{:error, reason} ->
  Logger.error("[#{worker_state.worker_id}] Anonymous checkout failed: #{inspect(reason)}")
  {:remove, {:checkout_failed, reason}, pool_state}

Lines 212-220 (Port.connect error handling)

# BEFORE:
catch
  :error, reason ->
    Logger.error("[#{worker_state.worker_id}] Failed to connect port to PID #{inspect(pid)} (alive? #{Process.alive?(pid)}): #{inspect(reason)}")
    {:remove, {:connect_failed, reason}, pool_state}
end

# AFTER:
catch
  :error, reason ->
    Logger.error("[#{worker_state.worker_id}] Failed to connect port: #{inspect(reason)}")
    {:remove, {:connect_failed, reason}, pool_state}
  
  :exit, reason ->
    Logger.error("[#{worker_state.worker_id}] Port connect exited: #{inspect(reason)}")
    {:remove, {:connect_exited, reason}, pool_state}
  
  kind, reason ->
    Logger.error("[#{worker_state.worker_id}] Unexpected error in connect: #{kind} - #{inspect(reason)}")
    {:remove, {:connect_error, {kind, reason}}, pool_state}
end

Fix 2: Port Validation Enhancement

Issue

Missing Port.info() validation before attempting Port.connect().
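
As background, Port.info/1 returns a keyword list while a port is open and nil once it has closed, which makes it a cheap liveness probe before Port.connect/2. A minimal illustration (the spawned command is arbitrary):

port = Port.open({:spawn, "cat"}, [:binary])
Port.info(port)
# => [name: ..., links: [...], connected: #PID<...>, ...] while open

Port.close(port)
Port.info(port)
# => nil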

Implementation

File: lib/dspex/python_bridge/pool_worker_v2.ex

Add two new helper functions after line 380:

@doc """
Validates that a port is open and ready for connection.
Returns {:ok, port_info} or {:error, reason}
"""
defp validate_port(port) do
  cond do
    not is_port(port) ->
      {:error, :not_a_port}
      
    true ->
      case Port.info(port) do
        nil ->
          {:error, :port_closed}
          
        info ->
          # Check if port is connected to current process
          case Keyword.get(info, :connected) do
            pid when pid == self() ->
              {:ok, info}
            _ ->
              {:error, :port_not_owned}
          end
      end
  end
end

# Safely connects a port to a target process with validation.
defp safe_port_connect(port, target_pid, worker_id) do
  with {:ok, _port_info} <- validate_port(port),
       true <- Process.alive?(target_pid) do
    try do
      Port.connect(port, target_pid)
      :ok
    catch
      :error, :badarg ->
        # Process died between the alive? check and the connect call
        Logger.warning("[#{worker_id}] Target process died during port connect")
        {:error, :process_died}

      :error, reason ->
        {:error, {:connect_failed, reason}}
    end
  else
    false ->
      {:error, :target_process_dead}
    {:error, reason} ->
      {:error, reason}
  end
end

Update handle_session_checkout (lines 196-208):

# BEFORE:
if port_valid and pid_alive do
  Port.connect(worker_state.port, pid)
  # ... rest of the code
else
  reason = 
    cond do
      not port_valid -> :invalid_port
      not pid_alive -> :process_not_alive
      true -> :unknown_error
    end
  {:error, reason}
end

# AFTER:
case safe_port_connect(worker_state.port, pid, worker_state.worker_id) do
  :ok ->
    Logger.debug("[#{worker_state.worker_id}] Connected to session '#{session_id}' for PID #{inspect(pid)}")
    
    updated_state = %{worker_state | 
      current_session: session_id,
      stats: Map.update!(worker_state.stats, :checkouts, &(&1 + 1))
    }
    
    {:ok, :state, updated_state, pool_state}
    
  {:error, reason} ->
    Logger.error("[#{worker_state.worker_id}] Failed to connect port for session '#{session_id}': #{inspect(reason)}")
    {:remove, {:connect_failed, reason}, pool_state}
end

Fix 3: Test Assertion Corrections

Issue

Test expects programs to be a list but receives a map with “programs” key.
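
For orientation, a hypothetical response shape consistent with the corrected assertions (only the "programs" and "total_count" keys are implied by the tests; the entries' fields are illustrative):

result = %{
  "programs" => [
    %{"id" => "prog_1"},
    %{"id" => "prog_2"}
    # ...
  ],
  "total_count" => 10
}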

Implementation

File: test/pool_v2_concurrent_test.exs

Line 155

# BEFORE:
assert is_list(programs)

# AFTER:
assert is_map(result)
assert Map.has_key?(result, "programs")
programs = result["programs"]
assert is_list(programs)

Line 170

# BEFORE:
assert length(programs) == 10

# AFTER:
assert result["total_count"] == 10
assert length(result["programs"]) == 10

Fix 4: Test Configuration Guards

Issue

Tests fail when run without the proper TEST_MODE environment variable.

Implementation

File: test/pool_fixed_test.exs

Add a module tag and a compile-time skip guard after line 6. ExUnit offers no supported way to skip from a setup block, but module attributes are evaluated when the test file compiles (after test_helper.exs and the application configuration have loaded), so a conditional skip tag works:

@moduletag :layer_3

test_mode = Application.get_env(:dspex, :test_mode, :mock_adapter)
pooling_enabled = Application.get_env(:dspex, :pooling_enabled, false)

unless test_mode == :full_integration and pooling_enabled do
  @moduletag skip: "This test requires TEST_MODE=full_integration with pooling enabled"
end

Remove lines 13-15 (Application.put_env calls) as they’re ineffective after app start.

Fix 5: Service Detection Improvement

Issue

Registry lookups fail during test initialization, causing “Python bridge not available” errors. (Note that the original code also pattern-matches {:ok, _} against Registry.lookup/2, which returns a list, so those matches can never succeed.)
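
For contrast, a sketch of the two lookup styles (PIDs shown are illustrative):

# Registry.lookup/2 returns a list of {pid, value} entries, and an
# empty list until the process has registered itself:
Registry.lookup(Registry.DSPex, SessionPool)
# => [] during init, [{#PID<0.245.0>, nil}] once registered

# Process.whereis/1 resolves locally registered names directly:
Process.whereis(DSPex.PythonBridge.SessionPool)
# => #PID<0.245.0> once started, nil otherwise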

Implementation

File: lib/dspex/adapters/python_port.ex

Replace detect_running_service function (lines 55-68):

# BEFORE:
defp detect_running_service do
  pool_running = match?({:ok, _}, Registry.lookup(Registry.DSPex, SessionPool))
  bridge_running = match?({:ok, _}, Registry.lookup(Registry.DSPex, Bridge))
  
  case {pool_running, bridge_running} do
    {true, _} -> {:pool, SessionPool}
    {false, true} -> {:bridge, Bridge}
    _ -> {:error, "Python bridge not available"}
  end
end

# AFTER:
defp detect_running_service do
  # Use Process.whereis for more reliable detection
  pool_pid = Process.whereis(DSPex.PythonBridge.SessionPool)
  bridge_pid = Process.whereis(DSPex.PythonBridge.Bridge)
  
  cond do
    is_pid(pool_pid) and Process.alive?(pool_pid) ->
      {:pool, DSPex.PythonBridge.SessionPool}
      
    is_pid(bridge_pid) and Process.alive?(bridge_pid) ->
      {:bridge, DSPex.PythonBridge.Bridge}
      
    true ->
      # Try Registry as fallback
      case {Registry.lookup(Registry.DSPex, SessionPool), 
            Registry.lookup(Registry.DSPex, Bridge)} do
        {[{_, _}], _} -> {:pool, SessionPool}
        {_, [{_, _}]} -> {:bridge, Bridge}
        _ -> {:error, "Python bridge not available"}
      end
  end
end

Fix 6: Worker State Update Safety

Issue

handle_checkin only tracks the success path: error and close checkins never update worker stats, and the inline Map.update!/timestamp bookkeeping is duplicated at each call site.

Implementation

File: lib/dspex/python_bridge/pool_worker_v2.ex

Add after handle_checkin (around line 130):

# Helper to safely update worker stats
defp update_worker_stats(worker_state, checkin_type) do
  stats_update = case checkin_type do
    :ok -> 
      %{successful_checkins: &(&1 + 1), last_activity: System.monotonic_time(:millisecond)}
    {:error, _} -> 
      %{error_checkins: &(&1 + 1), last_activity: System.monotonic_time(:millisecond)}
    :close -> 
      %{last_activity: System.monotonic_time(:millisecond)}
    _ -> 
      %{last_activity: System.monotonic_time(:millisecond)}
  end
  
  updated_stats = Enum.reduce(stats_update, worker_state.stats, fn {key, updater}, stats ->
    case updater do
      fun when is_function(fun, 1) ->
        Map.update!(stats, key, fun)
      value ->
        Map.put(stats, key, value)
    end
  end)
  
  %{worker_state | stats: updated_stats}
end

Update handle_checkin to use the helper:

# Lines 117-118
# BEFORE:
stats = Map.update!(worker_state.stats, :successful_checkins, &(&1 + 1))
{:ok, %{worker_state | stats: stats, current_session: nil}, pool_state}

# AFTER:
updated_state =
  worker_state
  |> update_worker_stats(:ok)
  |> Map.put(:current_session, nil)

{:ok, updated_state, pool_state}

Fix 7: Initialize Pool Eagerly for Tests

Issue

Lazy pool initialization causes timeouts in tests.
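
For context, NimblePool itself exposes a :lazy option: with lazy: true, workers start on first checkout, so the first request pays the full Python interpreter startup cost and can blow past test timeouts. A minimal sketch at the NimblePool level (the session pool's lazy: false in the helper below presumably maps onto this; worker init args are illustrative):

# Lazy: workers start on first checkout (slow first request)
{NimblePool, worker: {DSPex.PythonBridge.PoolWorkerV2, []}, pool_size: 2, lazy: true}

# Eager: all workers start during pool init (what the test helper uses)
{NimblePool, worker: {DSPex.PythonBridge.PoolWorkerV2, []}, pool_size: 2, lazy: false}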

Implementation

File: test/support/pool_test_helpers.ex (create new file)

defmodule DSPex.Test.PoolTestHelpers do
  @moduledoc """
  Helper functions for pool-related tests.

  Must be called from within a test process, since start_supervised/1
  attaches the pool to the current test's supervisor.
  """

  import ExUnit.Callbacks, only: [start_supervised: 1]

  @doc """
  Starts an isolated pool for testing with eager initialization.
  """
  def start_test_pool(opts \\ []) do
    pool_name = :"test_pool_#{System.unique_integer([:positive])}"
    genserver_name = :"genserver_#{pool_name}"

    config =
      Keyword.merge(
        [
          name: genserver_name,
          pool_name: pool_name,
          lazy: false,          # Eager initialization for tests
          pool_size: 2,
          max_overflow: 0,
          init_timeout: 10_000  # Longer timeout for test stability
        ],
        opts
      )

    case start_supervised({DSPex.PythonBridge.SessionPoolV2, config}) do
      {:ok, pid} ->
        # Fail fast if workers never come up, rather than handing back
        # a pool that will time out on first checkout
        case wait_for_pool_ready(pool_name, 5_000) do
          :ok ->
            {:ok, %{pid: pid, pool_name: pool_name, genserver_name: genserver_name}}

          {:error, :timeout} ->
            {:error, :pool_not_ready}
        end

      error ->
        error
    end
  end

  @doc """
  Waits for a pool to have at least one available worker.
  Returns :ok, or {:error, :timeout} if the deadline passes first.
  """
  def wait_for_pool_ready(pool_name, timeout) do
    deadline = System.monotonic_time(:millisecond) + timeout
    poll_until_ready(pool_name, deadline)
  end

  defp poll_until_ready(pool_name, deadline) do
    case get_pool_status(pool_name) do
      {:ok, %{available_workers: n}} when n > 0 ->
        :ok

      _ ->
        if System.monotonic_time(:millisecond) < deadline do
          Process.sleep(100)
          poll_until_ready(pool_name, deadline)
        else
          {:error, :timeout}
        end
    end
  end

  defp get_pool_status(_pool_name) do
    # Implementation depends on pool monitoring API
    {:ok, %{available_workers: 2}}  # Placeholder
  end
end
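
A hypothetical test module using the helper (the module name and assertion body are illustrative):

defmodule PoolEagerInitTest do
  use ExUnit.Case, async: false

  import DSPex.Test.PoolTestHelpers

  @moduletag :layer_3

  test "pool is ready before first checkout" do
    assert {:ok, %{pool_name: pool_name}} = start_test_pool(pool_size: 2)
    # Workers started eagerly, so checkouts here should not pay
    # Python interpreter startup cost or hit init timeouts
    assert is_atom(pool_name)
  end
end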

Testing Guide

Running Fixed Tests

  1. Individual test files:

    TEST_MODE=full_integration mix test test/pool_v2_concurrent_test.exs
    TEST_MODE=mock_adapter mix test test/pool_v2_test.exs --only layer_1
    
  2. All pool tests:

    TEST_MODE=full_integration mix test test/pool_v2*.exs
    
  3. Verify fixes:

    # Run the specific failing tests
    mix test test/pool_v2_concurrent_test.exs:93
    mix test test/pool_fixed_test.exs:7
    

Expected Results

After implementing these fixes:

  1. NimblePool errors - RESOLVED

    • No more RuntimeError: unexpected return errors
    • Proper worker removal on failures
  2. Test assertions - RESOLVED

    • Tests correctly handle map responses
    • No more type mismatches
  3. Port connection - IMPROVED

    • Reduced :badarg errors
    • Better error messages for debugging
  4. Test configuration - RESOLVED

    • Tests skip gracefully when environment mismatched
    • Clear error messages about requirements

Rollback Plan

If any fix causes regression:

  1. Individual fixes are independent - Can be reverted separately
  2. Git tags for each fix - Tag before applying each fix
  3. Feature flags - Major changes can be feature-flagged:
    if Application.get_env(:dspex, :use_safe_port_connect, true) do
      safe_port_connect(port, pid, worker_id)
    else
      Port.connect(port, pid)  # Old behavior
    end
    

Next Steps

After implementing and testing these immediate fixes:

  1. Monitor test stability for 24 hours
  2. Collect metrics on worker failures
  3. Proceed to Document 3: “Worker Lifecycle Management Design”

Appendix: Quick Reference

Files Modified

  • lib/dspex/python_bridge/pool_worker_v2.ex - 7 changes
  • lib/dspex/adapters/python_port.ex - 1 change
  • test/pool_v2_concurrent_test.exs - 2 changes
  • test/pool_fixed_test.exs - 1 change
  • test/support/pool_test_helpers.ex - New file

Key Functions Added

  • validate_port/1 - Port validation
  • safe_port_connect/3 - Safe port connection
  • update_worker_stats/2 - Stats helper
  • start_test_pool/1 - Test helper

Error Types Introduced

  • {:checkout_failed, reason}
  • {:connect_failed, reason}
  • {:connect_exited, reason}
  • {:connect_error, {kind, reason}}