Implementation Examples
Overview
This document provides concrete implementation examples for different ML framework bridges using the modular architecture. Each example shows how to create a complete bridge for a specific framework.
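All three bridges subclass a shared BaseBridge from priv/python/base_bridge.py. That module is not reproduced here; the sketch below is a minimal approximation inferred from the hooks the examples override (`_initialize_framework`, `_register_handlers`, `get_framework_info`) and the constructor and `run()` calls in their entry points. In particular, the line-delimited JSON loop in `run()` is illustrative only; the production module owns the actual wire protocol, statistics, and error framing.
# priv/python/base_bridge.py (minimal sketch, NOT the production module)
import sys
import json
from typing import Any, Callable, Dict, Optional

class BaseBridge:
    def __init__(self, mode: str = "standalone", worker_id: Optional[str] = None):
        self.mode = mode
        self.worker_id = worker_id
        self._initialize_framework()               # subclass hook
        self.handlers = self._register_handlers()  # subclass hook

    # Hooks each framework bridge must implement
    def _initialize_framework(self) -> None:
        raise NotImplementedError

    def _register_handlers(self) -> Dict[str, Callable]:
        raise NotImplementedError

    def get_framework_info(self) -> Dict[str, Any]:
        raise NotImplementedError

    # Common handlers shared by every bridge
    def ping(self, args: Dict[str, Any]) -> Dict[str, Any]:
        return {'status': 'ok', 'mode': self.mode, 'worker_id': self.worker_id}

    def get_info(self, args: Dict[str, Any]) -> Dict[str, Any]:
        return self.get_framework_info()

    def get_stats(self, args: Dict[str, Any]) -> Dict[str, Any]:
        return {'handlers': len(self.handlers)}

    def cleanup(self, args: Dict[str, Any]) -> Dict[str, Any]:
        return {'status': 'cleaned'}

    def run(self) -> None:
        # Illustrative dispatch loop: one JSON request per line on stdin,
        # one JSON response per line on stdout.
        for line in sys.stdin:
            request = json.loads(line)
            handler = self.handlers[request['command']]
            try:
                response = {'ok': True, 'result': handler(request.get('args', {}))}
            except Exception as e:
                response = {'ok': False, 'error': str(e)}
            print(json.dumps(response), flush=True)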
Example 1: DSPy Bridge (Refactored)
Python Implementation
# priv/python/dspy_bridge.py
import dspy
from base_bridge import BaseBridge
from typing import Dict, Any, Optional, Callable
import uuid
class DSPyBridge(BaseBridge):
"""Bridge implementation for DSPy framework"""
    def _initialize_framework(self) -> None:
        """Initialize DSPy-specific components"""
        self.programs = {}
        self.signatures = {}
        self.lm_configured = False
def _register_handlers(self) -> Dict[str, Callable]:
"""Register DSPy-specific command handlers"""
return {
# Common handlers from base
'ping': self.ping,
'get_stats': self.get_stats,
'get_info': self.get_info,
'cleanup': self.cleanup,
# DSPy-specific handlers
'configure_lm': self.configure_lm,
'create_program': self.create_program,
'execute_program': self.execute_program,
'create_signature': self.create_signature,
'list_programs': self.list_programs,
'delete_program': self.delete_program
}
def get_framework_info(self) -> Dict[str, Any]:
"""Return DSPy framework information"""
return {
'name': 'dspy',
'version': dspy.__version__,
'capabilities': [
'signatures',
'programs',
'language_models',
'chain_of_thought',
'retrieval'
],
'supported_models': [
'gemini',
'openai',
'anthropic',
'cohere'
]
}
def configure_lm(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Configure DSPy language model"""
lm_type = args.get('type', 'gemini')
if lm_type == 'gemini':
lm = dspy.Google(
model=args.get('model', 'gemini-1.5-flash'),
api_key=args.get('api_key'),
temperature=args.get('temperature', 0.7)
)
elif lm_type == 'openai':
lm = dspy.OpenAI(
model=args.get('model', 'gpt-4'),
api_key=args.get('api_key'),
temperature=args.get('temperature', 0.7)
)
else:
raise ValueError(f"Unsupported LM type: {lm_type}")
dspy.settings.configure(lm=lm)
self.lm_configured = True
return {
'status': 'configured',
'lm_type': lm_type,
'model': args.get('model')
}
def create_signature(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Create a DSPy signature dynamically"""
signature_config = args['signature']
# Create signature class
class_name = signature_config['name']
fields = {}
# Add input fields
for field_name, field_config in signature_config.get('inputs', {}).items():
fields[field_name] = dspy.InputField(
desc=field_config.get('description', ''),
prefix=field_config.get('prefix'),
format=field_config.get('format')
)
# Add output fields
for field_name, field_config in signature_config.get('outputs', {}).items():
fields[field_name] = dspy.OutputField(
desc=field_config.get('description', ''),
prefix=field_config.get('prefix'),
format=field_config.get('format')
)
# Create signature class
signature_class = type(class_name, (dspy.Signature,), fields)
# Store for later use
signature_id = str(uuid.uuid4())
self.signatures[signature_id] = signature_class
return {'signature_id': signature_id}
def create_program(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Create a DSPy program"""
if not self.lm_configured:
raise RuntimeError("Language model not configured")
        signature = args.get('signature')
        program_type = args.get('type', 'predict')

        # Resolve the signature up front so both predict and
        # chain_of_thought programs can use it
        signature_class = None
        if isinstance(signature, str):
            # Use stored signature
            signature_class = self.signatures.get(signature)
            if not signature_class:
                raise ValueError(f"Signature not found: {signature}")
        elif signature is not None:
            # Create inline signature (helper sketched below)
            signature_class = self._create_signature_class(signature)

        if program_type == 'predict':
            program = dspy.Predict(signature_class)
        elif program_type == 'chain_of_thought':
            program = dspy.ChainOfThought(signature_class)
        elif program_type == 'retrieve':
            # Retrieval-augmented program
            retriever = self._create_retriever(args.get('retriever_config', {}))
            program = dspy.Retrieve(retriever)
        else:
            raise ValueError(f"Unsupported program type: {program_type}")
# Store program
program_id = str(uuid.uuid4())
self.programs[program_id] = program
return {
'program_id': program_id,
'type': program_type
}
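    # The two helpers referenced above are not provided by BaseBridge;
    # minimal sketches are given here so create_program is self-contained.
    def _create_signature_class(self, signature_config: Dict[str, Any]) -> type:
        """Build an inline signature class (same field-building logic as
        create_signature, but without storing it in the registry)."""
        fields = {}
        for field_name, field_config in signature_config.get('inputs', {}).items():
            fields[field_name] = dspy.InputField(
                desc=field_config.get('description', ''),
                prefix=field_config.get('prefix'),
                format=field_config.get('format')
            )
        for field_name, field_config in signature_config.get('outputs', {}).items():
            fields[field_name] = dspy.OutputField(
                desc=field_config.get('description', ''),
                prefix=field_config.get('prefix'),
                format=field_config.get('format')
            )
        return type(signature_config['name'], (dspy.Signature,), fields)

    def _create_retriever(self, retriever_config: Dict[str, Any]):
        """Placeholder: construct a retrieval client for dspy.Retrieve.
        Left unimplemented here; wire in your retrieval backend."""
        raise NotImplementedError("No retriever backend configured")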
def execute_program(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a DSPy program"""
program_id = args['program_id']
inputs = args['inputs']
program = self.programs.get(program_id)
if not program:
raise ValueError(f"Program not found: {program_id}")
# Execute program
result = program(**inputs)
# Extract outputs
outputs = {}
for key, value in result.items():
if hasattr(value, '__dict__'):
outputs[key] = str(value)
else:
outputs[key] = value
return {
'outputs': outputs,
'program_id': program_id
}
def list_programs(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""List all programs"""
return {
'programs': list(self.programs.keys()),
'count': len(self.programs)
}
def delete_program(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Delete a program"""
program_id = args['program_id']
if program_id in self.programs:
del self.programs[program_id]
return {'status': 'deleted', 'program_id': program_id}
else:
raise ValueError(f"Program not found: {program_id}")
def cleanup(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Cleanup resources"""
program_count = len(self.programs)
self.programs.clear()
self.signatures.clear()
return {
'status': 'cleaned',
'programs_cleared': program_count
}
# Main entry point
if __name__ == "__main__":
import sys
# Check if running as pool worker
mode = "standalone"
worker_id = None
if len(sys.argv) > 1 and sys.argv[1] == "--pool-worker":
mode = "pool_worker"
if len(sys.argv) > 2:
worker_id = sys.argv[2]
# Create and run bridge
bridge = DSPyBridge(mode=mode, worker_id=worker_id)
bridge.run()
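Because the handlers are plain methods that take and return dictionaries, the bridge can be smoke-tested directly in Python before wiring up the Elixir side. A minimal check, assuming GEMINI_API_KEY is set (these calls bypass the stdio protocol entirely):
# smoke_test_dspy_bridge.py (illustrative)
import os
from dspy_bridge import DSPyBridge

bridge = DSPyBridge(mode="standalone")
bridge.configure_lm({'type': 'gemini', 'api_key': os.environ['GEMINI_API_KEY']})

sig = bridge.create_signature({'signature': {
    'name': 'QA',
    'inputs': {'question': {'description': 'A question to answer'}},
    'outputs': {'answer': {'description': 'The answer'}}
}})
prog = bridge.create_program({'signature': sig['signature_id'], 'type': 'predict'})
result = bridge.execute_program({
    'program_id': prog['program_id'],
    'inputs': {'question': 'What is the capital of France?'}
})
print(result['outputs'])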
Elixir Adapter
defmodule DSPex.Adapters.DSPyAdapter do
@moduledoc """
Adapter for DSPy ML framework
"""
use DSPex.Adapters.BaseMLAdapter
alias DSPex.PythonBridge.SessionPoolV2
# DSPy-specific types
@type signature :: %{
name: String.t(),
inputs: map(),
outputs: map()
}
@type program_type :: :predict | :chain_of_thought | :retrieve
# Implement required callbacks
@impl true
def get_framework_info do
call_bridge("get_info", %{})
end
@impl true
def validate_environment do
# Check for required API keys
case System.get_env("GEMINI_API_KEY") do
nil -> {:error, "GEMINI_API_KEY environment variable not set"}
_ -> :ok
end
end
@impl true
def initialize(options) do
# Configure default LM if API key is available
if api_key = System.get_env("GEMINI_API_KEY") do
configure_lm(%{
type: "gemini",
model: Keyword.get(options, :model, "gemini-1.5-flash"),
api_key: api_key,
temperature: Keyword.get(options, :temperature, 0.7)
})
end
{:ok, %{initialized: true}}
end
# DSPy-specific functions
@doc """
Configure the language model for DSPy
"""
def configure_lm(config, options \\ []) do
call_bridge("configure_lm", config, options)
end
@doc """
Create a DSPy signature
"""
def create_signature(signature, options \\ []) do
call_bridge("create_signature", %{signature: signature}, options)
end
@doc """
Create a DSPy program from a signature
"""
def create_program(signature, type \\ :predict, options \\ []) do
args = %{
signature: signature,
type: type
}
call_bridge("create_program", args, options)
end
@doc """
Execute a DSPy program
"""
def execute_program(program_id, inputs, options \\ []) do
args = %{
program_id: program_id,
inputs: inputs
}
with {:ok, result} <- call_bridge("execute_program", args, options) do
{:ok, result["outputs"]}
end
end
@doc """
List all programs
"""
def list_programs(options \\ []) do
call_bridge("list_programs", %{}, options)
end
@doc """
Delete a program
"""
def delete_program(program_id, options \\ []) do
call_bridge("delete_program", %{program_id: program_id}, options)
end
# High-level convenience functions
@doc """
Create and execute a program in one call
"""
def predict(signature, inputs, options \\ []) do
with {:ok, %{"program_id" => program_id}} <- create_program(signature, :predict, options),
{:ok, outputs} <- execute_program(program_id, inputs, options),
{:ok, _} <- delete_program(program_id, options) do
{:ok, outputs}
end
end
@doc """
Chain of thought reasoning
"""
def chain_of_thought(signature, inputs, options \\ []) do
with {:ok, %{"program_id" => program_id}} <- create_program(signature, :chain_of_thought, options),
{:ok, outputs} <- execute_program(program_id, inputs, options),
{:ok, _} <- delete_program(program_id, options) do
{:ok, outputs}
end
end
end
Example 2: LangChain Bridge
Python Implementation
# priv/python/langchain_bridge.py
from langchain import __version__ as langchain_version
from langchain.chat_models import ChatOpenAI, ChatAnthropic
from langchain.chains import LLMChain, ConversationChain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain.agents import initialize_agent, Tool
from langchain.tools import DuckDuckGoSearchRun
from base_bridge import BaseBridge
from typing import Dict, Any, Optional, Callable
import uuid
class LangChainBridge(BaseBridge):
"""Bridge implementation for LangChain framework"""
def _initialize_framework(self) -> None:
"""Initialize LangChain-specific components"""
self.chains = {}
self.agents = {}
self.memories = {}
self.tools = {}
self.llm = None
def _register_handlers(self) -> Dict[str, Callable]:
"""Register LangChain-specific command handlers"""
return {
# Common handlers
'ping': self.ping,
'get_stats': self.get_stats,
'get_info': self.get_info,
'cleanup': self.cleanup,
# LangChain-specific handlers
'configure_llm': self.configure_llm,
'create_chain': self.create_chain,
'execute_chain': self.execute_chain,
'create_agent': self.create_agent,
'execute_agent': self.execute_agent,
'create_memory': self.create_memory,
'add_tool': self.add_tool,
'list_chains': self.list_chains,
'list_agents': self.list_agents,
'delete_chain': self.delete_chain,
'delete_agent': self.delete_agent
}
def get_framework_info(self) -> Dict[str, Any]:
"""Return LangChain framework information"""
return {
'name': 'langchain',
'version': langchain_version,
'capabilities': [
'chains',
'agents',
'tools',
'memory',
'prompts',
'streaming'
],
'supported_models': [
'openai',
'anthropic',
'huggingface',
'cohere'
]
}
def configure_llm(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Configure LangChain LLM"""
llm_type = args.get('type', 'openai')
if llm_type == 'openai':
self.llm = ChatOpenAI(
model_name=args.get('model', 'gpt-4'),
temperature=args.get('temperature', 0.7),
openai_api_key=args.get('api_key'),
streaming=args.get('streaming', False)
)
elif llm_type == 'anthropic':
self.llm = ChatAnthropic(
model=args.get('model', 'claude-2'),
temperature=args.get('temperature', 0.7),
anthropic_api_key=args.get('api_key')
)
else:
raise ValueError(f"Unsupported LLM type: {llm_type}")
return {
'status': 'configured',
'llm_type': llm_type,
'model': args.get('model')
}
def create_chain(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Create a LangChain chain"""
if not self.llm:
raise RuntimeError("LLM not configured")
chain_type = args.get('type', 'llm')
if chain_type == 'llm':
# Basic LLM chain
prompt = PromptTemplate(
input_variables=args.get('input_variables', ['input']),
template=args.get('template', '{input}')
)
chain = LLMChain(llm=self.llm, prompt=prompt)
elif chain_type == 'conversation':
# Conversation chain with memory
            memory_id = args.get('memory_id')
            if memory_id:
                memory = self.memories.get(memory_id)
                if not memory:
                    raise ValueError(f"Memory not found: {memory_id}")
            else:
                memory = ConversationBufferMemory()
chain = ConversationChain(
llm=self.llm,
memory=memory,
verbose=args.get('verbose', False)
)
else:
raise ValueError(f"Unsupported chain type: {chain_type}")
# Store chain
chain_id = str(uuid.uuid4())
self.chains[chain_id] = chain
return {
'chain_id': chain_id,
'type': chain_type
}
def execute_chain(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a LangChain chain"""
chain_id = args['chain_id']
inputs = args['inputs']
chain = self.chains.get(chain_id)
if not chain:
raise ValueError(f"Chain not found: {chain_id}")
# Execute chain
result = chain.run(**inputs)
return {
'output': result,
'chain_id': chain_id
}
def create_agent(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Create a LangChain agent"""
if not self.llm:
raise RuntimeError("LLM not configured")
agent_type = args.get('type', 'zero-shot-react-description')
tool_ids = args.get('tool_ids', [])
# Gather tools
tools = []
for tool_id in tool_ids:
if tool_id in self.tools:
tools.append(self.tools[tool_id])
# Create agent
agent = initialize_agent(
tools=tools,
llm=self.llm,
agent=agent_type,
verbose=args.get('verbose', False)
)
# Store agent
agent_id = str(uuid.uuid4())
self.agents[agent_id] = agent
return {
'agent_id': agent_id,
'type': agent_type,
'tools': tool_ids
}
def execute_agent(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a LangChain agent"""
agent_id = args['agent_id']
input_text = args['input']
agent = self.agents.get(agent_id)
if not agent:
raise ValueError(f"Agent not found: {agent_id}")
# Execute agent
result = agent.run(input_text)
return {
'output': result,
'agent_id': agent_id
}
def create_memory(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Create a memory instance"""
memory_type = args.get('type', 'buffer')
if memory_type == 'buffer':
memory = ConversationBufferMemory()
else:
raise ValueError(f"Unsupported memory type: {memory_type}")
memory_id = str(uuid.uuid4())
self.memories[memory_id] = memory
return {
'memory_id': memory_id,
'type': memory_type
}
def add_tool(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Add a tool for agents"""
tool_type = args.get('type')
if tool_type == 'search':
tool = Tool(
name="Search",
func=DuckDuckGoSearchRun().run,
description="Search the web for information"
)
else:
# Custom tool
tool = Tool(
name=args.get('name'),
func=lambda x: f"Mock result for: {x}", # Would be actual implementation
description=args.get('description')
)
tool_id = str(uuid.uuid4())
self.tools[tool_id] = tool
return {
'tool_id': tool_id,
'name': tool.name
}
def list_chains(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""List all chains"""
return {
'chains': list(self.chains.keys()),
'count': len(self.chains)
}
def list_agents(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""List all agents"""
return {
'agents': list(self.agents.keys()),
'count': len(self.agents)
}
def delete_chain(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Delete a chain"""
chain_id = args['chain_id']
if chain_id in self.chains:
del self.chains[chain_id]
return {'status': 'deleted', 'chain_id': chain_id}
else:
raise ValueError(f"Chain not found: {chain_id}")
def delete_agent(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Delete an agent"""
agent_id = args['agent_id']
if agent_id in self.agents:
del self.agents[agent_id]
return {'status': 'deleted', 'agent_id': agent_id}
else:
raise ValueError(f"Agent not found: {agent_id}")
def cleanup(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Cleanup resources"""
chains_count = len(self.chains)
agents_count = len(self.agents)
self.chains.clear()
self.agents.clear()
self.memories.clear()
self.tools.clear()
return {
'status': 'cleaned',
'chains_cleared': chains_count,
'agents_cleared': agents_count
}
if __name__ == "__main__":
import sys
mode = "standalone"
worker_id = None
if len(sys.argv) > 1 and sys.argv[1] == "--pool-worker":
mode = "pool_worker"
if len(sys.argv) > 2:
worker_id = sys.argv[2]
bridge = LangChainBridge(mode=mode, worker_id=worker_id)
bridge.run()
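As with the DSPy bridge, the handlers can be exercised directly before the Elixir adapter is in place. A minimal sketch, assuming OPENAI_API_KEY is set:
# smoke_test_langchain_bridge.py (illustrative)
import os
from langchain_bridge import LangChainBridge

bridge = LangChainBridge(mode="standalone")
bridge.configure_llm({'type': 'openai', 'api_key': os.environ['OPENAI_API_KEY']})

chain = bridge.create_chain({
    'type': 'llm',
    'template': '{input}',
    'input_variables': ['input']
})
result = bridge.execute_chain({
    'chain_id': chain['chain_id'],
    'inputs': {'input': 'What is machine learning?'}
})
print(result['output'])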
Elixir Adapter
defmodule DSPex.Adapters.LangChainAdapter do
@moduledoc """
Adapter for LangChain framework
"""
use DSPex.Adapters.BaseMLAdapter
# LangChain-specific types
@type chain_type :: :llm | :conversation | :sequential
@type agent_type :: :"zero-shot-react-description" | :"conversational-react-description"
@type memory_type :: :buffer | :window | :summary
@impl true
def get_framework_info do
call_bridge("get_info", %{})
end
@impl true
def validate_environment do
# Check for at least one API key
cond do
System.get_env("OPENAI_API_KEY") -> :ok
System.get_env("ANTHROPIC_API_KEY") -> :ok
true -> {:error, "No LLM API key found (OPENAI_API_KEY or ANTHROPIC_API_KEY)"}
end
end
@impl true
def initialize(options) do
# Configure default LLM if API key is available
llm_config =
cond do
api_key = System.get_env("OPENAI_API_KEY") ->
%{
type: "openai",
model: Keyword.get(options, :model, "gpt-4"),
api_key: api_key,
temperature: Keyword.get(options, :temperature, 0.7),
streaming: Keyword.get(options, :streaming, false)
}
api_key = System.get_env("ANTHROPIC_API_KEY") ->
%{
type: "anthropic",
model: Keyword.get(options, :model, "claude-2"),
api_key: api_key,
temperature: Keyword.get(options, :temperature, 0.7)
}
true ->
nil
end
if llm_config do
configure_llm(llm_config)
end
{:ok, %{initialized: true}}
end
# LangChain-specific functions
@doc """
Configure the LLM for LangChain
"""
def configure_llm(config, options \\ []) do
call_bridge("configure_llm", config, options)
end
@doc """
Create a LangChain chain
"""
def create_chain(type, config, options \\ []) do
args = Map.merge(config, %{type: type})
call_bridge("create_chain", args, options)
end
@doc """
Execute a chain
"""
def execute_chain(chain_id, inputs, options \\ []) do
args = %{
chain_id: chain_id,
inputs: inputs
}
with {:ok, result} <- call_bridge("execute_chain", args, options) do
{:ok, result["output"]}
end
end
@doc """
Create an agent with tools
"""
def create_agent(type, tool_ids, config \\ %{}, options \\ []) do
args = Map.merge(config, %{
type: type,
tool_ids: tool_ids
})
call_bridge("create_agent", args, options)
end
@doc """
Execute an agent
"""
def execute_agent(agent_id, input, options \\ []) do
args = %{
agent_id: agent_id,
input: input
}
with {:ok, result} <- call_bridge("execute_agent", args, options) do
{:ok, result["output"]}
end
end
@doc """
Create a memory instance
"""
def create_memory(type \\ :buffer, options \\ []) do
call_bridge("create_memory", %{type: type}, options)
end
@doc """
Add a tool for agents
"""
def add_tool(name, description, type \\ :custom, options \\ []) do
args = %{
name: name,
description: description,
type: type
}
call_bridge("add_tool", args, options)
end
@doc """
List all chains
"""
def list_chains(options \\ []) do
call_bridge("list_chains", %{}, options)
end
@doc """
List all agents
"""
def list_agents(options \\ []) do
call_bridge("list_agents", %{}, options)
end
# High-level convenience functions
@doc """
Simple question-answering
"""
def ask(question, options \\ []) do
with {:ok, %{"chain_id" => chain_id}} <- create_chain(:llm, %{
template: "{input}",
input_variables: ["input"]
}, options),
{:ok, answer} <- execute_chain(chain_id, %{input: question}, options),
{:ok, _} <- delete_chain(chain_id, options) do
{:ok, answer}
end
end
@doc """
Conversational chat with memory
"""
def chat(session_id, message, options \\ []) do
# Use session_id to maintain conversation memory
options = Keyword.put(options, :session_id, session_id)
# Check if conversation chain exists in session
case get_session_data(session_id, :chain_id) do
nil ->
# Create new conversation chain
with {:ok, %{"memory_id" => memory_id}} <- create_memory(:buffer, options),
{:ok, %{"chain_id" => chain_id}} <- create_chain(:conversation, %{
memory_id: memory_id
}, options) do
# Store in session
put_session_data(session_id, :chain_id, chain_id)
put_session_data(session_id, :memory_id, memory_id)
# Execute
execute_chain(chain_id, %{input: message}, options)
end
chain_id ->
# Use existing chain
execute_chain(chain_id, %{input: message}, options)
end
end
@doc """
Research agent with web search
"""
def research(topic, options \\ []) do
with {:ok, %{"tool_id" => search_tool}} <- add_tool(
"Web Search",
"Search the web for information",
:search,
options
),
{:ok, %{"agent_id" => agent_id}} <- create_agent(
:"zero-shot-react-description",
[search_tool],
%{verbose: true},
options
),
{:ok, result} <- execute_agent(
agent_id,
"Research the following topic and provide a summary: #{topic}",
options
),
{:ok, _} <- delete_agent(agent_id, options) do
{:ok, result}
end
end
# Helper functions
defp get_session_data(session_id, key) do
# Would integrate with DSPex session management
DSPex.PythonBridge.SessionStore.get(session_id, key)
end
defp put_session_data(session_id, key, value) do
DSPex.PythonBridge.SessionStore.put(session_id, key, value)
end
defp delete_chain(chain_id, options) do
call_bridge("delete_chain", %{chain_id: chain_id}, options)
end
defp delete_agent(agent_id, options) do
call_bridge("delete_agent", %{agent_id: agent_id}, options)
end
end
Example 3: Custom ML Bridge Template
Python Implementation
# priv/python/custom_ml_bridge.py
from base_bridge import BaseBridge
from typing import Dict, Any, Optional, Callable
import uuid
from datetime import datetime
class CustomMLBridge(BaseBridge):
"""
Template for creating custom ML framework bridges.
This example shows how to integrate a hypothetical ML framework
that deals with custom models and predictions.
"""
def _initialize_framework(self) -> None:
"""Initialize your framework-specific components"""
# Example: Initialize model registry, connections, etc.
self.models = {}
self.predictions = {}
self.datasets = {}
# Initialize your ML framework here
# import your_ml_framework
# self.framework = your_ml_framework.initialize()
def _register_handlers(self) -> Dict[str, Callable]:
"""Register your framework-specific command handlers"""
return {
# Required common handlers
'ping': self.ping,
'get_stats': self.get_stats,
'get_info': self.get_info,
'cleanup': self.cleanup,
# Your framework-specific handlers
'load_model': self.load_model,
'train_model': self.train_model,
'predict': self.predict,
'evaluate': self.evaluate,
'save_model': self.save_model,
'load_dataset': self.load_dataset,
'preprocess': self.preprocess,
'list_models': self.list_models,
'delete_model': self.delete_model,
# Add more handlers as needed
}
def get_framework_info(self) -> Dict[str, Any]:
"""Return information about your framework"""
return {
'name': 'custom_ml',
'version': '1.0.0', # Your framework version
'capabilities': [
'model_training',
'prediction',
'evaluation',
'preprocessing',
'model_persistence'
],
'supported_models': [
'linear_regression',
'random_forest',
'neural_network',
'custom_model'
],
'requirements': [
'numpy',
'scikit-learn',
'your-ml-library'
]
}
def load_model(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Load a pre-trained model"""
model_path = args.get('path')
model_type = args.get('type', 'auto')
# Example implementation
try:
# Load your model here
# model = your_framework.load_model(model_path, model_type)
# For demo purposes, create a mock model
model = {
'type': model_type,
'path': model_path,
'loaded_at': datetime.utcnow().isoformat()
}
model_id = str(uuid.uuid4())
self.models[model_id] = model
return {
'model_id': model_id,
'type': model_type,
'status': 'loaded'
}
except Exception as e:
raise RuntimeError(f"Failed to load model: {str(e)}")
def train_model(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Train a new model"""
model_type = args.get('type')
dataset_id = args.get('dataset_id')
hyperparameters = args.get('hyperparameters', {})
# Validate dataset exists
if dataset_id not in self.datasets:
raise ValueError(f"Dataset not found: {dataset_id}")
dataset = self.datasets[dataset_id]
# Train model
# model = your_framework.train(
# model_type=model_type,
# data=dataset,
# **hyperparameters
# )
# Mock implementation
model = {
'type': model_type,
'dataset_id': dataset_id,
'hyperparameters': hyperparameters,
'trained_at': datetime.utcnow().isoformat(),
'metrics': {
'accuracy': 0.95,
'loss': 0.05
}
}
model_id = str(uuid.uuid4())
self.models[model_id] = model
return {
'model_id': model_id,
'metrics': model['metrics'],
'status': 'trained'
}
def predict(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Make predictions using a model"""
model_id = args.get('model_id')
inputs = args.get('inputs')
options = args.get('options', {})
# Validate model exists
if model_id not in self.models:
raise ValueError(f"Model not found: {model_id}")
model = self.models[model_id]
# Make predictions
# predictions = model.predict(inputs, **options)
# Mock implementation
predictions = {
'values': [0.8, 0.2] if isinstance(inputs, list) else 0.8,
'confidence': 0.95,
'model_id': model_id
}
# Store prediction for tracking
prediction_id = str(uuid.uuid4())
self.predictions[prediction_id] = {
'model_id': model_id,
'inputs': inputs,
'outputs': predictions,
'timestamp': datetime.utcnow().isoformat()
}
return {
'prediction_id': prediction_id,
'predictions': predictions['values'],
'confidence': predictions['confidence']
}
def evaluate(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Evaluate model performance"""
model_id = args.get('model_id')
dataset_id = args.get('dataset_id')
metrics = args.get('metrics', ['accuracy', 'precision', 'recall'])
# Validate inputs
if model_id not in self.models:
raise ValueError(f"Model not found: {model_id}")
if dataset_id not in self.datasets:
raise ValueError(f"Dataset not found: {dataset_id}")
# Evaluate model
# results = your_framework.evaluate(
# model=self.models[model_id],
# data=self.datasets[dataset_id],
# metrics=metrics
# )
# Mock implementation
results = {
'accuracy': 0.94,
'precision': 0.92,
'recall': 0.96,
'f1_score': 0.94
}
return {
'model_id': model_id,
'dataset_id': dataset_id,
'metrics': results
}
def save_model(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Save a model to disk"""
model_id = args.get('model_id')
path = args.get('path')
format = args.get('format', 'native')
if model_id not in self.models:
raise ValueError(f"Model not found: {model_id}")
# Save model
# your_framework.save_model(
# model=self.models[model_id],
# path=path,
# format=format
# )
return {
'model_id': model_id,
'path': path,
'format': format,
'status': 'saved'
}
def load_dataset(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Load a dataset for training/evaluation"""
source = args.get('source')
format = args.get('format', 'csv')
options = args.get('options', {})
# Load dataset
# dataset = your_framework.load_data(
# source=source,
# format=format,
# **options
# )
# Mock implementation
dataset = {
'source': source,
'format': format,
'shape': (1000, 10),
'features': ['feature1', 'feature2', '...'],
'loaded_at': datetime.utcnow().isoformat()
}
dataset_id = str(uuid.uuid4())
self.datasets[dataset_id] = dataset
return {
'dataset_id': dataset_id,
'shape': dataset['shape'],
'features': dataset['features']
}
def preprocess(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Preprocess data"""
dataset_id = args.get('dataset_id')
operations = args.get('operations', [])
if dataset_id not in self.datasets:
raise ValueError(f"Dataset not found: {dataset_id}")
# Apply preprocessing
# processed_data = your_framework.preprocess(
# data=self.datasets[dataset_id],
# operations=operations
# )
# Create new dataset with processed data
processed_dataset = {
'original_id': dataset_id,
'operations': operations,
'shape': (950, 12), # Mock: some rows removed, features added
'processed_at': datetime.utcnow().isoformat()
}
new_dataset_id = str(uuid.uuid4())
self.datasets[new_dataset_id] = processed_dataset
return {
'dataset_id': new_dataset_id,
'original_dataset_id': dataset_id,
'operations_applied': operations,
'shape': processed_dataset['shape']
}
def list_models(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""List all loaded models"""
model_list = []
for model_id, model in self.models.items():
model_list.append({
'id': model_id,
'type': model.get('type'),
'created_at': model.get('trained_at') or model.get('loaded_at')
})
return {
'models': model_list,
'count': len(model_list)
}
def delete_model(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Delete a model from memory"""
model_id = args.get('model_id')
if model_id in self.models:
del self.models[model_id]
return {'status': 'deleted', 'model_id': model_id}
else:
raise ValueError(f"Model not found: {model_id}")
def cleanup(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Cleanup all resources"""
models_count = len(self.models)
datasets_count = len(self.datasets)
predictions_count = len(self.predictions)
# Clear all data
self.models.clear()
self.datasets.clear()
self.predictions.clear()
# Cleanup framework resources
# your_framework.cleanup()
return {
'status': 'cleaned',
'models_cleared': models_count,
'datasets_cleared': datasets_count,
'predictions_cleared': predictions_count
}
if __name__ == "__main__":
import sys
mode = "standalone"
worker_id = None
if len(sys.argv) > 1 and sys.argv[1] == "--pool-worker":
mode = "pool_worker"
if len(sys.argv) > 2:
worker_id = sys.argv[2]
bridge = CustomMLBridge(mode=mode, worker_id=worker_id)
bridge.run()
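Since every handler in the template returns mock data, the full workflow can be exercised without any real ML framework installed:
# exercise_custom_ml_bridge.py (runs against the mock handlers above)
from custom_ml_bridge import CustomMLBridge

bridge = CustomMLBridge(mode="standalone")

ds = bridge.load_dataset({'source': '/data/train.csv', 'format': 'csv'})
model = bridge.train_model({'type': 'random_forest', 'dataset_id': ds['dataset_id']})
pred = bridge.predict({'model_id': model['model_id'], 'inputs': [1.0, 2.0, 3.0]})
print(pred['predictions'], pred['confidence'])

eval_result = bridge.evaluate({
    'model_id': model['model_id'],
    'dataset_id': ds['dataset_id']
})
print(eval_result['metrics'])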
Elixir Adapter
defmodule DSPex.Adapters.CustomMLAdapter do
@moduledoc """
Template adapter for custom ML frameworks.
This shows how to create an adapter for your own ML framework
while leveraging DSPex infrastructure.
"""
use DSPex.Adapters.BaseMLAdapter
# Define your framework-specific types
@type model_type :: :linear_regression | :random_forest | :neural_network | :custom
@type dataset_format :: :csv | :json | :parquet | :numpy
@type preprocessing_op :: :normalize | :standardize | :encode | :impute
@impl true
def get_framework_info do
call_bridge("get_info", %{})
end
@impl true
def validate_environment do
# Add your framework-specific validation
# For example, check for required files, libraries, etc.
:ok
end
@impl true
def initialize(options) do
# Initialize your framework
# This is called when the adapter starts
# Example: Set default configuration
config = %{
cache_models: Keyword.get(options, :cache_models, true),
auto_preprocessing: Keyword.get(options, :auto_preprocessing, false),
default_model_type: Keyword.get(options, :default_model_type, :random_forest)
}
{:ok, config}
end
# Model Management Functions
@doc """
Load a pre-trained model from disk
"""
def load_model(path, type \\ :auto, options \\ []) do
args = %{
path: path,
type: type
}
call_bridge("load_model", args, options)
end
@doc """
Train a new model
"""
def train_model(type, dataset_id, hyperparameters \\ %{}, options \\ []) do
args = %{
type: type,
dataset_id: dataset_id,
hyperparameters: hyperparameters
}
call_bridge("train_model", args, options)
end
@doc """
Make predictions with a model
"""
def predict(model_id, inputs, options \\ []) do
args = %{
model_id: model_id,
inputs: inputs,
options: options
}
with {:ok, result} <- call_bridge("predict", args, options) do
{:ok, %{
predictions: result["predictions"],
confidence: result["confidence"],
prediction_id: result["prediction_id"]
}}
end
end
@doc """
Evaluate model performance
"""
  def evaluate(model_id, dataset_id, metrics \\ nil, options \\ []) do
    args = %{
      model_id: model_id,
      dataset_id: dataset_id
    }

    # `if` does not mutate outer bindings in Elixir, so rebind the result
    args = if metrics, do: Map.put(args, :metrics, metrics), else: args

    call_bridge("evaluate", args, options)
  end
@doc """
Save a model to disk
"""
def save_model(model_id, path, format \\ :native, options \\ []) do
args = %{
model_id: model_id,
path: path,
format: format
}
call_bridge("save_model", args, options)
end
# Data Management Functions
@doc """
Load a dataset
"""
def load_dataset(source, format \\ :csv, options \\ []) do
args = %{
source: source,
format: format,
options: options
}
call_bridge("load_dataset", args, options)
end
@doc """
Preprocess a dataset
"""
def preprocess(dataset_id, operations, options \\ []) do
args = %{
dataset_id: dataset_id,
operations: operations
}
call_bridge("preprocess", args, options)
end
# Query Functions
@doc """
List all loaded models
"""
def list_models(options \\ []) do
with {:ok, result} <- call_bridge("list_models", %{}, options) do
{:ok, result["models"]}
end
end
@doc """
Delete a model
"""
def delete_model(model_id, options \\ []) do
call_bridge("delete_model", %{model_id: model_id}, options)
end
# High-Level Convenience Functions
@doc """
Train and evaluate a model in one call
"""
def train_and_evaluate(type, train_dataset_id, test_dataset_id, hyperparameters \\ %{}, options \\ []) do
with {:ok, %{"model_id" => model_id}} <- train_model(type, train_dataset_id, hyperparameters, options),
{:ok, metrics} <- evaluate(model_id, test_dataset_id, nil, options) do
{:ok, %{
model_id: model_id,
metrics: metrics["metrics"]
}}
end
end
@doc """
Quick prediction - load model, predict, and cleanup
"""
def quick_predict(model_path, inputs, options \\ []) do
with {:ok, %{"model_id" => model_id}} <- load_model(model_path, :auto, options),
{:ok, result} <- predict(model_id, inputs, options),
{:ok, _} <- delete_model(model_id, options) do
{:ok, result}
end
end
@doc """
Pipeline: load data, preprocess, train, evaluate
"""
def run_pipeline(data_source, model_type, preprocessing_ops, options \\ []) do
with {:ok, %{"dataset_id" => raw_dataset_id}} <- load_dataset(data_source, :csv, options),
{:ok, %{"dataset_id" => processed_dataset_id}} <- preprocess(raw_dataset_id, preprocessing_ops, options),
         # Split dataset (bridge-side handler; a sketch follows this module)
{:ok, %{"train_id" => train_id, "test_id" => test_id}} <-
call_bridge("split_dataset", %{dataset_id: processed_dataset_id, ratio: 0.8}, options),
# Train and evaluate
{:ok, result} <- train_and_evaluate(model_type, train_id, test_id, %{}, options) do
{:ok, result}
end
end
# Framework-Specific Extensions
@doc """
Example of framework-specific functionality
"""
def explain_prediction(model_id, input, options \\ []) do
# If your framework supports model explainability
args = %{
model_id: model_id,
input: input,
method: Keyword.get(options, :method, :shap)
}
call_bridge("explain_prediction", args, options)
end
@doc """
Hyperparameter tuning
"""
def tune_hyperparameters(model_type, dataset_id, param_grid, options \\ []) do
args = %{
model_type: model_type,
dataset_id: dataset_id,
param_grid: param_grid,
cv_folds: Keyword.get(options, :cv_folds, 5)
}
call_bridge("tune_hyperparameters", args, options)
end
end
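run_pipeline above depends on a split_dataset command that the Python template does not define. A hedged sketch of what that handler could look like in CustomMLBridge, using the same mock bookkeeping style as the other handlers (remember to register it under 'split_dataset' in _register_handlers):
    def split_dataset(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Split a dataset into train/test partitions (hypothetical handler)"""
        dataset_id = args['dataset_id']
        ratio = args.get('ratio', 0.8)
        if dataset_id not in self.datasets:
            raise ValueError(f"Dataset not found: {dataset_id}")
        # Mock implementation: record the split; a real framework would
        # partition the underlying rows here.
        train_id = str(uuid.uuid4())
        test_id = str(uuid.uuid4())
        self.datasets[train_id] = {'parent_id': dataset_id, 'split': 'train', 'ratio': ratio}
        self.datasets[test_id] = {'parent_id': dataset_id, 'split': 'test', 'ratio': round(1 - ratio, 2)}
        return {'train_id': train_id, 'test_id': test_id}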
Usage Examples
Using DSPy Adapter
# Direct adapter usage
alias DSPex.Adapters.DSPyAdapter
# Configure language model
DSPyAdapter.configure_lm(%{
type: "gemini",
model: "gemini-1.5-flash",
api_key: System.get_env("GEMINI_API_KEY")
})
# Create and use a signature
signature = %{
name: "QuestionAnswer",
inputs: %{
question: %{description: "A question to answer"}
},
outputs: %{
answer: %{description: "The answer to the question"}
}
}
{:ok, result} = DSPyAdapter.predict(
signature,
%{question: "What is the capital of France?"}
)
IO.puts("Answer: #{result["answer"]}")
Using LangChain Adapter
# Using the unified interface
alias DSPex.MLBridge
# Get LangChain adapter
{:ok, langchain} = MLBridge.get_adapter(:langchain)
# Simple Q&A
{:ok, answer} = langchain.ask("What is machine learning?")
# Conversational chat with memory
{:ok, response1} = langchain.chat("session_123", "Hello, my name is Alice")
{:ok, response2} = langchain.chat("session_123", "What's my name?")
# response2 will remember the name from the conversation
# Research agent
{:ok, research} = langchain.research("Recent advances in quantum computing")
Using Custom ML Adapter
# Custom ML workflow
alias DSPex.Adapters.CustomMLAdapter
# Load and use a model
{:ok, model_info} = CustomMLAdapter.load_model("/models/sentiment_classifier.pkl")
model_id = model_info["model_id"]
# Make predictions
{:ok, result} = CustomMLAdapter.predict(
model_id,
["This movie was fantastic!", "Terrible experience"]
)
# Run a complete pipeline
{:ok, pipeline_result} = CustomMLAdapter.run_pipeline(
"/data/training_data.csv",
:random_forest,
[:normalize, :encode]
)
Using Multiple Frameworks Together
# Use different frameworks for different tasks
defmodule MyApp.MLPipeline do
alias DSPex.MLBridge
def analyze_and_explain(text) do
# Use LangChain for analysis
{:ok, langchain} = MLBridge.get_adapter(:langchain)
{:ok, analysis} = langchain.ask("Analyze the sentiment of: #{text}")
# Use DSPy for structured extraction
{:ok, dspy} = MLBridge.get_adapter(:dspy)
{:ok, entities} = dspy.predict(
%{
name: "EntityExtraction",
inputs: %{text: %{description: "Text to extract entities from"}},
outputs: %{
entities: %{description: "List of entities"},
sentiment: %{description: "Overall sentiment"}
}
},
%{text: text}
)
# Combine results
%{
analysis: analysis,
entities: entities["entities"],
sentiment: entities["sentiment"]
}
end
end
These examples demonstrate how the modular architecture enables:
- Framework-specific interfaces that feel natural to users
- Easy integration of multiple frameworks
- Reuse of DSPex’s robust infrastructure
- Clean separation between frameworks