Looking at this NimblePool V2 implementation issue, I can see the core problem: the worker initialization is timing out because the initialization ping isn’t receiving a response. Let me analyze the specific technical details and provide you with concrete solutions.
Root Cause Analysis
The main issue is in the PoolWorkerV2.send_initialization_ping/1
function. Looking at the code:
- Worker starts Python process with pool-worker mode
- Sends initialization ping with
request_id = 0
- Waits for response but times out after 5 seconds
- Python process appears to be running but not responding
Key Problems Identified
1. Port Message Format Issue
The current code sends the message like this:
send(worker_state.port, {self(), {:command, request}})
But with {:packet, 4}
mode, the port already handles length prefixing. The issue might be that request
is already encoded as JSON string, but needs to be binary.
2. Python Side Expectations
Looking at the Python bridge code, in pool-worker mode it expects to receive messages in the same format as standalone mode, but the initialization sequence might be different.
Solutions
Solution 1: Fix Port Communication
In pool_worker_v2.ex
, modify the send_initialization_ping
function:
defp send_initialization_ping(worker_state) do
request_id = 0 # Special ID for init ping
request = Protocol.encode_request(request_id, :ping, %{
initialization: true,
worker_id: worker_state.worker_id
})
# Ensure request is binary
request_binary = :erlang.iolist_to_binary(request)
Logger.info("Sending init ping request (#{byte_size(request_binary)} bytes): #{inspect(request_binary)}")
try do
# Send the binary data
send(worker_state.port, {self(), {:command, request_binary}})
receive do
{port, {:data, data}} when port == worker_state.port ->
Logger.debug("Received init response data: #{inspect(data, limit: :infinity)}")
# ... rest of the response handling
Solution 2: Check Python Script Arguments
The Python script might not be properly handling the pool-worker mode. Make sure the Python side is receiving and processing the arguments correctly:
# In dspy_bridge.py, verify the argument parsing:
def main():
parser = argparse.ArgumentParser(description='DSPy Bridge for Elixir Integration')
parser.add_argument('--mode', choices=['standalone', 'pool-worker'], default='standalone',
help='Bridge operation mode')
parser.add_argument('--worker-id', type=str, help='Worker ID for pool-worker mode')
args = parser.parse_args()
# Add debug logging
print(f"Starting in {args.mode} mode with worker ID: {args.worker_id}", file=sys.stderr)
sys.stderr.flush() # Ensure immediate output
Solution 3: Capture Python Stderr for Debugging
Since you added :stderr_to_stdout
, you should be able to see Python errors. Add more detailed logging:
# In pool_worker_v2.ex init_worker/1
port_opts = [
:binary,
:exit_status,
{:packet, 4},
:stderr_to_stdout,
{:args, [script_path, "--mode", "pool-worker", "--worker-id", worker_id]}
]
# After starting the port, immediately check if it's alive
port = Port.open({:spawn_executable, python_path}, port_opts)
port_info = Port.info(port)
Logger.info("Port started with info: #{inspect(port_info)}")
Solution 4: Test Port Communication in Isolation
Create a simple test to verify port communication works:
defmodule PortTest do
def test_direct_communication do
# Start Python process
python_path = System.find_executable("python3")
script_path = Path.join(:code.priv_dir(:dspex), "python/dspy_bridge.py")
port = Port.open({:spawn_executable, python_path}, [
:binary,
:exit_status,
{:packet, 4},
:stderr_to_stdout,
{:args, [script_path, "--mode", "pool-worker", "--worker-id", "test"]}
])
# Send a simple ping
request = Jason.encode!(%{
"id" => 0,
"command" => "ping",
"args" => %{"initialization" => true}
})
send(port, {self(), {:command, request}})
# Wait for response
receive do
{^port, {:data, data}} ->
IO.puts("Received: #{inspect(data)}")
Jason.decode!(data)
{^port, {:exit_status, status}} ->
{:error, "Port exited with status: #{status}"}
other ->
{:error, "Unexpected message: #{inspect(other)}"}
after
5000 ->
{:error, "Timeout"}
end
end
end
Solution 5: Fix Process Ownership During Init
The issue might be that during init_worker
, the port is owned by the NimblePool process, not the worker. You might need to explicitly connect the port:
# In init_worker, after creating the port:
port = Port.open({:spawn_executable, python_path}, port_opts)
# Ensure we own the port
Port.connect(port, self())
Recommended Approach
- Start with Solution 4 - Test port communication in isolation to verify the basic setup works
- Apply Solution 1 - Ensure binary format for messages
- Use Solution 3 - Add comprehensive logging to see what’s happening
- If still failing, try Solution 5 - Fix process ownership
Alternative: Simplified V2 Without Full Refactor
If the full V2 refactoring proves too complex, consider a minimal fix to V1:
# In SessionPool, modify execute_in_session to spawn a Task:
def execute_in_session(session_id, command, args, opts \\ []) do
task = Task.async(fn ->
GenServer.call(__MODULE__, {:execute_in_session, session_id, command, args, opts})
end)
Task.await(task, Keyword.get(opts, :timeout, @default_operation_timeout))
end
This would move the blocking operation out of client GenServer.calls while keeping most of the V1 architecture intact.