Prompt: Update Elixir Client with Stdout-Based Readiness Detection
Objective
Update the Elixir GRPCWorker to use stdout-based readiness detection instead of TCP polling, and implement the client-side protocol for the unified bridge.
Context
The current implementation polls a TCP port to detect when the Python server is up, which is prone to race conditions. The new approach monitors the Python process's stdout for a "GRPC_READY:port" message, which is more robust and faster.
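For example, once the Python server is listening it prints a single line such as GRPC_READY:50051 to stdout, and the worker's Port delivers that line as an {:eol, line} message. A minimal sketch of the parse the worker performs (the port number is illustrative; the regex matches parse_ready_message/1 in Step 1 below):

# Illustrative: parsing the readiness line delivered by the Port
case Regex.run(~r/^GRPC_READY:(\d+)/, "GRPC_READY:50051") do
  [_, port_str] -> {:ok, String.to_integer(port_str)}
  _ -> :not_ready
end
#=> {:ok, 50051}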
Requirements
Core Updates
GRPCWorker GenServer
- Replace TCP polling with stdout monitoring
- Parse “GRPC_READY:port” message
- Handle both stdout and stderr properly
- Maintain backward compatibility
gRPC Client Module
- Implement all new RPC methods
- Handle streaming responses
- Proper error handling
- Connection pooling
Integration Updates
- Update existing tool execution
- Add variable management calls
- Implement streaming handlers
Implementation Steps
1. Update GRPCWorker with Stdout Monitoring
# File: snakepit/lib/snakepit/grpc/worker.ex
defmodule Snakepit.GRPC.Worker do
  use GenServer
  require Logger

  defstruct [:port, :channel, :python_port, :ready, :buffer]

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    # find_python/0 and server_script_path/0 locate the interpreter and the
    # bridge server script (implementations not shown here)
    python_path = opts[:python_path] || find_python()
    server_script = opts[:server_script] || server_script_path()

    # Start Python process with Port
    port_opts = [
      :binary,
      :exit_status,
      {:line, 1024},                          # Line-buffered for easier parsing
      {:args, [server_script]},
      {:env, [{~c"PYTHONUNBUFFERED", ~c"1"}]} # Force unbuffered output
    ]

    port = Port.open({:spawn_executable, python_path}, port_opts)

    state = %__MODULE__{
      python_port: port,
      ready: false,
      buffer: ""
    }

    {:ok, state}
  end

  @impl true
  def handle_info({port, {:data, {:eol, line}}}, %{python_port: port} = state) do
    # Process complete line
    handle_output_line(line, state)
  end

  @impl true
  def handle_info({port, {:data, {:noeol, partial}}}, %{python_port: port} = state) do
    # Buffer partial line
    {:noreply, %{state | buffer: state.buffer <> partial}}
  end

  @impl true
  def handle_info({port, {:exit_status, status}}, %{python_port: port} = state) do
    Logger.error("Python process exited with status: #{status}")
    {:stop, {:python_exit, status}, state}
  end

  defp handle_output_line(line, state) do
    full_line = state.buffer <> line

    case parse_ready_message(full_line) do
      {:ok, port_number} ->
        # Server is ready!
        Logger.info("Python gRPC server ready on port #{port_number}")

        # Connect to gRPC
        {:ok, channel} = GRPC.Stub.connect("localhost:#{port_number}")

        new_state = %{state |
          port: port_number,
          channel: channel,
          ready: true,
          buffer: ""
        }

        # Notify any waiters
        notify_ready(new_state)

        {:noreply, new_state}

      :not_ready ->
        # Log other output for debugging
        Logger.debug("Python output: #{full_line}")
        {:noreply, %{state | buffer: ""}}
    end
  end

  defp parse_ready_message(line) do
    case Regex.run(~r/^GRPC_READY:(\d+)/, line) do
      [_, port_str] ->
        case Integer.parse(port_str) do
          {port, ""} -> {:ok, port}
          _ -> :not_ready
        end

      _ ->
        :not_ready
    end
  end

  defp notify_ready(state) do
    # Reply to any callers blocked in await_ready/1
    Registry.dispatch(Snakepit.Registry, :grpc_ready, fn entries ->
      for {_pid, from} <- entries do
        GenServer.reply(from, {:ok, state.channel})
      end
    end)

    # Clear the registrations now that all waiters have been answered
    Registry.unregister(Snakepit.Registry, :grpc_ready)
  end

  @doc """
  Wait for the gRPC server to be ready.
  """
  def await_ready(timeout \\ 30_000) do
    case GenServer.call(__MODULE__, :get_channel, timeout) do
      {:ok, channel} -> {:ok, channel}
      error -> error
    end
  end

  @impl true
  def handle_call(:get_channel, from, state) do
    if state.ready do
      {:reply, {:ok, state.channel}, state}
    else
      # Stash the caller's `from` tuple; notify_ready/1 replies via
      # GenServer.reply/2 once the GRPC_READY line arrives
      Registry.register(Snakepit.Registry, :grpc_ready, from)
      {:noreply, state}
    end
  end
end
2. Implement gRPC Client Module
# File: snakepit/lib/snakepit/grpc/client.ex
defmodule Snakepit.GRPC.Client do
  @moduledoc """
  Client for the unified bridge protocol.
  """

  alias Snakepit.Bridge.Proto.{
    BridgeService,
    RegisterVariableRequest,
    GetVariableRequest,
    SetVariableRequest,
    WatchVariablesRequest,
    Variable
  }

  require Logger

  @doc """
  Registers a new variable in a session.
  """
  def register_variable(channel, session_id, name, type, initial_value, opts \\ []) do
    request = RegisterVariableRequest.new(
      session_id: session_id,
      name: name,
      type: type,
      initial_value: serialize_value(initial_value, type),
      constraints: build_constraints(type, opts[:constraints] || %{}),
      metadata: opts[:metadata] || %{}
    )

    case BridgeService.Stub.register_variable(channel, request) do
      {:ok, response} ->
        {:ok, response.variable_id, deserialize_variable(response.variable)}

      {:error, %GRPC.RPCError{} = error} ->
        {:error, format_error(error)}
    end
  end

  @doc """
  Gets a variable's current value.
  """
  def get_variable(channel, session_id, identifier) do
    request = GetVariableRequest.new(
      session_id: session_id,
      identifier: identifier
    )

    case BridgeService.Stub.get_variable(channel, request) do
      {:ok, response} ->
        variable = deserialize_variable(response.variable)
        {:ok, variable}

      {:error, %GRPC.RPCError{} = error} ->
        {:error, format_error(error)}
    end
  end

  @doc """
  Updates a variable's value.
  """
  def set_variable(channel, session_id, identifier, value, metadata \\ %{}) do
    # First get the variable to know its type
    case get_variable(channel, session_id, identifier) do
      {:ok, variable} ->
        request = SetVariableRequest.new(
          session_id: session_id,
          identifier: identifier,
          value: serialize_value(value, variable.type),
          metadata: metadata
        )

        case BridgeService.Stub.set_variable(channel, request) do
          {:ok, _response} -> :ok
          {:error, error} -> {:error, format_error(error)}
        end

      error ->
        error
    end
  end

  @doc """
  Watches variables for changes (streaming).
  """
  def watch_variables(channel, session_id, identifiers, opts \\ []) do
    request = WatchVariablesRequest.new(
      session_id: session_id,
      variable_identifiers: identifiers,
      include_initial_values: Keyword.get(opts, :include_initial, true)
    )

    case BridgeService.Stub.watch_variables(channel, request) do
      {:ok, stream} ->
        {:ok, stream}

      {:error, error} ->
        {:error, format_error(error)}
    end
  end

  # Serialization helpers

  defp serialize_value(value, type) do
    json = case type do
      :float -> Jason.encode!(value)
      :integer -> Jason.encode!(value)
      :string -> Jason.encode!(value)
      :boolean -> Jason.encode!(value)
      :choice -> Jason.encode!(value)
      :module -> Jason.encode!(value)
      :embedding -> Jason.encode!(value)
      :tensor ->
        # Handle tensor serialization
        Jason.encode!(%{
          shape: tensor_shape(value),
          data: tensor_data(value)
        })
      _ ->
        Jason.encode!(value)
    end

    Google.Protobuf.Any.new(
      type_url: "dspex.variables/#{type}",
      value: json
    )
  end

  defp deserialize_variable(proto_var) do
    %{
      id: proto_var.id,
      name: proto_var.name,
      type: proto_var.type,
      value: deserialize_value(proto_var.value, proto_var.type),
      constraints: proto_var.constraints,
      metadata: proto_var.metadata,
      version: proto_var.version
    }
  end

  # Public so Snakepit.GRPC.StreamHandler can reuse it
  @doc false
  def deserialize_value(any_value, type) do
    json = any_value.value

    case type do
      :float -> Jason.decode!(json)
      :integer -> Jason.decode!(json)
      :string -> Jason.decode!(json)
      :boolean -> Jason.decode!(json)
      :choice -> Jason.decode!(json)
      :module -> Jason.decode!(json)
      :embedding -> Jason.decode!(json)
      :tensor ->
        data = Jason.decode!(json)
        # Could reconstruct tensor type if needed
        data
      _ ->
        Jason.decode!(json)
    end
  end

  # Minimal tensor accessors; adapt to the tensor representation in use
  defp tensor_shape(%{shape: shape}), do: shape
  defp tensor_shape(_), do: []

  defp tensor_data(%{data: data}), do: data
  defp tensor_data(value), do: value

  defp build_constraints(:choice, user_constraints) do
    # Build choice constraints
    %{choices: Map.get(user_constraints, :choices, [])}
  end

  defp build_constraints(:float, user_constraints) do
    # Build numeric constraints
    %{
      min: Map.get(user_constraints, :min),
      max: Map.get(user_constraints, :max)
    }
  end

  defp build_constraints(_type, user_constraints) do
    user_constraints
  end

  defp format_error(%GRPC.RPCError{status: status, message: message}) do
    "gRPC error #{status}: #{message}"
  end

  defp format_error(other), do: "gRPC error: #{inspect(other)}"
end
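For reference, the serialization helpers above wrap every value as JSON inside a Google.Protobuf.Any whose type_url carries the variable type. A conceptual sketch of what serialize_value(0.7, :float) builds (the value is illustrative):

# Conceptual result of serialize_value(0.7, :float)
Google.Protobuf.Any.new(
  type_url: "dspex.variables/float",
  value: Jason.encode!(0.7)   # "0.7"
)

# deserialize_value/2 reverses it on the way out
Jason.decode!("0.7")
#=> 0.7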
3. Create Stream Consumer for Variable Watching
# File: snakepit/lib/snakepit/grpc/stream_handler.ex
defmodule Snakepit.GRPC.StreamHandler do
  @moduledoc """
  Handles gRPC streams for variable watching.
  """

  use Task
  require Logger

  def start_link(stream, callback) do
    Task.start_link(__MODULE__, :consume_stream, [stream, callback])
  end

  def consume_stream(stream, callback) do
    stream
    |> Enum.each(fn
      {:ok, update} ->
        handle_update(update, callback)

      {:error, reason} ->
        Logger.error("Stream error: #{inspect(reason)}")
    end)
  end

  defp handle_update(update, callback) do
    case update.update_type do
      "heartbeat" ->
        # Ignore heartbeats
        :ok

      "initial_value" ->
        # Handle initial value
        callback.(update.variable.name, nil, deserialize_value(update.variable.value), %{initial: true})

      "value_change" ->
        # Handle value change
        old_value = if update.old_value, do: deserialize_value(update.old_value), else: nil
        new_value = deserialize_value(update.variable.value)

        metadata = Map.merge(update.update_metadata, %{
          source: update.update_source,
          timestamp: update.timestamp
        })

        callback.(update.variable.name, old_value, new_value, metadata)

      other ->
        Logger.warning("Unknown update type: #{other}")
    end
  rescue
    e ->
      Logger.error("Error in stream callback: #{inspect(e)}")
  end

  defp deserialize_value(any_value) do
    # Reuse the (public) helper from the client module
    Snakepit.GRPC.Client.deserialize_value(any_value, nil)
  end
end
4. Update Module Configuration
# File: snakepit/lib/snakepit.ex
defmodule Snakepit do
  use Application

  def start(_type, _args) do
    children = [
      # Registry for coordination
      {Registry, keys: :duplicate, name: Snakepit.Registry},

      # Start gRPC worker
      Snakepit.GRPC.Worker

      # Other supervisors...
    ]

    opts = [strategy: :one_for_one, name: Snakepit.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
Testing Steps
1. Test Server Startup
# In IEx
{:ok, _} = Snakepit.GRPC.Worker.start_link()
{:ok, channel} = Snakepit.GRPC.Worker.await_ready()
2. Test Variable Operations
# Register a variable
{:ok, var_id, var} = Snakepit.GRPC.Client.register_variable(
  channel,
  "test_session",
  "temperature",
  :float,
  0.7
)

# Get variable
{:ok, var} = Snakepit.GRPC.Client.get_variable(channel, "test_session", "temperature")

# Set variable
:ok = Snakepit.GRPC.Client.set_variable(
  channel,
  "test_session",
  "temperature",
  0.9,
  %{"source" => "test"}
)
3. Test Streaming
# Watch variables
{:ok, stream} = Snakepit.GRPC.Client.watch_variables(
  channel,
  "test_session",
  ["temperature"]
)

# Start consumer; the callback receives name, old value, new value, and metadata
Snakepit.GRPC.StreamHandler.start_link(stream, fn name, old, new, _meta ->
  IO.puts("#{name} changed from #{old} to #{new}")
end)
Critical Implementation Notes
- Port Management: Use Erlang Ports for reliable process management
- Output Parsing: Handle both complete and partial lines from stdout (see the sketch after this list)
- Error Handling: Gracefully handle Python process crashes
- Streaming: Ensure streams are properly terminated on client disconnect
- Backward Compatibility: Maintain existing tool execution functionality
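A small sketch of the partial-line handling mentioned above, using the same buffering as the worker's handle_info/2 clauses (the port number is illustrative):

# A readiness line split across two Port deliveries is reassembled like this
buffer = ""
buffer = buffer <> "GRPC_RE"        # after {:data, {:noeol, "GRPC_RE"}}
full_line = buffer <> "ADY:50051"   # after {:data, {:eol, "ADY:50051"}}
full_line
#=> "GRPC_READY:50051"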
Files to Create/Modify
- Update: snakepit/lib/snakepit/grpc/worker.ex
- Create: snakepit/lib/snakepit/grpc/client.ex
- Create: snakepit/lib/snakepit/grpc/stream_handler.ex
- Update: snakepit/lib/snakepit.ex
- Update: snakepit/mix.exs (ensure dependencies; see the sketch below)
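The client above relies on the grpc, protobuf, and jason packages. A sketch of the mix.exs entries, with illustrative version requirements to adjust to whatever snakepit already pins:

# File: snakepit/mix.exs (excerpt; versions are illustrative)
defp deps do
  [
    {:grpc, "~> 0.7"},      # gRPC client/stubs
    {:protobuf, "~> 0.12"}, # runtime for the generated proto modules
    {:jason, "~> 1.4"}      # JSON encoding used by the serialization helpers
  ]
end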
Next Steps
After implementing the Elixir client:
- Test the stdout-based startup detection
- Verify all RPC methods work correctly
- Test streaming stability
- Proceed to implement serialization (next prompt)