Here’s the complete MABEAM_API_REFERENCE.md in full:
# MABEAM API Reference
## Overview
This document provides a comprehensive API reference for the Foundation MABEAM (Multi-Agent BEAM) system. MABEAM is a universal multi-agent coordination kernel built on Foundation's infrastructure, providing distribution-ready agent lifecycle management, economic coordination protocols, and serializable data structures.
**Key Design Principles:**
- **Agent Identity Over Process Identity**: All APIs use durable `agent_id` instead of PIDs
- **Serialization-First Data Structures**: Every structure avoids non-serializable terms
- **Communication Abstraction**: Abstract local vs. remote calls from day one
- **Asynchronous Coordination**: All protocols are non-blocking state machines
- **Conflict Resolution Primitives**: Build distributed conflict handling locally first
## Table of Contents
1. [Core Types (`Foundation.MABEAM.Types`)](#core-types)
2. [Process Registry (`Foundation.MABEAM.ProcessRegistry`)](#process-registry)
3. [Core Orchestrator (`Foundation.MABEAM.Core`)](#core-orchestrator)
4. [Coordination Framework (`Foundation.MABEAM.Coordination`)](#coordination-framework)
5. [Auction Coordination (`Foundation.MABEAM.Coordination.Auction`)](#auction-coordination)
6. [Market Coordination (`Foundation.MABEAM.Coordination.Market`)](#market-coordination)
7. [Communication Layer (`Foundation.MABEAM.Comms`)](#communication-layer)
8. [Configuration](#configuration)
9. [Examples](#examples)
---
## Core Types
### `Foundation.MABEAM.Types`
Provides type definitions and utility functions for the MABEAM system. All structures are 100% serializable.
#### Key Types
```elixir
@type agent_id :: atom() | String.t()
@type process_reference :: {:agent, agent_id()}
@type agent_config :: %{
id: agent_id(),
module: module(), # The GenServer module to start
args: [term()], # Arguments for start_link
type: agent_type(), # Classification for coordination
capabilities: [atom()], # What this agent can do
restart_policy: restart_policy(), # How to handle crashes
resource_requirements: resource_spec(), # Resource needs
metadata: map(), # Extensible metadata
created_at: DateTime.t() # Timestamp when config was created
}
@type agent_type ::
:coordinator | :worker | :monitor | :resource_provider | :optimizer
@type restart_policy :: %{
strategy: :permanent | :temporary | :transient,
max_restarts: non_neg_integer(),
period_seconds: pos_integer(),
backoff_strategy: :linear | :exponential | :fixed
}
@type resource_spec :: %{
memory_mb: pos_integer(),
cpu_weight: float(),
network_priority: :low | :normal | :high,
custom_resources: map()
}
@type universal_variable :: %{
name: atom(),
value: term(),
version: pos_integer(), # For optimistic concurrency
last_modifier: agent_id(),
conflict_resolution: conflict_resolution_strategy(),
access_permissions: access_permissions(),
metadata: map(),
constraints: map(), # Variable constraints
created_at: DateTime.t(), # Creation timestamp
updated_at: DateTime.t() # Last update timestamp
}
@type conflict_resolution_strategy ::
:last_write_wins | :consensus | :priority_based | {:custom, module(), atom()}
@type access_permissions ::
:public | :restricted | {:agents, [agent_id()]} | {:capabilities, [atom()]}
@type coordination_request :: %{
protocol: atom(), # :auction, :consensus, :market
type: request_type(),
params: map(),
timeout: pos_integer(),
correlation_id: binary(),
created_at: DateTime.t() # Request creation timestamp
}
@type coordination_response :: %{
correlation_id: binary(),
response_type: response_type(),
data: term(),
confidence: float(), # Optional confidence score
metadata: map()
}
@type request_type ::
:consensus | :negotiation | :auction | :market | :resource_allocation
@type response_type ::
:success | :failure | :timeout | :partial
@type auction_spec :: %{
type: atom(), # Auction type
resource_id: term(), # Resource being auctioned
participants: [agent_id()], # Participating agents
starting_price: number() | nil, # Starting price (if applicable)
payment_rule: atom(), # Payment rule
auction_params: map(), # Type-specific parameters
created_at: DateTime.t() # Auction creation timestamp
}
#### Utility Functions
##### `new_agent_config/4`
Creates a new agent configuration with default values.
```elixir
@spec new_agent_config(agent_id(), module(), [term()], keyword()) :: agent_config()
Parameters:
id
- Unique identifier for the agentmodule
- GenServer module to startargs
- Arguments for start_linkopts
- Optional configuration
Examples:
# Create a basic worker agent
config = Foundation.MABEAM.Types.new_agent_config(
:worker_1,
MyApp.WorkerAgent,
[worker_id: 1]
)
# Create a coordinator with custom capabilities
config = Foundation.MABEAM.Types.new_agent_config(
"coord_1",
MyApp.CoordinatorAgent,
[],
type: :coordinator,
capabilities: [:consensus, :auction_coordination],
resources: %{memory_mb: 200, cpu_weight: 2.0}
)
new_variable/4
Creates a new universal variable with default configuration.
@spec new_variable(atom(), term(), agent_id(), keyword()) :: universal_variable()
Parameters:
name
- Variable name identifiervalue
- Initial value (must be serializable)creator_id
- Agent that created the variableopts
- Optional configuration
Examples:
# Create a simple shared counter
var = Foundation.MABEAM.Types.new_variable(:counter, 0, :agent_1)
# Create a complex shared state variable
var = Foundation.MABEAM.Types.new_variable(
"shared_state",
%{status: :ready, data: []},
"coordinator",
conflict_resolution: :consensus,
permissions: :restricted,
metadata: %{priority: :high}
)
new_coordination_request/4
Creates a coordination request structure.
@spec new_coordination_request(atom(), request_type(), map(), keyword()) :: coordination_request()
Examples:
request = Foundation.MABEAM.Types.new_coordination_request(
:consensus_voting,
:consensus,
%{question: "Deploy v2.0?", options: [:yes, :no]},
timeout: 30_000
)
new_auction_spec/3
and new_auction_spec/4
Creates an auction specification structure.
@spec new_auction_spec(atom(), term(), [agent_id()]) :: auction_spec()
@spec new_auction_spec(atom(), term(), [agent_id()], keyword()) :: auction_spec()
Examples:
# Basic sealed-bid auction
spec = Foundation.MABEAM.Types.new_auction_spec(
:sealed_bid,
:compute_resource_1,
[:agent1, :agent2, :agent3]
)
# English auction with parameters
spec = Foundation.MABEAM.Types.new_auction_spec(
:english,
:premium_slot,
[:bidder1, :bidder2],
starting_price: 10,
increment: 1,
max_rounds: 10
)
# Dutch auction with parameters
spec = Foundation.MABEAM.Types.new_auction_spec(
:dutch,
:urgent_task,
[:agent1],
starting_price: 100,
decrement: 5,
min_price: 20
)
Process Registry
Foundation.MABEAM.ProcessRegistry
Advanced agent lifecycle management with distribution readiness. Manages agents by agent_id, not PID, making it naturally distributed-ready with pluggable backend architecture.
Backend Architecture
The ProcessRegistry uses a pluggable backend system for seamless evolution from single-node to distributed deployment:
defmodule Backend do
@callback init(opts :: keyword()) :: {:ok, state :: term()} | {:error, reason :: term()}
@callback register_agent(agent_entry()) :: :ok | {:error, reason :: term()}
@callback update_agent_status(agent_id(), atom(), pid() | nil) :: :ok | {:error, reason :: term()}
@callback get_agent(agent_id()) :: {:ok, agent_entry()} | {:error, :not_found}
@callback unregister_agent(agent_id()) :: :ok | {:error, reason :: term()}
@callback list_all_agents() :: [agent_entry()]
@callback find_agents_by_capability([atom()]) :: {:ok, [agent_id()]} | {:error, reason :: term()}
@callback get_agents_by_status(atom()) :: {:ok, [agent_entry()]} | {:error, reason :: term()}
@callback cleanup_inactive_agents() :: {:ok, cleaned_count :: non_neg_integer()} | {:error, reason :: term()}
end
Available Backends:
Foundation.MABEAM.ProcessRegistry.LocalETS
- Single-node ETS backend (current)Foundation.MABEAM.ProcessRegistry.Horde
- Distributed CRDT backend (future)
Public API
start_link/1
Starts the Process Registry service.
@spec start_link(keyword()) :: GenServer.on_start()
Options:
:backend
- Backend module (default:LocalETS
):config
- Backend-specific configuration
Examples:
# Start with default LocalETS backend
{:ok, pid} = Foundation.MABEAM.ProcessRegistry.start_link()
# Start with custom configuration
{:ok, pid} = Foundation.MABEAM.ProcessRegistry.start_link(
backend: Foundation.MABEAM.ProcessRegistry.LocalETS,
config: %{cleanup_interval: 30_000, max_agents: 10_000}
)
# Future: Start with Horde backend for distribution
{:ok, pid} = Foundation.MABEAM.ProcessRegistry.start_link(
backend: Foundation.MABEAM.ProcessRegistry.Horde,
config: %{name: MABEAMRegistry, keys: :unique, members: :auto}
)
register_agent/1
Registers an agent with the registry.
@spec register_agent(agent_config()) :: :ok | {:error, term()}
Examples:
config = Foundation.MABEAM.Types.new_agent_config(
:worker_1,
MyApp.WorkerAgent,
[task_queue: :default],
type: :worker,
capabilities: [:variable_access, :coordination, :optimization]
)
:ok = Foundation.MABEAM.ProcessRegistry.register_agent(config)
start_agent/1
Starts a registered agent.
@spec start_agent(agent_id()) :: {:ok, pid()} | {:error, term()}
Examples:
{:ok, pid} = Foundation.MABEAM.ProcessRegistry.start_agent(:worker_1)
stop_agent/1
Stops a running agent gracefully.
@spec stop_agent(agent_id()) :: :ok | {:error, term()}
restart_agent/1
Restarts an agent according to its restart policy.
@spec restart_agent(agent_id()) :: {:ok, pid()} | {:error, term()}
get_agent_info/1
Retrieves comprehensive agent information.
@spec get_agent_info(agent_id()) :: {:ok, agent_entry()} | {:error, :not_found}
Returns:
%{
id: agent_id(),
config: agent_config(),
pid: pid() | nil,
status: :registered | :starting | :active | :stopping | :stopped | :failed,
node: node(),
health: health_status(),
start_time: DateTime.t() | nil,
last_heartbeat: DateTime.t() | nil,
restart_count: non_neg_integer(),
resource_usage: resource_usage()
}
list_agents/0
and list_agents/1
Lists all registered agents, optionally with filters.
@spec list_agents() :: {:ok, [agent_info()]}
@spec list_agents(keyword()) :: {:ok, [agent_info()]} | {:error, :not_supported}
find_agents_by_capability/1
Finds agents with specific capabilities.
@spec find_agents_by_capability([atom()]) :: [agent_id()]
Examples:
# Find all agents capable of coordination
coordinators = Foundation.MABEAM.ProcessRegistry.find_agents_by_capability([:coordination])
# Find agents with multiple capabilities
optimizers = Foundation.MABEAM.ProcessRegistry.find_agents_by_capability(
[:optimization, :resource_allocation]
)
find_agents_by_type/1
Finds agents by type.
@spec find_agents_by_type(agent_type()) :: [agent_id()]
get_agent_health/1
Gets current health status of an agent.
@spec get_agent_health(agent_id()) :: {:ok, :healthy | :degraded | :unhealthy} | {:error, term()}
get_agent_status/1
Gets the current status of an agent.
@spec get_agent_status(agent_id()) :: {:ok, agent_status()} | {:error, :not_found}
get_agent_pid/1
Gets the PID of a running agent.
@spec get_agent_pid(agent_id()) :: {:ok, pid()} | {:error, :not_found | :not_running}
get_agent_stats/1
Gets agent statistics.
@spec get_agent_stats(agent_id()) :: {:ok, map()} | {:error, :not_implemented | :not_found}
Agent Status Types:
@type agent_status :: :registered | :starting | :running | :stopping | :stopped | :failed
Core Orchestrator
Foundation.MABEAM.Core
Universal Variable Orchestrator for multi-agent coordination and system-wide state management.
Public API
start_link/1
Starts the MABEAM Core orchestrator service.
@spec start_link(keyword()) :: GenServer.on_start()
register_variable/1
Registers a universal variable with the Core orchestrator.
@spec register_variable(universal_variable()) :: :ok | {:error, term()}
Examples:
variable = Foundation.MABEAM.Types.new_variable(
:global_config,
%{setting: "default", enabled: true},
:system,
conflict_resolution: :consensus
)
:ok = Foundation.MABEAM.Core.register_variable(variable)
get_variable/1
Retrieves a variable’s current state.
@spec get_variable(atom()) :: {:ok, universal_variable()} | {:error, :not_found}
update_variable/3
Updates a variable’s value with conflict resolution.
@spec update_variable(atom(), term(), agent_id()) :: :ok | {:error, term()}
@spec update_variable(atom(), term(), agent_id(), keyword()) :: :ok | {:error, term()}
Examples:
# Simple update
:ok = Foundation.MABEAM.Core.update_variable(:counter, 42, :agent_1)
# Update with version check (optimistic concurrency)
:ok = Foundation.MABEAM.Core.update_variable(
:shared_state,
%{status: :processing},
:coordinator,
expected_version: 5
)
# Update with custom conflict resolution
:ok = Foundation.MABEAM.Core.update_variable(
:critical_config,
%{max_connections: 1000},
:admin_agent,
conflict_resolution: :priority_based,
priority: :high
)
list_variables/0
Lists all registered variables.
@spec list_variables() :: {:ok, [atom()]} | {:error, term()}
delete_variable/2
Deletes a variable (with authorization).
@spec delete_variable(atom(), agent_id()) :: :ok | {:error, term()}
coordinate_system/0
Initiates system-wide coordination across all registered agents and variables.
@spec coordinate_system() :: {:ok, [coordination_result()]} | {:error, term()}
get_system_status/0
Retrieves comprehensive system status.
@spec get_system_status() :: {:ok, system_status()} | {:error, term()}
Returns:
%{
variables: %{atom() => universal_variable()},
agents: [agent_id()],
coordination_sessions: [session_info()],
health: :healthy | :degraded | :unhealthy,
metrics: system_metrics(),
uptime: pos_integer(),
resource_usage: system_resource_usage()
}
get_variable_history/2
Gets the change history for a variable.
@spec get_variable_history(atom(), keyword()) :: {:ok, [variable_change()]} | {:error, term()}
subscribe_to_variable/1
Subscribes to variable change notifications.
@spec subscribe_to_variable(atom()) :: :ok | {:error, term()}
unsubscribe_from_variable/1
Unsubscribes from variable change notifications.
@spec unsubscribe_from_variable(atom()) :: :ok | {:error, term()}
get_variable_statistics/1
Gets statistics for a specific variable.
@spec get_variable_statistics(atom()) :: {:ok, map()} | {:error, term()}
Returns:
%{
update_count: non_neg_integer(),
last_updated: DateTime.t(),
conflict_count: non_neg_integer(),
subscriber_count: non_neg_integer()
}
Coordination Framework
Foundation.MABEAM.Coordination
Advanced coordination protocols for multi-agent systems with pluggable protocol architecture.
Public API
start_link/1
Starts the Coordination service.
@spec start_link(keyword()) :: GenServer.on_start()
register_protocol/2
Registers a coordination protocol.
@spec register_protocol(atom(), coordination_protocol()) :: :ok | {:error, term()}
Protocol Structure:
@type coordination_protocol :: %{
name: atom(),
type: protocol_type(),
algorithm: function(),
timeout: pos_integer(),
retry_policy: retry_policy(),
validation: function() | nil,
metadata: map()
}
@type protocol_type :: :consensus | :negotiation | :auction | :market | :custom
Examples:
protocol = %{
name: :consensus_voting,
type: :consensus,
algorithm: &MyApp.ConsensusAlgorithms.majority_vote/1,
timeout: 10_000,
retry_policy: %{max_retries: 3, backoff: :exponential},
validation: &MyApp.Validators.validate_consensus_input/1
}
:ok = Foundation.MABEAM.Coordination.register_protocol(:consensus_voting, protocol)
coordinate/4
Executes a coordination protocol with specified agents.
@spec coordinate(atom(), [agent_id()], map(), keyword()) :: {:ok, [coordination_result()]} | {:error, term()}
Parameters:
protocol_name
- Name of the registered protocolagent_ids
- List of participating agentscontext
- Coordination context and parametersopts
- Optional execution parameters
Examples:
# Simple consensus
{:ok, results} = Foundation.MABEAM.Coordination.coordinate(
:consensus_voting,
[:agent1, :agent2, :agent3],
%{question: "Should we proceed with deployment?", options: [:yes, :no]}
)
# Resource allocation with timeout
{:ok, results} = Foundation.MABEAM.Coordination.coordinate(
:resource_allocation,
[:worker1, :worker2, :worker3],
%{resource: :cpu_time, available: 100, priority: :high},
timeout: 15_000
)
# Async coordination
{:ok, session_id} = Foundation.MABEAM.Coordination.coordinate(
:complex_negotiation,
[:agent1, :agent2, :agent3, :agent4],
%{negotiation_type: :multi_issue, deadline: DateTime.add(DateTime.utc_now(), 3600)},
async: true
)
get_coordination_status/1
Gets the status of a coordination session.
@spec get_coordination_status(session_id()) :: {:ok, session_status()} | {:error, term()}
cancel_coordination/1
Cancels an active coordination session.
@spec cancel_coordination(session_id()) :: :ok | {:error, term()}
resolve_conflict/2
Resolves conflicts using specified strategies.
@spec resolve_conflict(conflict(), keyword()) :: {:ok, conflict_resolution()} | {:error, term()}
Examples:
conflict = %{
type: :variable_conflict,
variable: :shared_counter,
conflicting_updates: [
%{agent_id: :agent1, value: 10, version: 5, timestamp: DateTime.utc_now()},
%{agent_id: :agent2, value: 15, version: 5, timestamp: DateTime.utc_now()}
]
}
{:ok, resolution} = Foundation.MABEAM.Coordination.resolve_conflict(
conflict,
strategy: :consensus,
participants: [:agent1, :agent2, :arbiter]
)
list_protocols/0
Lists all registered protocols.
@spec list_protocols() :: {:ok, [atom()]} | {:error, term()}
get_protocol_info/1
Gets information about a registered protocol.
@spec get_protocol_info(atom()) :: {:ok, coordination_protocol()} | {:error, term()}
update_protocol/2
Updates an existing coordination protocol.
@spec update_protocol(atom(), coordination_protocol()) :: :ok | {:error, term()}
unregister_protocol/1
Unregisters a coordination protocol.
@spec unregister_protocol(atom()) :: :ok | {:error, term()}
get_coordination_stats/0
Gets coordination system statistics.
@spec get_coordination_stats() :: {:ok, coordination_stats()}
Returns:
@type coordination_stats :: %{
total_coordinations: non_neg_integer(),
successful_coordinations: non_neg_integer(),
failed_coordinations: non_neg_integer(),
average_coordination_time: float()
}
Auction Coordination
Foundation.MABEAM.Coordination.Auction
Sophisticated auction mechanisms for resource allocation and parameter optimization with economic validation.
Public API
start_link/1
Starts the Auction coordination service.
@spec start_link(keyword()) :: GenServer.on_start()
run_auction/3
Runs an auction for resource allocation.
@spec run_auction(auction_spec(), [agent_id()], keyword()) :: {:ok, auction_result()} | {:error, term()}
sealed_bid_auction/3
Runs a sealed-bid auction.
@spec sealed_bid_auction(auction_spec(), [agent_id()], keyword()) :: {:ok, auction_result()} | {:error, term()}
open_bid_auction/3
Runs an open-bid auction.
@spec open_bid_auction(auction_spec(), [agent_id()], keyword()) :: {:ok, auction_result()} | {:error, term()}
dutch_auction/3
Runs a Dutch (descending price) auction.
@spec dutch_auction(auction_spec(), [agent_id()], keyword()) :: {:ok, auction_result()} | {:error, term()}
vickrey_auction/3
Runs a Vickrey (second-price sealed-bid) auction.
@spec vickrey_auction(auction_spec(), [agent_id()], keyword()) :: {:ok, auction_result()} | {:error, term()}
Auction Types:
:sealed_bid
- Sealed-bid auction (first or second price):english
- English (ascending) auction:dutch
- Dutch (descending) auction:combinatorial
- Combinatorial auction for bundles:double
- Double auction (simultaneous buy/sell)
Payment Rules:
:first_price
- Winner pays their bid:second_price
- Winner pays second-highest bid:vickrey
- Truthful mechanism for combinatorial auctions
Examples:
# Sealed-bid auction with explicit bids
bids = [
%{agent_id: :agent1, bid_amount: 100, metadata: %{strategy: :conservative}},
%{agent_id: :agent2, bid_amount: 150, metadata: %{strategy: :aggressive}},
%{agent_id: :agent3, bid_amount: 120, metadata: %{strategy: :balanced}}
]
{:ok, result} = Foundation.MABEAM.Coordination.Auction.run_auction(
:cpu_resource,
bids,
auction_type: :sealed_bid,
payment_rule: :second_price
)
# English auction with agent collection
{:ok, result} = Foundation.MABEAM.Coordination.Auction.run_auction(
:memory_allocation,
[:agent1, :agent2, :agent3],
auction_type: :english,
starting_price: 50,
increment: 10,
max_rounds: 20
)
# Dutch auction for time-sensitive resources
{:ok, result} = Foundation.MABEAM.Coordination.Auction.run_auction(
:priority_slot,
[:urgent_agent1, :urgent_agent2],
auction_type: :dutch,
starting_price: 200,
decrement: 5,
min_price: 50
)
# Combinatorial auction for resource bundles
bundles = [
%{agent_id: :agent1, bundle: [:cpu, :memory], bid_amount: 200},
%{agent_id: :agent2, bundle: [:cpu], bid_amount: 100},
%{agent_id: :agent3, bundle: [:memory, :storage], bid_amount: 180}
]
{:ok, result} = Foundation.MABEAM.Coordination.Auction.run_auction(
:resource_bundle,
bundles,
auction_type: :combinatorial,
payment_rule: :vickrey
)
get_auction_status/1
Retrieves the current status of an auction.
@spec get_auction_status(auction_id()) :: {:ok, auction_status()} | {:error, :not_found}
cancel_auction/1
Cancels a running auction.
@spec cancel_auction(auction_id()) :: :ok | {:error, term()}
list_active_auctions/0
Lists all currently active auctions.
@spec list_active_auctions() :: {:ok, [auction_id()]} | {:error, term()}
get_auction_history/1
Gets the complete history of an auction.
@spec get_auction_history(reference()) :: {:ok, map()} | {:error, term()}
validate_economic_efficiency/1
Validates the economic efficiency of an auction result.
@spec validate_economic_efficiency(auction_result()) :: {:ok, map()} | {:error, term()}
get_auction_statistics/0
Gets auction system statistics.
@spec get_auction_statistics() :: {:ok, map()} | {:error, term()}
Returns:
%{
total_auctions: non_neg_integer(),
successful_auctions: non_neg_integer(),
failed_auctions: non_neg_integer(),
average_efficiency: float(),
total_value_traded: float()
}
Auction Result Structure:
@type auction_result :: %{
winner: agent_id() | nil,
winning_bid: number() | nil,
all_bids: [bid()],
efficiency_score: float(),
auction_type: atom(),
metadata: map()
}
@type bid :: %{
agent_id: agent_id(),
amount: number(),
metadata: map()
}
Market Coordination
Foundation.MABEAM.Coordination.Market
Market-based coordination mechanisms for resource allocation and price discovery with economic simulation capabilities.
Public API
start_link/1
Starts the Market coordination service.
@spec start_link(keyword()) :: GenServer.on_start()
create_market/1
Creates a new market for resource trading.
@spec create_market(market_spec()) :: {:ok, reference()} | {:error, term()}
Market Specification:
@type market_spec :: %{
name: atom(),
commodity: atom(),
market_type: :continuous | :call | :sealed_bid,
price_mechanism: :auction | :negotiation | :fixed,
participants: [agent_id()],
metadata: map()
}
Market Configuration:
@type market_config :: %{
resource_type: atom(),
initial_price: number(),
price_adjustment_rate: float(),
max_participants: pos_integer(),
trading_rules: trading_rules(),
market_type: market_type(),
clearing_mechanism: clearing_mechanism()
}
@type market_type :: :continuous | :call | :double_auction
@type clearing_mechanism :: :uniform_price | :discriminatory | :vickrey
Examples:
market_config = %{
resource_type: :compute_credits,
initial_price: 100,
price_adjustment_rate: 0.1,
max_participants: 50,
trading_rules: %{
min_order_size: 1,
max_order_size: 100,
tick_size: 0.01,
trading_hours: %{start: ~T[09:00:00], end: ~T[17:00:00]}
},
market_type: :continuous,
clearing_mechanism: :uniform_price
}
{:ok, market_id} = Foundation.MABEAM.Coordination.Market.create_market(
:compute_market,
market_config
)
find_equilibrium/1
Finds market equilibrium for a given market.
@spec find_equilibrium(reference()) :: {:ok, market_equilibrium()} | {:error, term()}
close_market/1
Closes a market and returns final results.
@spec close_market(reference()) :: {:ok, market_result()} | {:error, term()}
submit_order/2
Submits an order to a market.
@spec submit_order(reference(), market_order()) :: :ok | {:error, term()}
Market Order:
@type market_order :: %{
agent_id: agent_id(),
type: :buy | :sell,
quantity: number(),
price: number() | :market,
metadata: map()
}
Examples:
supply = [
%{agent_id: :provider1, quantity: 100, min_price: 50, metadata: %{quality: :high}},
%{agent_id: :provider2, quantity: 150, min_price: 45, metadata: %{quality: :standard}},
%{agent_id: :provider3, quantity: 80, min_price: 55, metadata: %{quality: :premium}}
]
demand = [
%{agent_id: :consumer1, quantity: 80, max_price: 65, metadata: %{urgency: :high}},
%{agent_id: :consumer2, quantity: 120, max_price: 55, metadata: %{urgency: :normal}},
%{agent_id: :consumer3, quantity: 60, max_price: 70, metadata: %{urgency: :low}}
]
{:ok, equilibrium} = Foundation.MABEAM.Coordination.Market.find_equilibrium(
supply,
demand
)
simulate_market/2
Simulates market dynamics over multiple periods.
@spec simulate_market(market_id(), simulation_config()) :: {:ok, simulation_result()} | {:error, term()}
Examples:
simulation_config = %{
periods: 24, # 24-hour simulation
demand_variation: 0.15, # 15% demand variation
supply_variation: 0.10, # 10% supply variation
learning_enabled: true,
learning_rate: 0.05,
shock_events: [
%{period: 5, type: :demand_spike, magnitude: 0.3},
%{period: 8, type: :supply_disruption, magnitude: 0.2},
%{period: 15, type: :price_shock, magnitude: 0.25}
],
agent_strategies: %{
adaptive: 0.6, # 60% adaptive agents
random: 0.2, # 20% random agents
strategic: 0.2 # 20% strategic agents
}
}
{:ok, result} = Foundation.MABEAM.Coordination.Market.simulate_market(
:compute_market,
simulation_config
)
place_order/3
Places a buy or sell order in the market.
@spec place_order(market_id(), agent_id(), order()) :: {:ok, order_id()} | {:error, term()}
cancel_order/2
Cancels an existing order.
@spec cancel_order(market_id(), order_id()) :: :ok | {:error, term()}
get_market_status/1
Gets current market status and statistics.
@spec get_market_status(market_id()) :: {:ok, market_status()} | {:error, term()}
list_active_markets/0
Lists all currently active markets.
@spec list_active_markets() :: {:ok, [reference()]} | {:error, term()}
get_market_statistics/0
Gets market system statistics.
@spec get_market_statistics() :: {:ok, map()} | {:error, term()}
Returns:
%{
total_markets: non_neg_integer(),
successful_markets: non_neg_integer(),
failed_markets: non_neg_integer(),
total_trades: non_neg_integer(),
total_volume: float(),
average_efficiency: float()
}
Market Result Structures:
@type market_result :: %{
trades: [trade()],
clearing_price: number() | nil,
total_volume: number(),
market_efficiency: float(),
participants: [agent_id()],
metadata: map()
}
@type trade :: %{
buyer: agent_id(),
seller: agent_id(),
quantity: number(),
price: number(),
timestamp: DateTime.t()
}
@type market_equilibrium :: %{
equilibrium_price: number(),
equilibrium_quantity: number(),
consumer_surplus: number(),
producer_surplus: number(),
total_welfare: number()
}
Communication Layer
Foundation.MABEAM.Comms
Distribution-aware communication layer abstracting local vs. remote calls with automatic routing and retry policies.
Public API
start_link/1
Starts the Communication service.
@spec start_link(keyword()) :: GenServer.on_start()
request/2
and request/3
Sends a request to an agent and waits for a response.
@spec request(agent_id(), term()) :: {:ok, term()} | {:error, term()}
@spec request(agent_id(), term(), timeout_ms()) :: {:ok, term()} | {:error, term()}
Examples:
# Simple request with default timeout
{:ok, response} = Foundation.MABEAM.Comms.request(
:worker_1,
{:process_task, %{data: [1, 2, 3]}}
)
# Request with custom timeout
{:ok, response} = Foundation.MABEAM.Comms.request(
:remote_agent,
{:compute, %{algorithm: :complex}},
30_000
)
send_request/3
Sends a request to an agent with automatic routing (wrapper around request/3
).
@spec send_request(agent_id(), term(), timeout_ms()) :: {:ok, term()} | {:error, term()}
notify/2
Sends a fire-and-forget notification to an agent.
@spec notify(agent_id(), term()) :: :ok | {:error, term()}
Examples:
# Send notification
:ok = Foundation.MABEAM.Comms.notify(
:worker_1,
{:update_state, %{key: "value"}}
)
send_async_request/3
Sends an asynchronous request and returns a reference.
@spec send_async_request(agent_id(), term(), timeout_ms()) :: {:ok, reference()} | {:error, term()}
coordination_request/4
Sends a coordination request to an agent.
@spec coordination_request(agent_id(), atom(), map(), timeout_ms()) :: {:ok, term()} | {:error, term()}
Examples:
# Send coordination request
{:ok, result} = Foundation.MABEAM.Comms.coordination_request(
:agent_id,
:consensus,
%{question: "Proceed?", options: [:yes, :no]},
5000
)
broadcast/3
Broadcasts a message to multiple agents.
@spec broadcast([agent_id()], term(), keyword()) :: {:ok, [response()]} | {:error, term()}
Examples:
# Broadcast to all workers
{:ok, responses} = Foundation.MABEAM.Comms.broadcast(
[:worker_1, :worker_2, :worker_3],
{:system_announcement, "Maintenance window starting"},
timeout: 5000
)
# Broadcast with partial failure tolerance
{:ok, responses} = Foundation.MABEAM.Comms.broadcast(
[:agent_1, :agent_2, :agent_3, :agent_4],
{:health_check, DateTime.utc_now()},
allow_partial_failure: true,
min_success_rate: 0.75
)
multicast/3
Sends a message to agents with specific capabilities.
@spec multicast([atom()], term(), keyword()) :: {:ok, [response()]} | {:error, term()}
Examples:
# Send to all agents with optimization capability
{:ok, responses} = Foundation.MABEAM.Comms.multicast(
[:optimization],
{:optimize_parameters, %{target: :efficiency}},
timeout: 15_000
)
subscribe/2
Subscribes to events from specific agents.
@spec subscribe(agent_id(), [event_type()]) :: :ok | {:error, term()}
Examples:
# Subscribe to status updates
:ok = Foundation.MABEAM.Comms.subscribe(:worker_1, [:status_change, :error])
# Subscribe to all events from coordinator
:ok = Foundation.MABEAM.Comms.subscribe(:coordinator, :all)
unsubscribe/2
Unsubscribes from agent events.
@spec unsubscribe(agent_id(), [event_type()]) :: :ok | {:error, term()}
publish_event/3
Publishes an event to subscribers.
@spec publish_event(agent_id(), event_type(), term()) :: :ok | {:error, term()}
get_routing_table/0
Gets the current agent routing information.
@spec get_routing_table() :: {:ok, routing_table()} | {:error, term()}
get_communication_stats/0
Gets communication statistics.
@spec get_communication_stats() :: map()
Returns:
%{
total_requests: non_neg_integer(),
successful_requests: non_neg_integer(),
failed_requests: non_neg_integer(),
total_notifications: non_neg_integer(),
coordination_requests: non_neg_integer(),
average_response_time: float()
}
Configuration
MABEAM System Configuration
MABEAM can be configured through the application environment:
# In config/config.exs
# ProcessRegistry Configuration
config :foundation, Foundation.MABEAM.ProcessRegistry,
backend: Foundation.MABEAM.ProcessRegistry.LocalETS,
max_agents: 10_000,
health_check_interval: 30_000,
cleanup_interval: 60_000,
auto_restart: true,
resource_monitoring: true
# Core Orchestrator Configuration
config :foundation, Foundation.MABEAM.Core,
max_variables: 5_000,
coordination_timeout: 10_000,
conflict_resolution_timeout: 5_000,
history_retention: 1_000,
telemetry_enabled: true,
variable_cleanup_interval: 300_000
# Coordination Framework Configuration
config :foundation, Foundation.MABEAM.Coordination,
default_timeout: 10_000,
max_concurrent_coordinations: 500,
protocol_registry_size: 100,
session_cleanup_interval: 300_000,
telemetry_enabled: true,
metrics_enabled: true
# Auction Coordination Configuration
config :foundation, Foundation.MABEAM.Coordination.Auction,
max_concurrent_auctions: 100,
default_auction_timeout: 60_000,
bid_collection_timeout: 30_000,
auction_cleanup_interval: 60_000,
economic_validation: true,
efficiency_threshold: 0.8,
payment_precision: 4
# Market Coordination Configuration
config :foundation, Foundation.MABEAM.Coordination.Market,
max_markets: 50,
equilibrium_calculation_timeout: 10_000,
simulation_max_periods: 1_000,
price_precision: 4,
market_update_frequency: 5_000,
historical_data_retention: 86_400_000 # 24 hours
# Communication Layer Configuration
config :foundation, Foundation.MABEAM.Comms,
request_timeout: 5_000,
max_concurrent_requests: 1_000,
retry_policy: %{
max_retries: 3,
backoff: :exponential,
base_delay: 100,
max_delay: 10_000
},
telemetry_enabled: true,
routing_cache_ttl: 60_000,
event_buffer_size: 10_000
# Performance Tuning
config :foundation, :mabeam_performance,
memory_limit_mb: 512,
gc_frequency: 30_000,
metrics_retention: 86_400_000,
telemetry_buffer_size: 1_000
# Health Monitoring
config :foundation, :mabeam_health,
global_health_check_interval: 30_000,
health_check_timeout: 5_000,
unhealthy_threshold: 3,
degraded_threshold: 1,
auto_recovery: true
Environment Variables
# Core Settings
export MABEAM_BACKEND=LocalETS
export MABEAM_MAX_AGENTS=10000
export MABEAM_MAX_VARIABLES=5000
# Performance
export MABEAM_COORDINATION_TIMEOUT=10000
export MABEAM_AUCTION_TIMEOUT=60000
export MABEAM_MEMORY_LIMIT_MB=512
# Monitoring
export MABEAM_TELEMETRY_ENABLED=true
export MABEAM_HEALTH_CHECK_INTERVAL=30000
export MABEAM_METRICS_RETENTION=86400000
Examples
Complete Multi-Agent Coordination Example
# 1. Start MABEAM services
{:ok, _} = Foundation.MABEAM.ProcessRegistry.start_link()
{:ok, _} = Foundation.MABEAM.Core.start_link()
{:ok, _} = Foundation.MABEAM.Coordination.start_link()
{:ok, _} = Foundation.MABEAM.Coordination.Auction.start_link()
{:ok, _} = Foundation.MABEAM.Coordination.Market.start_link()
{:ok, _} = Foundation.MABEAM.Comms.start_link()
# 2. Register and start agents
worker_config = Foundation.MABEAM.Types.new_agent_config(
:worker_1,
MyApp.WorkerAgent,
[worker_id: 1],
type: :worker,
capabilities: [:coordination, :resource_bidding, :optimization]
)
coordinator_config = Foundation.MABEAM.Types.new_agent_config(
:coordinator_1,
MyApp.CoordinatorAgent,
[coordinator_id: 1],
type: :coordinator,
capabilities: [:consensus, :auction_coordination, :market_making]
)
:ok = Foundation.MABEAM.ProcessRegistry.register_agent(worker_config)
:ok = Foundation.MABEAM.ProcessRegistry.register_agent(coordinator_config)
{:ok, _pid1} = Foundation.MABEAM.ProcessRegistry.start_agent(:worker_1)
{:ok, _pid2} = Foundation.MABEAM.ProcessRegistry.start_agent(:coordinator_1)
# 3. Register shared variables
task_queue = Foundation.MABEAM.Types.new_variable(
:task_queue,
[],
:system,
conflict_resolution: :consensus,
permissions: :public
)
resource_pool = Foundation.MABEAM.Types.new_variable(
:resource_pool,
%{cpu: 100, memory: 1000, storage: 500},
:system,
conflict_resolution: :priority_based
)
:ok = Foundation.MABEAM.Core.register_variable(task_queue)
:ok = Foundation.MABEAM.Core.register_variable(resource_pool)
# 4. Register coordination protocols
consensus_protocol = %{
name: :task_assignment,
type: :consensus,
algorithm: &MyApp.Protocols.task_assignment_consensus/1,
timeout: 10_000,
retry_policy: %{max_retries: 3, backoff: :exponential}
}
:ok = Foundation.MABEAM.Coordination.register_protocol(:task_assignment, consensus_protocol)
# 5. Execute coordination
{:ok, results} = Foundation.MABEAM.Coordination.coordinate(
:task_assignment,
[:worker_1, :worker_2, :worker_3],
%{
tasks: ["task_a", "task_b", "task_c"],
deadline: DateTime.add(DateTime.utc_now(), 3600),
priority: :high
}
)
# 6. Run auction for resource allocation
{:ok, auction_result} = Foundation.MABEAM.Coordination.Auction.run_auction(
:premium_cpu_time,
[:worker_1, :worker_2, :worker_3],
auction_type: :sealed_bid,
payment_rule: :second_price,
timeout: 30_000
)
# 7. Create and operate market
market_config = %{
resource_type: :compute_credits,
initial_price: 10.0,
price_adjustment_rate: 0.05,
max_participants: 10,
trading_rules: %{min_order_size: 1, max_order_size: 100}
}
{:ok, market_id} = Foundation.MABEAM.Coordination.Market.create_market(
:compute_market,
market_config
)
# 8. Update shared variables based on results
:ok = Foundation.MABEAM.Core.update_variable(
:task_queue,
results.task_assignments,
:coordinator_1
)
:ok = Foundation.MABEAM.Core.update_variable(
:resource_pool,
auction_result.resource_allocations,
:coordinator_1
)
# 9. Monitor system status
{:ok, status} = Foundation.MABEAM.Core.get_system_status()
IO.puts("System health: #{status.health}")
IO.puts("Active agents: #{length(status.agents)}")
IO.puts("Registered variables: #{map_size(status.variables)}")
Economic Market Simulation
# Create a comprehensive compute resource market
market_config = %{
resource_type: :compute_units,
initial_price: 10.0,
price_adjustment_rate: 0.05,
max_participants: 20,
trading_rules: %{
min_order_size: 1,
max_order_size: 1000,
tick_size: 0.01,
trading_hours: %{start: ~T[00:00:00], end: ~T[23:59:59]}
},
market_type: :continuous,
clearing_mechanism: :uniform_price
}
{:ok, market_id} = Foundation.MABEAM.Coordination.Market.create_market(
:compute_market,
market_config
)
# Define diverse market participants
suppliers = [
%{agent_id: :datacenter_1, quantity: 1000, min_price: 8.0, metadata: %{region: :us_east, quality: :premium}},
%{agent_id: :datacenter_2, quantity: 800, min_price: 9.0, metadata: %{region: :us_west, quality: :standard}},
%{agent_id: :edge_provider, quantity: 200, min_price: 12.0, metadata: %{region: :edge, quality: :low_latency}},
%{agent_id: :cloud_burst, quantity: 500, min_price: 15.0, metadata: %{region: :multi, quality: :burst}}
]
consumers = [
%{agent_id: :ml_trainer, quantity: 500, max_price: 15.0, metadata: %{workload: :training, urgency: :medium}},
%{agent_id: :web_service, quantity: 300, max_price: 11.0, metadata: %{workload: :serving, urgency: :high}},
%{agent_id: :batch_processor, quantity: 400, max_price: 10.0, metadata: %{workload: :batch, urgency: :low}},
%{agent_id: :research_lab, quantity: 200, max_price: 20.0, metadata: %{workload: :research, urgency: :variable}}
]
# Find initial equilibrium
{:ok, equilibrium} = Foundation.MABEAM.Coordination.Market.find_equilibrium(
suppliers,
consumers
)
IO.puts("Initial market equilibrium:")
IO.puts(" Price: $#{equilibrium.price}")
IO.puts(" Quantity: #{equilibrium.quantity} units")
IO.puts(" Economic efficiency: #{Float.round(equilibrium.efficiency * 100, 2)}%")
IO.puts(" Consumer surplus: $#{equilibrium.surplus.consumer}")
IO.puts(" Producer surplus: $#{equilibrium.surplus.producer}")
# Run comprehensive multi-period simulation
simulation_config = %{
periods: 168, # One week (hourly periods)
demand_variation: 0.2, # 20% demand variation
supply_variation: 0.1, # 10% supply variation
learning_enabled: true,
learning_rate: 0.02,
shock_events: [
%{period: 24, type: :demand_spike, magnitude: 0.4, duration: 3}, # Day 2: High demand
%{period: 72, type: :supply_disruption, magnitude: 0.3, duration: 6}, # Day 4: Supply issue
%{period: 120, type: :price_shock, magnitude: 0.25, duration: 2}, # Day 6: Price volatility
%{period: 144, type: :new_competitor, magnitude: 0.15, duration: 24} # Day 7: Market entry
],
agent_strategies: %{
adaptive: 0.5, # 50% adaptive learning agents
random: 0.2, # 20% random behavior agents
strategic: 0.2, # 20% strategic game-theory agents
momentum: 0.1 # 10% momentum-following agents
},
market_mechanisms: %{
price_discovery: :continuous,
clearing_frequency: :hourly,
information_transparency: :partial
}
}
{:ok, simulation} = Foundation.MABEAM.Coordination.Market.simulate_market(
market_id,
simulation_config
)
IO.puts("\nMarket simulation results (168-hour period):")
IO.puts(" Average efficiency: #{Float.round(simulation.average_efficiency * 100, 2)}%")
IO.puts(" Price stability: #{Float.round(simulation.price_stability * 100, 2)}%")
IO.puts(" Market volatility: #{Float.round(simulation.volatility * 100, 2)}%")
IO.puts(" Learning convergence: #{simulation.learning_effects.convergence_rate}")
# Analyze agent adaptations
Enum.each(simulation.agent_adaptations, fn adaptation ->
IO.puts(" Agent #{adaptation.agent_id}: #{adaptation.strategy_evolution}")
end)
# Get final market status
{:ok, final_status} = Foundation.MABEAM.Coordination.Market.get_market_status(market_id)
IO.puts("\nFinal market state:")
IO.puts(" Current price: $#{final_status.current_price}")
IO.puts(" Active orders: #{final_status.active_orders}")
IO.puts(" Total volume traded: #{final_status.total_volume}")
Advanced Coordination Workflow
# Complex multi-protocol coordination example
defmodule MyApp.AdvancedCoordination do
alias Foundation.MABEAM.{ProcessRegistry, Core, Coordination, Comms}
def run_complex_workflow do
# 1. Setup agents with different capabilities
agents = setup_diverse_agents()
# 2. Register multiple coordination protocols
register_coordination_protocols()
# 3. Create hierarchical coordination
execute_hierarchical_coordination(agents)
end
defp setup_diverse_agents do
# Coordinator agents
coordinator_config = Foundation.MABEAM.Types.new_agent_config(
:master_coordinator,
MyApp.MasterCoordinator,
[],
type: :coordinator,
capabilities: [:consensus, :auction_coordination, :conflict_resolution]
)
# Specialized worker agents
ml_worker_config = Foundation.MABEAM.Types.new_agent_config(
:ml_optimizer,
MyApp.MLOptimizer,
[model_type: :neural_network],
type: :optimizer,
capabilities: [:optimization, :learning, :resource_bidding]
)
resource_manager_config = Foundation.MABEAM.Types.new_agent_config(
:resource_manager,
MyApp.ResourceManager,
[pool_size: 1000],
type: :resource_provider,
capabilities: [:resource_allocation, :monitoring, :auction_participation]
)
monitor_config = Foundation.MABEAM.Types.new_agent_config(
:system_monitor,
MyApp.SystemMonitor,
[check_interval: 5000],
type: :monitor,
capabilities: [:monitoring, :alerting, :health_assessment]
)
# Register and start all agents
configs = [coordinator_config, ml_worker_config, resource_manager_config, monitor_config]
Enum.each(configs, fn config ->
:ok = ProcessRegistry.register_agent(config)
{:ok, _pid} = ProcessRegistry.start_agent(config.id)
end)
Enum.map(configs, & &1.id)
end
defp register_coordination_protocols do
# Multi-level consensus protocol
consensus_protocol = %{
name: :hierarchical_consensus,
type: :consensus,
algorithm: &MyApp.Protocols.hierarchical_consensus/1,
timeout: 15_000,
retry_policy: %{max_retries: 5, backoff: :exponential}
}
# Resource negotiation protocol
negotiation_protocol = %{
name: :resource_negotiation,
type: :negotiation,
algorithm: &MyApp.Protocols.multi_issue_negotiation/1,
timeout: 30_000,
retry_policy: %{max_retries: 3, backoff: :linear}
}
# Optimization coordination protocol
optimization_protocol = %{
name: :distributed_optimization,
type: :custom,
algorithm: &MyApp.Protocols.distributed_gradient_descent/1,
timeout: 60_000,
retry_policy: %{max_retries: 2, backoff: :fixed}
}
:ok = Coordination.register_protocol(:hierarchical_consensus, consensus_protocol)
:ok = Coordination.register_protocol(:resource_negotiation, negotiation_protocol)
:ok = Coordination.register_protocol(:distributed_optimization, optimization_protocol)
end
defp execute_hierarchical_coordination(agents) do
# Phase 1: System-wide consensus on objectives
{:ok, consensus_results} = Coordination.coordinate(
:hierarchical_consensus,
agents,
%{
objective: "optimize_system_performance",
constraints: %{max_cost: 1000, min_efficiency: 0.85},
voting_weights: %{
master_coordinator: 3,
ml_optimizer: 2,
resource_manager: 2,
system_monitor: 1
}
}
)
# Phase 2: Resource negotiation based on consensus
if consensus_achieved?(consensus_results) do
{:ok, negotiation_results} = Coordination.coordinate(
:resource_negotiation,
[:ml_optimizer, :resource_manager],
%{
resources_needed: consensus_results.agreed_resources,
negotiation_rounds: 5,
concession_strategy: :time_dependent
}
)
# Phase 3: Distributed optimization execution
if negotiation_successful?(negotiation_results) do
{:ok, optimization_results} = Coordination.coordinate(
:distributed_optimization,
[:ml_optimizer],
%{
algorithm_params: negotiation_results.agreed_allocation,
convergence_criteria: %{tolerance: 0.001, max_iterations: 100},
communication_topology: :ring
}
)
{:ok, %{
consensus: consensus_results,
negotiation: negotiation_results,
optimization: optimization_results
}}
else
{:error, :negotiation_failed}
end
else
{:error, :consensus_failed}
end
end
defp consensus_achieved?(results) do
agreement_rate = Enum.count(results, &(&1.decision == :agree)) / length(results)
agreement_rate >= 0.75
end
defp negotiation_successful?(results) do
Map.get(results, :agreement_reached, false)
end
end
Error Handling
All MABEAM APIs use structured error handling with detailed error information:
case Foundation.MABEAM.Coordination.coordinate(:invalid_protocol, [:agent1], %{}) do
{:ok, results} ->
handle_success(results)
{:error, %{
type: :protocol_not_found,
message: message,
context: context,
suggestions: suggestions
}} ->
Logger.error("Protocol not found: #{message}")
Logger.info("Suggestions: #{inspect(suggestions)}")
handle_protocol_error(context)
{:error, %{
type: :coordination_timeout,
message: message,
participants: failed_agents,
partial_results: partial
}} ->
Logger.warn("Coordination timeout: #{message}")
Logger.debug("Failed agents: #{inspect(failed_agents)}")
handle_timeout(failed_agents, partial)
{:error, %{
type: :agent_unavailable,
message: message,
agent_id: agent_id,
last_seen: timestamp
}} ->
Logger.error("Agent #{agent_id} unavailable: #{message}")
Logger.debug("Last seen: #{timestamp}")
handle_agent_unavailable(agent_id)
{:error, %{
type: :economic_validation_failed,
message: message,
violations: violations,
auction_id: auction_id
}} ->
Logger.error("Economic validation failed for auction #{auction_id}: #{message}")
Logger.debug("Violations: #{inspect(violations)}")
handle_economic_violation(auction_id, violations)
{:error, error} ->
Logger.error("Coordination failed: #{inspect(error)}")
handle_general_error(error)
end
Common Error Types
@type mabeam_error :: %{
type: error_type(),
message: String.t(),
context: map(),
timestamp: DateTime.t(),
suggestions: [String.t()],
retry_after: pos_integer() | nil
}
@type error_type ::
:protocol_not_found |
:agent_not_found |
:agent_unavailable |
:coordination_timeout |
:auction_invalid_bids |
:market_no_equilibrium |
:variable_conflict |
:permission_denied |
:economic_validation_failed |
:backend_error |
:communication_failure |
:resource_exhausted |
:invalid_configuration
Integration with Foundation Services
MABEAM integrates seamlessly with Foundation’s core services:
Foundation Service Integration
- Foundation.ProcessRegistry: Agent process registration and discovery
- Foundation.ServiceRegistry: MABEAM service registration and health monitoring
- Foundation.Events: Coordination events and audit trails
- Foundation.Telemetry: Performance metrics and observability
- Foundation.Config: Centralized configuration management
Telemetry Events
MABEAM emits comprehensive telemetry events:
# Agent lifecycle events
:telemetry.execute([:foundation, :mabeam, :agent, :registered], %{count: 1}, metadata)
:telemetry.execute([:foundation, :mabeam, :agent, :started], %{count: 1}, metadata)
:telemetry.execute([:foundation, :mabeam, :agent, :stopped], %{count: 1}, metadata)
# Coordination events
:telemetry.execute([:foundation, :mabeam, :coordination, :started], %{count: 1}, metadata)
:telemetry.execute([:foundation, :mabeam, :coordination, :completed], %{duration: ms}, metadata)
:telemetry.execute([:foundation, :mabeam, :coordination, :failed], %{count: 1}, metadata)
# Auction events
:telemetry.execute([:foundation, :mabeam, :auction, :started], %{participants: count}, metadata)
:telemetry.execute([:foundation, :mabeam, :auction, :completed], %{efficiency: score}, metadata)
# Market events
:telemetry.execute([:foundation, :mabeam, :market, :equilibrium_found], %{price: price}, metadata)
:telemetry.execute([:foundation, :mabeam, :market, :trade_executed], %{volume: volume}, metadata)
# Communication events
:telemetry.execute([:foundation, :mabeam, :comms, :request_sent], %{count: 1}, metadata)
:telemetry.execute([:foundation, :mabeam, :comms, :request_completed], %{duration: ms}, metadata)
Event Handling Example
defmodule MyApp.MABEAMTelemetryHandler do
def handle_event([:foundation, :mabeam, :coordination, :completed], measurements, metadata, _config) do
duration_ms = measurements.duration
protocol = metadata.protocol
participants = metadata.participants
# Log performance metrics
Logger.info("Coordination completed", [
protocol: protocol,
duration_ms: duration_ms,
participants: length(participants)
])
# Update performance dashboard
MyApp.Dashboard.update_coordination_metrics(protocol, duration_ms)
# Check for performance anomalies
if duration_ms > 10_000 do
MyApp.Alerts.send_performance_alert(protocol, duration_ms)
end
end
def handle_event([:foundation, :mabeam, :auction, :completed], measurements, metadata, _config) do
efficiency = measurements.efficiency
auction_type = metadata.auction_type
# Track economic efficiency
MyApp.Economics.record_efficiency(auction_type, efficiency)
# Alert on low efficiency
if efficiency < 0.7 do
MyApp.Alerts.send_efficiency_alert(auction_type, efficiency)
end
end
end
# Attach telemetry handlers
:telemetry.attach_many(
"mabeam-telemetry-handler",
[
[:foundation, :mabeam, :coordination, :completed],
[:foundation, :mabeam, :auction, :completed],
[:foundation, :mabeam, :market, :equilibrium_found]
],
&MyApp.MABEAMTelemetryHandler.handle_event/4,
nil
)
Performance Considerations
Optimization Guidelines
- Agent Pool Management: Use agent pools for high-frequency operations
- Coordination Batching: Batch multiple coordination requests when possible
- Variable Caching: Cache frequently accessed universal variables
- Auction Optimization: Use appropriate auction types for different scenarios
- Market Efficiency: Configure market parameters for optimal performance
Performance Monitoring
# Monitor key performance metrics
defmodule MyApp.MABEAMPerformance do
def monitor_performance do
# Agent registration rate
agent_reg_rate = measure_agent_registration_rate()
# Coordination latency
coord_latency = measure_coordination_latency()
# Auction efficiency
auction_efficiency = measure_auction_efficiency()
# Market convergence time
market_convergence = measure_market_convergence()
%{
agent_registration_rate: agent_reg_rate,
coordination_latency: coord_latency,
auction_efficiency: auction_efficiency,
market_convergence: market_convergence
}
end
defp measure_agent_registration_rate do
# Implementation for measuring agent registration rate
# Target: >1000 agents/second
end
defp measure_coordination_latency do
# Implementation for measuring coordination latency
# Target: <100ms for simple consensus with 10 agents
end
defp measure_auction_efficiency do
# Implementation for measuring auction efficiency
# Target: >0.85 economic efficiency
end
defp measure_market_convergence do
# Implementation for measuring market convergence time
# Target: <5 seconds for equilibrium finding
end
end
Security Considerations
Authentication and Authorization
# Agent authentication example
defmodule MyApp.MABEAMSecurity do
def authenticate_agent(agent_id, credentials) do
case verify_agent_credentials(agent_id, credentials) do
{:ok, agent_info} ->
{:ok, agent_info}
{:error, reason} ->
Logger.warn("Agent authentication failed", agent_id: agent_id, reason: reason)
{:error, :authentication_failed}
end
end
def authorize_coordination(agent_id, protocol, context) do
case check_coordination_permissions(agent_id, protocol, context) do
:authorized ->
:ok
:unauthorized ->
Logger.warn("Coordination authorization failed",
agent_id: agent_id,
protocol: protocol
)
{:error, :unauthorized}
end
end
def authorize_variable_access(agent_id, variable_name, operation) do
case check_variable_permissions(agent_id, variable_name, operation) do
:authorized ->
:ok
:unauthorized ->
Logger.warn("Variable access denied",
agent_id: agent_id,
variable: variable_name,
operation: operation
)
{:error, :access_denied}
end
end
end
Data Protection
- All communication uses encrypted channels when distributed
- Sensitive auction bids are protected until reveal phase
- Variable access is controlled by permission systems
- Audit trails are maintained for all operations
Migration Guide
From Legacy MABEAM to New Architecture
# Migration helper module
defmodule Foundation.MABEAM.Migration do
@doc """
Migrates from legacy PID-based agents to agent_id-based system
"""
def migrate_legacy_agents(legacy_agents) do
Enum.map(legacy_agents, fn {pid, config} ->
# Extract agent information from legacy format
agent_id = extract_agent_id(config)
# Convert to new agent configuration format
new_config = Foundation.MABEAM.Types.new_agent_config(
agent_id,
config.module,
config.args,
type: config.type || :worker,
capabilities: config.capabilities || [],
metadata: %{migrated_from: pid}
)
# Register with new ProcessRegistry
:ok = Foundation.MABEAM.ProcessRegistry.register_agent(new_config)
# Stop legacy agent and start new one
:ok = GenServer.stop(pid)
{:ok, new_pid} = Foundation.MABEAM.ProcessRegistry.start_agent(agent_id)
{agent_id, new_pid}
end)
end
@doc """
Migrates legacy variables to universal variables
"""
def migrate_legacy_variables(legacy_variables) do
Enum.map(legacy_variables, fn {name, value, metadata} ->
universal_var = Foundation.MABEAM.Types.new_variable(
name,
value,
:migration_system,
conflict_resolution: :last_write_wins,
metadata: Map.put(metadata, :migrated, true)
)
:ok = Foundation.MABEAM.Core.register_variable(universal_var)
universal_var
end)
end
end