Unified gRPC Bridge: Implementation Guide
Overview
This guide provides a step-by-step implementation path for the unified gRPC bridge that seamlessly integrates both tools and variables. We’ll build incrementally, ensuring each phase delivers working functionality.
Implementation Phases
Phase 1: Enhanced SessionStore (Week 1)
The foundation is extending the existing SessionStore to manage variables alongside tools.
Step 1.1: Extend SessionStore State
# lib/dspex/bridge/session_store.ex
defmodule DSPex.Bridge.SessionStore do
use GenServer
require Logger
# Extended state to include variables
defstruct [
:session_id,
:tools, # Existing
:variables, # NEW: Map of var_id => Variable struct
:variable_observers, # NEW: Map of var_id => MapSet of PIDs
:variable_locks, # NEW: Map of var_id => optimizer_pid
:metadata,
:created_at,
:last_accessed_at
]
# Add to init/1
def init(session_id) do
state = %__MODULE__{
session_id: session_id,
tools: %{},
variables: %{},
variable_observers: %{},
variable_locks: %{},
metadata: %{},
created_at: DateTime.utc_now(),
last_accessed_at: DateTime.utc_now()
}
# Set up session monitoring
:timer.send_interval(@cleanup_interval, :check_session_timeout)
{:ok, state}
end
Step 1.2: Implement Variable Registration
# Add to session_store.ex
@doc """
Register a variable in the session with type validation.
"""
def register_variable(session_id, name, type, initial_value, opts \\ []) do
GenServer.call(__MODULE__, {:register_variable, session_id, name, type, initial_value, opts})
end
# Add handler
def handle_call({:register_variable, session_id, name, type, initial_value, opts}, _from, state) do
with {:ok, session} <- get_session_state(state, session_id),
{:ok, type_module} <- validate_variable_type(type),
{:ok, validated_value} <- type_module.validate(initial_value) do
var_id = generate_variable_id(name)
variable = %Variable{
id: var_id,
name: name,
type: type,
value: validated_value,
constraints: opts[:constraints] || %{},
metadata: %{
created_at: DateTime.utc_now(),
source: :elixir,
description: opts[:description]
},
last_updated_at: DateTime.utc_now()
}
updated_session = %{session |
variables: Map.put(session.variables, var_id, variable),
last_accessed_at: DateTime.utc_now()
}
new_state = Map.put(state, session_id, updated_session)
Logger.info("Registered variable #{name} (#{var_id}) in session #{session_id}")
{:reply, {:ok, var_id}, new_state}
else
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
Step 1.3: Implement Variable Access
# Variable getter with type information
def handle_call({:get_variable, session_id, var_id}, _from, state) do
with {:ok, session} <- get_session_state(state, session_id),
{:ok, variable} <- Map.fetch(session.variables, var_id) do
# Update last accessed time
updated_session = %{session | last_accessed_at: DateTime.utc_now()}
new_state = Map.put(state, session_id, updated_session)
{:reply, {:ok, variable}, new_state}
else
:error -> {:reply, {:error, :variable_not_found}, state}
error -> {:reply, error, state}
end
end
# Variable setter with validation and notifications
def handle_call({:update_variable, session_id, var_id, new_value, metadata}, _from, state) do
with {:ok, session} <- get_session_state(state, session_id),
{:ok, variable} <- Map.fetch(session.variables, var_id),
{:ok, type_module} <- validate_variable_type(variable.type),
{:ok, validated_value} <- type_module.validate(new_value) do
updated_variable = %{variable |
value: validated_value,
last_updated_at: DateTime.utc_now(),
metadata: Map.merge(variable.metadata, metadata)
}
updated_session = %{session |
variables: Map.put(session.variables, var_id, updated_variable),
last_accessed_at: DateTime.utc_now()
}
new_state = Map.put(state, session_id, updated_session)
# Notify observers
notify_variable_observers(session_id, var_id, updated_variable,
Map.get(session.variable_observers, var_id, MapSet.new()))
{:reply, :ok, new_state}
else
error -> {:reply, error, state}
end
end
Phase 2: gRPC Protocol Extensions (Week 1-2)
Step 2.1: Update Protocol Buffers
// protos/snakepit_bridge.proto
// Add to existing service
service SnakepitBridge {
// ... existing RPCs ...
// Variable operations
rpc GetVariable(GetVariableRequest) returns (GetVariableResponse);
rpc SetVariable(SetVariableRequest) returns (SetVariableResponse);
rpc ListVariables(ListVariablesRequest) returns (ListVariablesResponse);
rpc WatchVariables(WatchVariablesRequest) returns (stream VariableUpdate);
}
// Variable messages
message Variable {
string id = 1;
string name = 2;
string type = 3;
google.protobuf.Any value = 4;
string constraints_json = 5;
map<string, string> metadata = 6;
enum Source {
ELIXIR = 0;
PYTHON = 1;
}
Source source = 7;
int64 last_updated_at = 8;
}
message GetVariableRequest {
string session_id = 1;
string variable_id = 2;
}
message GetVariableResponse {
Variable variable = 1;
}
message SetVariableRequest {
string session_id = 1;
string variable_id = 2;
google.protobuf.Any value = 3;
map<string, string> metadata = 4;
}
message SetVariableResponse {
bool success = 1;
string error_message = 2;
}
message WatchVariablesRequest {
string session_id = 1;
repeated string variable_ids = 2;
}
message VariableUpdate {
string variable_id = 1;
Variable variable = 2;
string update_source = 3;
map<string, string> update_metadata = 4;
}
Step 2.2: Implement gRPC Handlers
# lib/dspex/bridge/grpc_server.ex
defmodule DSPex.Bridge.GRPCServer do
use GRPC.Server, service: DSPex.Bridge.SnakepitBridge.Service
# Add variable handlers
@impl true
def get_variable(request, _stream) do
case SessionStore.get_variable(request.session_id, request.variable_id) do
{:ok, variable} ->
%GetVariableResponse{
variable: variable_to_proto(variable)
}
{:error, reason} ->
raise GRPC.RPCError, status: :not_found, message: "Variable not found: #{reason}"
end
end
@impl true
def set_variable(request, _stream) do
with {:ok, value} <- deserialize_any(request.value),
:ok <- SessionStore.update_variable(
request.session_id,
request.variable_id,
value,
request.metadata
) do
%SetVariableResponse{success: true}
else
{:error, reason} ->
%SetVariableResponse{
success: false,
error_message: to_string(reason)
}
end
end
@impl true
def watch_variables(request, stream) do
# Set up variable watching
{:ok, watcher} = VariableWatcher.start_link(
session_id: request.session_id,
variable_ids: request.variable_ids,
stream: stream
)
# Keep connection alive
ref = Process.monitor(watcher)
receive do
{:DOWN, ^ref, :process, ^watcher, _reason} ->
# Watcher terminated, end stream
:ok
end
end
# Helper to convert Variable to protobuf
defp variable_to_proto(%Variable{} = var) do
%DSPex.Bridge.Proto.Variable{
id: var.id,
name: to_string(var.name),
type: to_string(var.type),
value: serialize_value(var.value, var.type),
constraints_json: Jason.encode!(var.constraints),
metadata: var.metadata,
source: if(var.metadata[:source] == :python, do: :PYTHON, else: :ELIXIR),
last_updated_at: DateTime.to_unix(var.last_updated_at, :millisecond)
}
end
end
Phase 3: Python SessionContext Enhancement (Week 2)
Step 3.1: Extend SessionContext
# python/dspex_bridge/session_context.py
import asyncio
import time
from typing import Dict, Any, List, Optional, Tuple, AsyncIterator
import grpc
from google.protobuf import any_pb2
from .proto import snakepit_bridge_pb2 as pb2
from .proto import snakepit_bridge_pb2_grpc as pb2_grpc
from .serialization import serialize_value, deserialize_value
class SessionContext:
"""Enhanced session context with unified tool and variable access."""
def __init__(self, session_id: str, channel: grpc.aio.Channel):
self.session_id = session_id
self.channel = channel
self.stub = pb2_grpc.SnakepitBridgeStub(channel)
# Tool management (existing)
self._tools: Dict[str, AsyncGRPCProxyTool] = {}
# Variable management (new)
self._variable_cache: Dict[str, Tuple[Any, float]] = {}
self._cache_ttl = 1.0 # 1 second TTL
self._variable_watchers: Dict[str, asyncio.Task] = {}
async def get_variable(self, name: str, bypass_cache: bool = False) -> Any:
"""
Get a variable value from the session.
Args:
name: Variable name or ID
bypass_cache: Force fetch from server
Returns:
The variable value, properly typed
"""
# Check cache unless bypassed
if not bypass_cache and name in self._variable_cache:
value, timestamp = self._variable_cache[name]
if time.time() - timestamp < self._cache_ttl:
return value
# Fetch from server
request = pb2.GetVariableRequest(
session_id=self.session_id,
variable_id=name
)
try:
response = await self.stub.GetVariable(request)
variable = response.variable
# Deserialize value based on type
value = deserialize_value(variable.value, variable.type)
# Update cache
self._variable_cache[name] = (value, time.time())
return value
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
raise KeyError(f"Variable '{name}' not found in session")
raise
async def set_variable(self, name: str, value: Any,
metadata: Optional[Dict[str, str]] = None) -> None:
"""
Set a variable value in the session.
Args:
name: Variable name or ID
value: New value (will be type-checked on server)
metadata: Optional metadata for the update
"""
# Serialize value
serialized = serialize_value(value)
request = pb2.SetVariableRequest(
session_id=self.session_id,
variable_id=name,
value=serialized,
metadata=metadata or {}
)
response = await self.stub.SetVariable(request)
if not response.success:
raise ValueError(f"Failed to set variable: {response.error_message}")
# Update cache
self._variable_cache[name] = (value, time.time())
async def list_variables(self) -> Dict[str, Any]:
"""List all variables in the session."""
request = pb2.ListVariablesRequest(session_id=self.session_id)
response = await self.stub.ListVariables(request)
variables = {}
for var in response.variables:
value = deserialize_value(var.value, var.type)
variables[var.name] = {
'id': var.id,
'type': var.type,
'value': value,
'metadata': dict(var.metadata)
}
return variables
async def watch_variables(self, variable_names: List[str]) -> AsyncIterator[Dict[str, Any]]:
"""
Watch variables for changes.
Yields:
Dict containing variable_id, new_value, and metadata
"""
request = pb2.WatchVariablesRequest(
session_id=self.session_id,
variable_ids=variable_names
)
async for update in self.stub.WatchVariables(request):
# Update cache
value = deserialize_value(update.variable.value, update.variable.type)
self._variable_cache[update.variable_id] = (value, time.time())
yield {
'variable_id': update.variable_id,
'value': value,
'type': update.variable.type,
'metadata': dict(update.update_metadata),
'source': update.update_source
}
Step 3.2: Create Variable-Aware Tools
# python/dspex_bridge/variable_aware_tools.py
from typing import List, Dict, Any, Optional
from .proxy_tool import AsyncGRPCProxyTool
from .session_context import SessionContext
class VariableAwareProxyTool(AsyncGRPCProxyTool):
"""
Enhanced proxy tool that automatically fetches and injects variables.
"""
def __init__(self, tool_spec: Dict[str, Any], session_context: SessionContext,
variable_bindings: Optional[Dict[str, str]] = None):
"""
Args:
tool_spec: Tool specification from server
session_context: Session context for variable access
variable_bindings: Map of parameter_name -> variable_name
"""
super().__init__(tool_spec, session_context)
self.variable_bindings = variable_bindings or {}
async def __call__(self, *args, **kwargs) -> Any:
"""Execute tool with automatic variable injection."""
# Fetch and inject bound variables
for param_name, var_name in self.variable_bindings.items():
if param_name not in kwargs: # Don't override explicit args
try:
kwargs[param_name] = await self.session_context.get_variable(var_name)
except KeyError:
# Variable doesn't exist, skip
pass
# Add all variables as context if requested
if kwargs.get('_include_all_variables', False):
kwargs['_variables'] = await self.session_context.list_variables()
# Execute tool with enriched kwargs
return await super().__call__(*args, **kwargs)
def bind_variable(self, param_name: str, variable_name: str) -> 'VariableAwareProxyTool':
"""Bind a parameter to a variable."""
self.variable_bindings[param_name] = variable_name
return self
Phase 4: DSPy Integration (Week 2-3)
Step 4.1: Variable-Aware DSPy Modules
# python/dspex_bridge/dspy_integration.py
import dspy
from typing import Dict, Any, List, Optional
from .session_context import SessionContext
class VariableAwareMixin:
"""
Mixin to make any DSPy module variable-aware.
"""
def __init__(self, *args, session_context: SessionContext = None, **kwargs):
super().__init__(*args, **kwargs)
self.session_context = session_context
self._variable_bindings: Dict[str, str] = {}
async def bind_to_variable(self, attribute: str, variable_name: str) -> None:
"""
Bind a module attribute to a session variable.
Example:
await module.bind_to_variable('temperature', 'global_temperature')
"""
if not self.session_context:
raise RuntimeError("No session context available")
# Get current value
value = await self.session_context.get_variable(variable_name)
# Set attribute
setattr(self, attribute, value)
# Remember binding
self._variable_bindings[attribute] = variable_name
async def sync_variables(self) -> None:
"""Sync all bound variables from session."""
if not self.session_context:
return
for attr, var_name in self._variable_bindings.items():
try:
value = await self.session_context.get_variable(var_name)
setattr(self, attr, value)
except KeyError:
# Variable was removed, skip
pass
async def forward_with_variables(self, *args, **kwargs):
"""Forward method that syncs variables before execution."""
await self.sync_variables()
return super().forward(*args, **kwargs)
# Example: Variable-aware Predict module
class VariableAwarePredict(VariableAwareMixin, dspy.Predict):
"""Predict module with variable support."""
async def forward(self, *args, **kwargs):
"""Override forward to sync variables."""
await self.sync_variables()
return super().forward(*args, **kwargs)
# Example: Variable-aware ChainOfThought
class VariableAwareChainOfThought(VariableAwareMixin, dspy.ChainOfThought):
"""ChainOfThought module with variable support."""
async def forward(self, *args, **kwargs):
await self.sync_variables()
return super().forward(*args, **kwargs)
Step 4.2: Module-Type Variables
# python/dspex_bridge/module_variables.py
from typing import Type, Dict, Any
import dspy
from .session_context import SessionContext
class ModuleVariableResolver:
"""Resolves module-type variables to actual DSPy module classes."""
# Registry of available modules
MODULE_REGISTRY = {
'Predict': dspy.Predict,
'ChainOfThought': dspy.ChainOfThought,
'ReAct': dspy.ReAct,
'ProgramOfThought': dspy.ProgramOfThought,
}
def __init__(self, session_context: SessionContext):
self.session_context = session_context
async def resolve_module(self, variable_name: str) -> Type[dspy.Module]:
"""
Resolve a module-type variable to a DSPy module class.
Args:
variable_name: Name of the module-type variable
Returns:
DSPy module class
"""
module_name = await self.session_context.get_variable(variable_name)
if module_name not in self.MODULE_REGISTRY:
raise ValueError(f"Unknown module type: {module_name}")
return self.MODULE_REGISTRY[module_name]
async def create_module(self, variable_name: str, *args, **kwargs) -> dspy.Module:
"""
Create a module instance from a module-type variable.
Args:
variable_name: Name of the module-type variable
*args, **kwargs: Arguments for module constructor
Returns:
Module instance
"""
module_class = await self.resolve_module(variable_name)
# Make it variable-aware if possible
if hasattr(module_class, '__name__'):
# Try to get variable-aware version
var_aware_name = f"VariableAware{module_class.__name__}"
var_aware_class = globals().get(var_aware_name, module_class)
if var_aware_class != module_class:
return var_aware_class(*args, session_context=self.session_context, **kwargs)
return module_class(*args, **kwargs)
Phase 5: Complete Example (Week 3)
Here’s a complete example showing the unified system in action:
# example_unified_usage.py
import asyncio
import grpc
from dspex_bridge import SessionContext, VariableAwareChainOfThought
async def main():
# Connect to DSPex bridge
channel = grpc.aio.insecure_channel('localhost:50051')
# Initialize session (assume session created in Elixir)
session = SessionContext('session_123', channel)
# Set some variables
await session.set_variable('temperature', 0.7)
await session.set_variable('reasoning_style', 'detailed')
await session.set_variable('max_tokens', 256)
# Create variable-aware DSPy module
cot = VariableAwareChainOfThought(
"question -> reasoning, answer",
session_context=session
)
# Bind module parameters to variables
await cot.bind_to_variable('temperature', 'temperature')
await cot.bind_to_variable('max_tokens', 'max_tokens')
# Use the module - it automatically uses current variable values
result = await cot.forward(question="What causes rain?")
print(f"Reasoning: {result.reasoning}")
print(f"Answer: {result.answer}")
# Watch for variable changes
async def watch_temperature():
async for update in session.watch_variables(['temperature']):
print(f"Temperature changed to: {update['value']}")
# Module will use new value on next call
# Start watching in background
watch_task = asyncio.create_task(watch_temperature())
# From Elixir side, temperature could be updated by an optimizer
# The module will automatically use the new value
await asyncio.sleep(10) # Keep running to see updates
watch_task.cancel()
if __name__ == "__main__":
asyncio.run(main())
Elixir Side Usage:
# Create session with tools and variables
{:ok, session_id} = SessionStore.create_session()
# Register a tool
:ok = SessionStore.register_tool(session_id, "web_search", %{
name: "web_search",
description: "Search the web",
parameters: [...],
handler: &WebSearch.execute/2
})
# Register variables
{:ok, temp_id} = SessionStore.register_variable(
session_id, :temperature, :float, 0.7,
constraints: %{min: 0.0, max: 2.0}
)
{:ok, style_id} = SessionStore.register_variable(
session_id, :reasoning_style, :choice, "concise",
constraints: %{choices: ["concise", "detailed", "academic"]}
)
# Variables can be optimized
Task.start(fn ->
for temp <- [0.3, 0.5, 0.7, 0.9, 1.1] do
Process.sleep(5000)
SessionStore.update_variable(session_id, temp_id, temp, %{
source: "optimizer",
iteration: temp
})
end
end)
Testing During Implementation
Unit Tests for Each Component:
# test/session_store_variables_test.exs
defmodule DSPex.Bridge.SessionStoreVariablesTest do
use ExUnit.Case
test "register and retrieve variable" do
{:ok, session_id} = SessionStore.create_session()
{:ok, var_id} = SessionStore.register_variable(
session_id, :test_var, :float, 1.5
)
{:ok, variable} = SessionStore.get_variable(session_id, var_id)
assert variable.value == 1.5
assert variable.type == :float
end
end
Integration Tests:
# test_unified_bridge.py
import pytest
import asyncio
from dspex_bridge import SessionContext
@pytest.mark.asyncio
async def test_variable_tool_integration(session_context):
# Set a variable
await session_context.set_variable('quality_threshold', 0.8)
# Create a tool that uses the variable
search_tool = session_context.create_variable_aware_tool(
'search',
{'min_quality': 'quality_threshold'}
)
# Tool should use the variable value
results = await search_tool("DSPy framework")
assert len(results) > 0
Common Pitfalls and Solutions
- Cache Invalidation: Always invalidate cache when variables are updated externally
- Type Mismatches: Ensure consistent type handling between Elixir and Python
- Streaming Lifecycle: Properly handle streaming RPC cleanup
- Variable Scope: Remember variables are session-scoped, not global
Next Steps
After implementing the unified bridge:
- Add batch variable operations for performance
- Implement variable dependency tracking
- Add support for complex types (embeddings, tensors)
- Build optimization framework on top
- Create higher-level abstractions for common patterns