Foundation 2.1: Implementation Patterns & Best Practices
This document provides concrete implementation patterns that demonstrate
the “Leaky Abstractions by Design” philosophy in action, including
MABEAM multi-agent coordination patterns.
==============================================================================
PATTERN 1: The Intentionally Leaky Facade
==============================================================================
defmodule Foundation.ProcessManager do
@moduledoc """
Smart Facade for distributed process patterns.
Philosophy: Intentionally Leaky
This module is designed as a leaky abstraction. It provides convenient functions for common patterns, but ALWAYS shows you what it’s doing underneath. When you need more power, drop down to Horde directly.
What This Module Does
- Combines multi-step Horde operations into single function calls
- Provides clear names for distributed patterns (singleton, replicated)
- Handles error cases and provides better error messages
What This Module Does NOT Do
- Hide Horde’s APIs or prevent direct access
- Reimplement any core Horde functionality
- Add significant performance overhead
Examples of “Leakiness”
# Facade: Simple and clear
Foundation.ProcessManager.start_singleton(MyWorker, args)
# What it actually does (visible in source):
Horde.DynamicSupervisor.start_child(Foundation.DistributedSupervisor, child_spec)
|> case do
{:ok, pid} -> Horde.Registry.register(Foundation.ProcessRegistry, name, pid)
error -> error
end
# Direct access when you need it:
Horde.DynamicSupervisor.start_child(Foundation.DistributedSupervisor, complex_spec)
"""
require Logger
# LEAKY DESIGN: These are the actual tools we use, clearly exposed
@horde_supervisor Foundation.DistributedSupervisor
@horde_registry Foundation.ProcessRegistry

@doc """
Starts a globally unique process (singleton) across the cluster.
What This Function Actually Does
- Creates a child_spec for Horde.DynamicSupervisor
- Starts the child with Horde.DynamicSupervisor.start_child/2
- Registers the process with Horde.Registry.register/3 (if name provided)
- Provides better error messages than raw Horde
When to Use the Facade vs Raw Horde
Use this facade when:
- You want a simple singleton pattern
- You want combined start+register operation
- You want clearer error messages
Use Horde directly when:
- You need custom child_spec configuration
- You need advanced Horde.Registry metadata
- You’re implementing a custom distribution pattern
Examples
# Simple singleton
{:ok, pid} = Foundation.ProcessManager.start_singleton(MyWorker, [])
# Singleton with name for discovery
{:ok, pid} = Foundation.ProcessManager.start_singleton(
MyWorker,
[],
name: :global_worker
)
# Custom restart strategy
{:ok, pid} = Foundation.ProcessManager.start_singleton(
MyWorker,
[],
name: :temp_worker,
restart: :temporary
)
""" @spec start_singleton(module(), list(), keyword()) :: {:ok, pid()} | {:error, term()} def start_singleton(module, args, opts \ []) do name = Keyword.get(opts, :name) restart = Keyword.get(opts, :restart, :permanent)
# LEAKY: We show exactly what child_spec we create
child_spec = %{
id: name || module,
start: {module, :start_link, [args]},
restart: restart,
type: :worker
}
Logger.debug("Foundation.ProcessManager: Starting singleton #{inspect(name || module)}")
Logger.debug(" Child spec: #{inspect(child_spec)}")
Logger.debug(" Using Horde.DynamicSupervisor: #{@horde_supervisor}")
# LEAKY: We show the exact Horde calls we make
case Horde.DynamicSupervisor.start_child(@horde_supervisor, child_spec) do
{:ok, pid} ->
if name do
Logger.debug(" Registering with Horde.Registry: #{@horde_registry}")
metadata = %{
pid: pid,
node: Node.self(),
started_at: System.system_time(:second),
module: module
}
case Horde.Registry.register(@horde_registry, name, metadata) do
:ok ->
Logger.info("Foundation.ProcessManager: Singleton #{name} started and registered")
:ok
{:error, reason} ->
Logger.warning("Singleton started but registration failed: #{inspect(reason)}")
Logger.warning("Process is running but not discoverable via lookup_singleton/1")
:ok # Still return success since process started
end
else
:ok
end
{:ok, pid}
{:error, {:already_started, existing_pid}} ->
Logger.debug("Singleton #{inspect(name || module)} already running")
{:ok, existing_pid}
{:error, reason} = error ->
Logger.error("Failed to start singleton #{inspect(name || module)}: #{inspect(reason)}")
Logger.error("You can try calling Horde.DynamicSupervisor.start_child/2 directly for more control")
error
end
end
@doc """ Looks up a singleton process by name.
What This Function Actually Does
This is a thin wrapper around Horde.Registry.lookup/2 that:
- Simplifies the return format for the common single-result case
- Provides better error messages
Equivalent Horde Call
# This facade call:
Foundation.ProcessManager.lookup_singleton(:my_service)
# Is equivalent to:
case Horde.Registry.lookup(Foundation.ProcessRegistry, :my_service) do
[{pid, _metadata}] -> {:ok, pid}
[] -> :not_found
end
""" @spec lookup_singleton(term()) :: {:ok, pid()} | :not_found def lookup_singleton(name) do Logger.debug(“Foundation.ProcessManager: Looking up singleton #{inspect(name)}”) Logger.debug(" Using Horde.Registry: #{@horde_registry}")
case Horde.Registry.lookup(@horde_registry, name) do
[{pid, metadata}] ->
Logger.debug(" Found: #{inspect(pid)} on #{metadata.node}")
{:ok, pid}
[] ->
Logger.debug(" Not found")
Logger.debug(" You can also call: Horde.Registry.lookup(#{@horde_registry}, #{inspect(name)})")
:not_found
multiple_results ->
# This shouldn't happen with singleton pattern, but if it does, we're transparent
Logger.warning("Multiple results for singleton lookup: #{inspect(multiple_results)}")
Logger.warning("This suggests a bug in the singleton pattern or registry inconsistency")
{:ok, elem(hd(multiple_results), 0)} # Return first PID
end
end
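# Illustrative sketch (not part of the original facade): when you need the
# registration metadata that lookup_singleton/1 discards, drop down to
# Horde.Registry directly. The metadata shape is the one built in
# start_singleton/3 above (pid, node, started_at, module).
#
#     case Horde.Registry.lookup(Foundation.ProcessRegistry, :my_service) do
#       [{pid, %{node: node, started_at: started_at}}] -> {:ok, pid, node, started_at}
#       [] -> :not_found
#     end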
@doc """ Starts a replicated process - one instance per node in the cluster.
What This Function Actually Does
This function coordinates starting a process on every node in the cluster:
- Gets list of all nodes with [Node.self() | Node.list()]
- Uses :rpc.call/4 to start the singleton on each node
- Returns results from all nodes
This is NOT a Horde-specific pattern - it's a coordination pattern that uses
singleton creation across multiple nodes.
"""
@spec start_replicated(module(), list(), keyword()) :: list({:ok, pid()} | {:error, term()})
def start_replicated(module, args, opts \\ []) do
  base_name = Keyword.get(opts, :name, module)
  nodes = [Node.self() | Node.list()]
Logger.info("Foundation.ProcessManager: Starting replicated #{module} on #{length(nodes)} nodes")
# LEAKY: We show exactly how we coordinate across nodes
tasks = Enum.map(nodes, fn node ->
Task.async(fn ->
# Create unique name per node to avoid conflicts
node_name = {base_name, node}
node_opts = Keyword.put(opts, :name, node_name)
if node == Node.self() do
# Local call
start_singleton(module, args, node_opts)
else
# Remote call - transparently shown
Logger.debug(" Making RPC call to #{node}")
case :rpc.call(node, __MODULE__, :start_singleton, [module, args, node_opts]) do
{:badrpc, reason} -> {:error, {:rpc_failed, node, reason}}
result -> result
end
end
end)
end)
results = Task.await_many(tasks, 30_000)
Logger.info("Replicated start complete: #{inspect(results)}")
results
end
end
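Usage sketch (illustrative, not part of the Foundation source): the facade calls next to the raw Horde call they wrap. MyApp.Worker is a placeholder for any standard GenServer, and Foundation.DistributedSupervisor / Foundation.ProcessRegistry are assumed to be running.

    # Start a cluster-wide singleton and look it up from any node
    {:ok, _pid} = Foundation.ProcessManager.start_singleton(MyApp.Worker, [], name: :my_worker)
    {:ok, pid} = Foundation.ProcessManager.lookup_singleton(:my_worker)

    # One instance per node; results is a list with one entry per node
    results = Foundation.ProcessManager.start_replicated(MyApp.Worker, [], name: :replicated_worker)

    # Drop down to Horde directly when the facade is too small
    Horde.DynamicSupervisor.start_child(
      Foundation.DistributedSupervisor,
      %{id: :custom_worker, start: {MyApp.Worker, :start_link, [[]]}, restart: :transient}
    )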
==============================================================================
PATTERN 2: The Transparent Channel Abstraction
==============================================================================
defmodule Foundation.Channels do
@moduledoc """
Application-layer channels using Phoenix.PubSub.
Philosophy: Solve Head-of-Line Blocking Transparently
Distributed Erlang has a head-of-line blocking problem: a large message on the TCP connection can block small, urgent messages.
This module solves it by using Phoenix.PubSub topics as logical “channels” over the same underlying Distributed Erlang connection.
Leaky Design: You Can See Exactly What We Do
- :control messages -> Phoenix.PubSub topic “foundation:control”
- :events messages -> Phoenix.PubSub topic “foundation:events”
- :data messages -> Phoenix.PubSub topic “foundation:data”
- :telemetry messages -> Phoenix.PubSub topic “foundation:telemetry”
When to Use Channels vs Direct PubSub
Use Foundation.Channels when:
- You want standard channel semantics
- You want automatic channel selection based on message type
- You want the convenience of compression and routing
Use Phoenix.PubSub directly when:
- You need custom topics
- You need advanced PubSub features (local vs distributed)
- You're building your own messaging patterns
"""
require Logger
# LEAKY: The exact PubSub process we use is clearly exposed
@pubsub Foundation.PubSub

# LEAKY: The topic mapping is explicit and simple
@topic_mapping %{
  control: "foundation:control",
  events: "foundation:events",
  data: "foundation:data",
  telemetry: "foundation:telemetry"
}

@doc """
Broadcast a message on a logical channel.
What This Function Actually Does
- Maps channel atom to Phoenix.PubSub topic string
- Optionally compresses message if requested
- Calls Phoenix.PubSub.broadcast/3
Equivalent Phoenix.PubSub Call
# This facade call:
Foundation.Channels.broadcast(:events, my_message)
# Is equivalent to:
Phoenix.PubSub.broadcast(Foundation.PubSub, "foundation:events", my_message)
Channel to Topic Mapping
#{inspect(@topic_mapping, pretty: true)}
"""
@spec broadcast(atom(), term(), keyword()) :: :ok | {:error, term()}
def broadcast(channel, message, opts \\ []) do
  topic = Map.get(@topic_mapping, channel)
unless topic do
available_channels = Map.keys(@topic_mapping)
raise ArgumentError, """
Invalid channel: #{inspect(channel)}
Available channels: #{inspect(available_channels)}
Or use Phoenix.PubSub.broadcast(#{@pubsub}, "your-custom-topic", message)
"""
end
Logger.debug("Foundation.Channels: Broadcasting to #{channel} (topic: #{topic})")
final_message = case Keyword.get(opts, :compression, false) do
true ->
compressed = compress_message(message)
Logger.debug(" Message compressed: #{byte_size(:erlang.term_to_binary(message))} -> #{byte_size(compressed)} bytes")
{:foundation_compressed, compressed}
false ->
message
end
# LEAKY: Show the exact Phoenix.PubSub call
Logger.debug(" Phoenix.PubSub.broadcast(#{@pubsub}, #{inspect(topic)}, ...)")
case Phoenix.PubSub.broadcast(@pubsub, topic, final_message) do
:ok ->
Logger.debug(" Broadcast successful")
:ok
{:error, reason} = error ->
Logger.error("Channel broadcast failed: #{inspect(reason)}")
Logger.error("You can try Phoenix.PubSub.broadcast/3 directly if needed")
error
end
end
@doc """ Subscribe to a logical channel.
What This Function Actually Does
- Maps channel atom to Phoenix.PubSub topic
- Calls Phoenix.PubSub.subscribe/2
- Optionally registers a message handler
Equivalent Phoenix.PubSub Call
# This:
Foundation.Channels.subscribe(:events)
# Is equivalent to:
Phoenix.PubSub.subscribe(Foundation.PubSub, "foundation:events")
""" @spec subscribe(atom(), pid()) :: :ok | {:error, term()} def subscribe(channel, handler \ self()) do topic = Map.get(@topic_mapping, channel)
unless topic do
available_channels = Map.keys(@topic_mapping)
raise ArgumentError, "Invalid channel: #{inspect(channel)}. Available: #{inspect(available_channels)}"
end
Logger.debug("Foundation.Channels: Subscribing to #{channel} (topic: #{topic})")
Logger.debug(" Phoenix.PubSub.subscribe(#{@pubsub}, #{inspect(topic)})")
case Phoenix.PubSub.subscribe(@pubsub, topic) do
:ok ->
if handler != self() do
# Register custom handler (implementation would store this mapping)
Logger.debug(" Registered custom handler: #{inspect(handler)}")
end
:ok
{:error, reason} = error ->
Logger.error("Channel subscription failed: #{inspect(reason)}")
error
end
end
@doc """ Intelligently route a message to the appropriate channel.
What This Function Actually Does
This function demonstrates “smart” behavior while being completely transparent:
- Analyzes message content and options
- Selects appropriate channel based on heuristics
- Calls broadcast/3 with selected channel
Intelligence Rules (Completely Visible)
If opts[:priority] == :high -> :control channel
If message size > 10KB -> :data channel
If message matches telemetry pattern -> :telemetry channel
Otherwise -> :events channel
"""
def route_message(message, opts \\ []) do
  channel = determine_channel(message, opts)

  Logger.debug("Foundation.Channels: Auto-routing message to #{channel} channel")
  Logger.debug(" Routing logic: #{explain_routing_decision(message, opts, channel)}")

  broadcast(channel, message, opts)
end

# LEAKY: All routing logic is visible and simple
defp determine_channel(message, opts) do
  cond do
    Keyword.get(opts, :priority) == :high ->
      :control
large_message?(message) ->
:data
telemetry_message?(message) ->
:telemetry
true ->
:events
end
end
defp explain_routing_decision(message, opts, channel) do
  cond do
    Keyword.get(opts, :priority) == :high ->
      "High priority specified -> control"
large_message?(message) ->
"Large message (#{estimate_size(message)} bytes) -> data"
telemetry_message?(message) ->
"Telemetry pattern detected -> telemetry"
true ->
"Default routing -> events"
end
end
defp large_message?(message) do
  # 10KB threshold
  estimate_size(message) > 10_240
end

defp telemetry_message?(message) do
  case message do
    {:telemetry, _} -> true
    %{type: :metric} -> true
    %{type: :measurement} -> true
    _ -> false
  end
end

defp estimate_size(message) do
  byte_size(:erlang.term_to_binary(message))
end

defp compress_message(message) do
  :zlib.compress(:erlang.term_to_binary(message))
end
end
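Subscriber sketch (illustrative; MyApp.EventConsumer is a placeholder): a GenServer that subscribes to the :events channel and handles both plain and compressed broadcasts. The decompression step is an assumption that mirrors compress_message/1 above (:zlib over :erlang.term_to_binary).

    defmodule MyApp.EventConsumer do
      use GenServer

      def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

      @impl true
      def init(opts) do
        # Same effect as Phoenix.PubSub.subscribe(Foundation.PubSub, "foundation:events")
        :ok = Foundation.Channels.subscribe(:events)
        {:ok, opts}
      end

      @impl true
      def handle_info({:foundation_compressed, binary}, state) do
        # Counterpart of compress_message/1 (assumed consumer-side convention)
        event = :erlang.binary_to_term(:zlib.uncompress(binary))
        handle_event(event)
        {:noreply, state}
      end

      def handle_info(event, state) do
        handle_event(event)
        {:noreply, state}
      end

      defp handle_event(event), do: IO.inspect(event, label: "foundation event")
    end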
==============================================================================
PATTERN 3: The Configuration Translation Layer
==============================================================================
defmodule Foundation.ClusterConfig do
@moduledoc """
Translates Foundation's simple configuration to libcluster topologies.
Philosophy: Translation, Not Replacement
This module never tries to replace libcluster’s configuration system. Instead, it provides a simple front-end that translates to proper libcluster configuration.
Transparency: You Can See The Translation
Every translation function shows exactly what libcluster configuration it produces. This makes debugging easy and learning gradual.
The Three Modes
- Mortal Mode:     cluster: true               -> Auto-detect best strategy
- Apprentice Mode: cluster: :kubernetes        -> Translate to libcluster config
- Wizard Mode:     existing libcluster config  -> Foundation steps aside
"""
require Logger
@doc """ Resolves Foundation configuration to libcluster topology.
What This Function Actually Does
- Checks if libcluster is already configured (Wizard mode)
- If not, looks at Foundation’s :cluster config
- Translates Foundation config to libcluster topology
- Returns topology ready for Cluster.Supervisor
Return Values
- {:wizard_mode, existing_config}       - libcluster already configured
- {:apprentice_mode, translated_config} - simple config translated
- {:mortal_mode, auto_detected_config}  - auto-detected configuration
- {:no_clustering, []}                  - clustering disabled
"""
def resolve_cluster_config() do
  Logger.info("Foundation.ClusterConfig: Resolving cluster configuration...")

  # LEAKY: Show exactly how we detect existing libcluster config
  case Application.get_env(:libcluster, :topologies) do
    topologies when is_list(topologies) and topologies != [] ->
      Logger.info(" Found existing libcluster configuration (Wizard mode)")
      Logger.info(" Foundation will defer to: #{inspect(topologies)}")
      {:wizard_mode, topologies}

    _ ->
      # No existing libcluster config, check Foundation config
      foundation_config = Application.get_env(:foundation, :cluster, false)
      Logger.info(" Foundation cluster config: #{inspect(foundation_config)}")
translate_foundation_config(foundation_config)
  end
end
# LEAKY: All translation logic is completely visible
defp translate_foundation_config(false) do
  Logger.info(" Clustering disabled")
  {:no_clustering, []}
end

defp translate_foundation_config(true) do
  Logger.info(" Mortal mode: Auto-detecting best strategy...")
  strategy_result = auto_detect_strategy()
  Logger.info(" Auto-detected: #{inspect(strategy_result)}")
  {:mortal_mode, strategy_result}
end

defp translate_foundation_config(:kubernetes) do
  Logger.info(" Apprentice mode: Translating :kubernetes")
  config = translate_kubernetes_config()
  Logger.info(" Translated to: #{inspect(config)}")
  {:apprentice_mode, config}
end

defp translate_foundation_config(:consul) do
  Logger.info(" Apprentice mode: Translating :consul")
  config = translate_consul_config()
  Logger.info(" Translated to: #{inspect(config)}")
  {:apprentice_mode, config}
end

defp translate_foundation_config(:dns) do
  Logger.info(" Apprentice mode: Translating :dns")
  config = translate_dns_config()
  Logger.info(" Translated to: #{inspect(config)}")
  {:apprentice_mode, config}
end

defp translate_foundation_config([strategy: strategy] = opts) when is_atom(strategy) do
  Logger.info(" Apprentice mode: Translating strategy #{strategy} with opts")
  config = translate_strategy_with_opts(strategy, opts)
  Logger.info(" Translated to: #{inspect(config)}")
  {:apprentice_mode, config}
end

defp translate_foundation_config(other) do
  Logger.warning(" Unknown Foundation cluster config: #{inspect(other)}")
  Logger.warning(" Falling back to auto-detection")
  strategy_result = auto_detect_strategy()
  {:mortal_mode, strategy_result}
end
@doc """ Auto-detects the best clustering strategy based on environment.
Detection Logic (Completely Visible)
- Check if mdns_lite is available AND we’re in development -> MdnsLite strategy
- Check if Kubernetes environment variables exist -> Kubernetes strategy
- Check if Consul is available -> Consul strategy
- Fall back to Gossip strategy with default settings
Environment Detection Methods
Kubernetes: Checks for KUBERNETES_SERVICE_HOST environment variable
Consul: Checks for CONSUL_HTTP_ADDR environment variable
Development: Checks MIX_ENV == “dev”
mdns_lite: Checks if :mdns_lite application is loaded
"""
def auto_detect_strategy() do
  Logger.info("Foundation.ClusterConfig: Auto-detecting clustering strategy...")

  strategy = cond do
    mdns_lite_available?() and development_mode?() ->
      Logger.info(" Detected: Development mode with mdns_lite available")
      Logger.info(" Strategy: Foundation.Strategies.MdnsLite")
      foundation_mdns_strategy()

    kubernetes_environment?() ->
      Logger.info(" Detected: Kubernetes environment (KUBERNETES_SERVICE_HOST present)")
      Logger.info(" Strategy: Cluster.Strategy.Kubernetes")
      translate_kubernetes_config()

    consul_available?() ->
      Logger.info(" Detected: Consul available (CONSUL_HTTP_ADDR present)")
      Logger.info(" Strategy: Cluster.Strategy.Consul")
      translate_consul_config()

    true ->
      Logger.info(" Detected: Generic environment")
      Logger.info(" Strategy: Cluster.Strategy.Gossip (fallback)")
      translate_gossip_config()
  end

  Logger.info(" Final topology: #{inspect(strategy)}")
  strategy
end

# LEAKY: All environment detection logic is simple and visible
defp mdns_lite_available?() do
  case Application.load(:mdns_lite) do
    :ok -> true
    {:error, {:already_loaded, :mdns_lite}} -> true
    _ -> false
  end
end
defp development_mode?() do
  System.get_env("MIX_ENV") == "dev"
end

defp kubernetes_environment?() do
  System.get_env("KUBERNETES_SERVICE_HOST") != nil
end

defp consul_available?() do
  System.get_env("CONSUL_HTTP_ADDR") != nil
end

# LEAKY: All translation functions show exact libcluster configuration
@doc """ Translates to Kubernetes strategy configuration.
Generated libcluster Configuration
[
foundation_k8s: [
strategy: Cluster.Strategy.Kubernetes,
config: [
mode: :hostname,
kubernetes_node_basename: "app-name",
kubernetes_selector: "app=app-name"
]
]
]
Configuration Sources
App name: FOUNDATION_APP_NAME env var or Application.get_application()
Selector: FOUNDATION_K8S_SELECTOR env var or "app=<app name>"
"""
def translate_kubernetes_config() do
  app_name = get_app_name()
  selector = System.get_env("FOUNDATION_K8S_SELECTOR", "app=#{app_name}")

  config = [
    foundation_k8s: [
      strategy: Cluster.Strategy.Kubernetes,
      config: [
        mode: :hostname,
        kubernetes_node_basename: app_name,
        kubernetes_selector: selector
      ]
    ]
  ]

  Logger.debug("Kubernetes config generated:")
  Logger.debug(" App name: #{app_name}")
  Logger.debug(" Selector: #{selector}")
  Logger.debug(" libcluster topology: #{inspect(config)}")

  config
end
@doc """ Translates to Consul strategy configuration.
Generated libcluster Configuration
[
foundation_consul: [
strategy: Cluster.Strategy.Consul,
config: [
service_name: "app-name"
]
]
]
""" def translate_consul_config() do service_name = System.get_env(“FOUNDATION_SERVICE_NAME”, get_app_name())
config = [
foundation_consul: [
strategy: Cluster.Strategy.Consul,
config: [
service_name: service_name
]
]
]
Logger.debug("Consul config generated: #{inspect(config)}")
config
end
@doc """ Translates to DNS strategy configuration.
Generated libcluster Configuration
[
foundation_dns: [
strategy: Cluster.Strategy.DNS,
config: [
service: "app-name",
application_name: "app-name"
]
]
]
""" def translate_dns_config() do app_name = get_app_name() service = System.get_env(“FOUNDATION_DNS_SERVICE”, app_name)
config = [
foundation_dns: [
strategy: Cluster.Strategy.DNS,
config: [
service: service,
application_name: app_name
]
]
]
Logger.debug("DNS config generated: #{inspect(config)}")
config
end
@doc """ Translates to Gossip strategy configuration (fallback).
Generated libcluster Configuration
[
foundation_gossip: [
strategy: Cluster.Strategy.Gossip,
config: [
multicast_addr: "230.1.1.251",
multicast_ttl: 1,
secret: "foundation-cluster"
]
]
]
""" def translate_gossip_config() do secret = System.get_env(“FOUNDATION_GOSSIP_SECRET”, “foundation-cluster”)
config = [
foundation_gossip: [
strategy: Cluster.Strategy.Gossip,
config: [
multicast_addr: "230.1.1.251",
multicast_ttl: 1,
secret: secret
]
]
]
Logger.debug("Gossip config generated: #{inspect(config)}")
config
end
@doc """ Creates configuration for Foundation’s custom MdnsLite strategy.
Generated libcluster Configuration
[
foundation_mdns: [
strategy: Foundation.Strategies.MdnsLite,
config: [
service_name: "app-name-dev",
discovery_interval: 5000
]
]
]
""" def foundation_mdns_strategy() do app_name = get_app_name() service_name = “#{app_name}-dev”
config = [
foundation_mdns: [
strategy: Foundation.Strategies.MdnsLite,
config: [
service_name: service_name,
discovery_interval: 5000
]
]
]
Logger.debug("MdnsLite strategy config: #{inspect(config)}")
config
end
defp translate_strategy_with_opts(strategy, _opts) do
  # Handle explicit strategy with options
  base_config = case strategy do
    :gossip -> translate_gossip_config()
    :kubernetes -> translate_kubernetes_config()
    :consul -> translate_consul_config()
    :dns -> translate_dns_config()
  end
# Could merge additional options here
base_config
end
defp get_app_name() do
  System.get_env("FOUNDATION_APP_NAME") ||
    to_string(Application.get_application(__MODULE__) || "foundation-app")
end
end
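Configuration sketch (illustrative, not Foundation source): what each of the three modes looks like from the application side, plus one possible way to hand the resolved topology to libcluster's Cluster.Supervisor. The MyApp names are placeholders, and the supervisor wiring is an assumption about how a host application might consume resolve_cluster_config/0.

    # config/config.exs
    import Config

    # Mortal mode: let Foundation auto-detect the best strategy
    config :foundation, cluster: true

    # Apprentice mode: name a strategy and let Foundation translate it
    # config :foundation, cluster: :kubernetes

    # Wizard mode: configure libcluster yourself; Foundation steps aside
    # config :libcluster,
    #   topologies: [
    #     my_cluster: [strategy: Cluster.Strategy.Gossip, config: [secret: "my-secret"]]
    #   ]

    # lib/my_app/application.ex (hypothetical host application)
    defmodule MyApp.Application do
      use Application

      @impl true
      def start(_type, _args) do
        {_mode, topologies} = Foundation.ClusterConfig.resolve_cluster_config()

        cluster_children =
          case topologies do
            [] -> []
            topologies -> [{Cluster.Supervisor, [topologies, [name: MyApp.ClusterSupervisor]]}]
          end

        Supervisor.start_link(cluster_children, strategy: :one_for_one, name: MyApp.Supervisor)
      end
    end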
==============================================================================
PATTERN 4: The Self-Documenting Health Monitor
==============================================================================
defmodule Foundation.HealthMonitor do
@moduledoc """
Cluster health monitoring with complete transparency.
Philosophy: Observable and Explainable
This module collects health metrics from all Foundation components and underlying tools. Every metric source is clearly documented, and the health assessment logic is completely visible.
What We Monitor (And How)
- Cluster connectivity: Node.list() and ping tests
- Horde health: Process counts and registry synchronization
- Phoenix.PubSub: Message delivery and subscription counts
- Service availability: Registered services and health checks
- Performance metrics: Message latency and throughput
Leaky Design: All Assessment Logic is Visible
The health assessment rules are implemented as simple, readable functions
that you can inspect, understand, and modify.
"""

use GenServer
require Logger

defstruct [:last_check_time, :health_history, :check_interval]

def start_link(opts \\ []) do
  GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@impl true
def init(opts) do
  check_interval = Keyword.get(opts, :check_interval, 30_000) # 30 seconds
# Start first health check after a brief delay
Process.send_after(self(), :health_check, 5_000)
state = %__MODULE__{
last_check_time: nil,
health_history: [],
check_interval: check_interval
}
Logger.info("Foundation.HealthMonitor: Started with #{check_interval}ms check interval")
{:ok, state}
end
@doc """ Gets comprehensive cluster health information.
Health Report Structure
%{
overall_status: :healthy | :degraded | :critical,
cluster: %{
connected_nodes: 3,
expected_nodes: 3,
node_health: [%{node: :app@host, status: :healthy, ...}]
},
services: %{
total_registered: 15,
healthy: 14,
unhealthy: 1,
services: [%{name: :user_service, status: :healthy, ...}]
},
infrastructure: %{
horde_status: :synchronized,
pubsub_status: :healthy,
message_queues: :normal
},
performance: %{
avg_message_latency_ms: 12,
message_throughput_per_sec: 1547,
error_rate_percent: 0.02
},
recommendations: ["Consider adding more nodes", ...]
}
""" def get_cluster_health() do GenServer.call(MODULE, :get_health, 10_000) end
@impl true def handle_call(:get_health, _from, state) do health_report = perform_comprehensive_health_check() {:reply, health_report, state} end
@impl true def handle_info(:health_check, state) do Logger.debug(“Foundation.HealthMonitor: Performing scheduled health check”)
health_report = perform_comprehensive_health_check()
# Store in history (keep last 10 checks)
new_history = [health_report | Enum.take(state.health_history, 9)]
# Log concerning issues
case health_report.overall_status do
:healthy ->
Logger.debug("Cluster health: OK")
:degraded ->
Logger.warning("Cluster health: DEGRADED")
Logger.warning("Issues: #{inspect(health_report.issues)}")
:critical ->
Logger.error("Cluster health: CRITICAL")
Logger.error("Critical issues: #{inspect(health_report.critical_issues)}")
end
# Schedule next check
Process.send_after(self(), :health_check, state.check_interval)
new_state = %{state |
last_check_time: System.system_time(:millisecond),
health_history: new_history
}
{:noreply, new_state}
end
# LEAKY: All health check logic is completely visible and simple
defp perform_comprehensive_health_check() do
  Logger.debug(" Checking cluster connectivity...")
  cluster_health = check_cluster_connectivity()
Logger.debug(" Checking service registry...")
service_health = check_service_registry_health()
Logger.debug(" Checking infrastructure components...")
infrastructure_health = check_infrastructure_health()
Logger.debug(" Collecting performance metrics...")
performance_metrics = collect_performance_metrics()
Logger.debug(" Analyzing overall status...")
overall_status = determine_overall_status([
cluster_health.status,
service_health.status,
infrastructure_health.status
])
Logger.debug(" Generating recommendations...")
recommendations = generate_health_recommendations(
cluster_health,
service_health,
infrastructure_health,
performance_metrics
)
%{
overall_status: overall_status,
cluster: cluster_health,
services: service_health,
infrastructure: infrastructure_health,
performance: performance_metrics,
recommendations: recommendations,
checked_at: System.system_time(:millisecond),
check_duration_ms: 0 # Would measure actual check time
}
end
@doc """ Checks cluster connectivity and node health.
What This Function Actually Does
- Gets connected nodes with [Node.self() | Node.list()]
- Pings each node to verify connectivity
- Checks expected node count from configuration
- Assesses each node’s basic health metrics
Health Assessment Logic
:healthy: All expected nodes connected and responding
:degraded: Some nodes missing or slow to respond
:critical: Majority of nodes unreachable
"""
def check_cluster_connectivity() do
  connected_nodes = [Node.self() | Node.list()]
  expected_nodes = get_expected_node_count()

  Logger.debug(" Connected nodes: #{length(connected_nodes)}")
  Logger.debug(" Expected nodes: #{expected_nodes}")

  # Test connectivity to each node
node_health_results = Enum.map(connected_nodes, fn node -> check_individual_node_health(node) end)
healthy_nodes = Enum.count(node_health_results, &(&1.status == :healthy))
status = cond do
  healthy_nodes >= expected_nodes -> :healthy
  healthy_nodes >= div(expected_nodes, 2) -> :degraded
  true -> :critical
end

%{
  status: status,
  connected_nodes: length(connected_nodes),
  expected_nodes: expected_nodes,
  healthy_nodes: healthy_nodes,
  node_health: node_health_results
}
end
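# Worked example of the thresholds above (illustrative): with expected_nodes = 4,
# 4 healthy nodes -> :healthy, 2 or 3 healthy -> :degraded (>= div(4, 2)),
# 0 or 1 healthy -> :critical.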
defp check_individual_node_health(node) do
  start_time = System.monotonic_time(:microsecond)
ping_result = if node == Node.self() do
:pong # Local node always responds
else
Node.ping(node)
end
ping_time_us = System.monotonic_time(:microsecond) - start_time
case ping_result do
:pong ->
%{
node: node,
status: :healthy,
ping_time_us: ping_time_us,
load_avg: get_node_load_average(node),
memory_usage: get_node_memory_usage(node)
}
:pang ->
%{
node: node,
status: :unreachable,
ping_time_us: nil,
load_avg: nil,
memory_usage: nil
}
end
end
# Simple RPC calls to get node metrics (with error handling)
defp get_node_load_average(node) do
  case safe_rpc_call(node, :cpu_sup, :avg1, []) do
    # :cpu_sup.avg1/0 returns the load average multiplied by 256
    {:ok, load} -> load / 256
    _ -> nil
  end
end

defp get_node_memory_usage(node) do
  case safe_rpc_call(node, :erlang, :memory, [:total]) do
    {:ok, memory} -> memory
    _ -> nil
  end
end

defp safe_rpc_call(node, module, function, args) do
  case :rpc.call(node, module, function, args, 5000) do
    {:badrpc, _reason} -> {:error, :rpc_failed}
    result -> {:ok, result}
  end
end

defp get_expected_node_count() do
  # Try various sources for expected node count
  System.get_env("FOUNDATION_EXPECTED_NODES", "1") |> String.to_integer()
rescue
  _ -> 1
end

defp determine_overall_status(component_statuses) do
  cond do
    Enum.any?(component_statuses, &(&1 == :critical)) -> :critical
    Enum.any?(component_statuses, &(&1 == :degraded)) -> :degraded
    true -> :healthy
  end
end
# Stub implementations for other health checks
defp check_service_registry_health() do %{status: :healthy, total_registered: 0, healthy: 0, unhealthy: 0, services: []} end
defp check_infrastructure_health() do %{status: :healthy, horde_status: :synchronized, pubsub_status: :healthy} end
defp collect_performance_metrics() do %{ avg_message_latency_ms: 0, message_throughput_per_sec: 0, error_rate_percent: 0.0 } end
defp generate_health_recommendations(_cluster, _services, _infra, _perf) do
  []
end
end
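Consumer sketch (illustrative; MyApp.Readiness is a placeholder): one way the health report might be used, e.g. as a readiness check that only fails when the cluster is :critical.

    defmodule MyApp.Readiness do
      # Returns :ok when the cluster is usable, {:error, report} otherwise.
      def check() do
        report = Foundation.HealthMonitor.get_cluster_health()

        case report.overall_status do
          :critical -> {:error, report}
          _healthy_or_degraded -> :ok
        end
      end
    end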
==============================================================================
PATTERN 5: MABEAM Multi-Agent Coordination Patterns
==============================================================================
defmodule Foundation.MABEAM.ProcessManager do
@moduledoc """
MABEAM-enhanced Process Manager with intelligent agent coordination.
Philosophy: Leaky Multi-Agent Abstractions
This module extends Foundation.ProcessManager with multi-agent capabilities while maintaining the same leaky abstraction philosophy. You can see exactly what coordination protocols are being used and drop down to direct MABEAM APIs when needed.
What This Module Does
- Combines agent lifecycle with coordination capabilities
- Provides clear names for multi-agent patterns (consensus, auction, negotiation)
- Handles complex coordination scenarios with better error messages
What This Module Does NOT Do
- Hide underlying MABEAM.Core or MABEAM.Coordination APIs
- Reimplement any core MABEAM functionality
- Add significant coordination overhead
Examples of “Leakiness”
# Facade: Simple and clear
Foundation.MABEAM.ProcessManager.coordinate_resource_allocation(agents, resources)
# What it actually does (visible in source):
variable = %{id: :resource_allocation, type: :auction, agents: agents}
Foundation.MABEAM.Core.register_orchestration_variable(variable)
Foundation.MABEAM.Coordination.run_auction(auction_spec, agents, context)
# Direct access when you need it:
Foundation.MABEAM.Coordination.run_custom_protocol(custom_spec, agents, context)
"""
require Logger
# LEAKY DESIGN: These are the actual MABEAM tools we use, clearly exposed
@mabeam_core Foundation.MABEAM.Core
@mabeam_coordination Foundation.MABEAM.Coordination
@mabeam_registry Foundation.MABEAM.AgentRegistry

@doc """
Starts an intelligent agent with coordination capabilities.
What This Function Actually Does
- Validates agent specification and coordination capabilities
- Registers agent with Foundation.MABEAM.AgentRegistry
- Starts agent process using Foundation.ProcessManager
- Configures coordination protocols based on agent capabilities
When to Use the Facade vs Direct MABEAM
Use this facade when:
- You want a simple agent with standard coordination capabilities
- You want combined start+register+configure operation
- You want clearer error messages for agent setup
Use MABEAM directly when:
- You need custom agent specifications
- You need advanced coordination protocol configuration
- You’re implementing custom multi-agent patterns
Examples
# Simple coordination agent
{:ok, agent_pid} = Foundation.MABEAM.ProcessManager.start_coordination_agent(
MyApp.ResourceAgent,
[],
capabilities: [:consensus_voting, :auction_bidding]
)
# Agent with custom coordination protocols
{:ok, agent_pid} = Foundation.MABEAM.ProcessManager.start_coordination_agent(
MyApp.OptimizationAgent,
[optimization_target: :minimize_latency],
capabilities: [:auction_bidding, :negotiation],
protocols: [:sealed_bid_auction, :alternating_offers_negotiation]
)
""" @spec start_coordination_agent(module(), list(), keyword()) :: {:ok, pid()} | {:error, term()} def start_coordination_agent(module, args, opts \ []) do capabilities = Keyword.get(opts, :capabilities, []) protocols = Keyword.get(opts, :protocols, []) name = Keyword.get(opts, :name)
Logger.debug("Foundation.MABEAM.ProcessManager: Starting coordination agent #{inspect(name || module)}")
Logger.debug(" Capabilities: #{inspect(capabilities)}")
Logger.debug(" Protocols: #{inspect(protocols)}")
Logger.debug(" Using MABEAM.AgentRegistry: #{@mabeam_registry}")
# LEAKY: We show exactly what agent config we create
agent_config = %{
module: module,
args: args,
coordination_capabilities: capabilities,
supported_protocols: protocols,
distribution_strategy: Keyword.get(opts, :strategy, :singleton),
resource_requirements: Keyword.get(opts, :resources, %{})
}
Logger.debug(" Agent config: #{inspect(agent_config)}")
# LEAKY: We show the exact MABEAM calls we make
case @mabeam_registry.register_agent(name || module, agent_config) do
{:ok, agent_pid} ->
Logger.info("Foundation.MABEAM.ProcessManager: Coordination agent #{name || module} started")
{:ok, agent_pid}
{:error, reason} = error ->
Logger.error("Failed to start coordination agent #{inspect(name || module)}: #{inspect(reason)}")
Logger.error("You can try calling Foundation.MABEAM.AgentRegistry.register_agent/2 directly for more control")
error
end
end
@doc """ Coordinates resource allocation using auction-based protocol.
What This Function Actually Does
This is a facade for a common multi-agent pattern:
- Creates an auction-type orchestration variable
- Registers it with Foundation.MABEAM.Core
- Runs auction coordination protocol
- Returns allocation results
Equivalent MABEAM Calls
# This facade call:
Foundation.MABEAM.ProcessManager.coordinate_resource_allocation(agents, resources)
# Is equivalent to:
variable = %{
id: :resource_allocation,
type: :auction,
agents: agents,
coordination_fn: &auction_coordination/3
}
Foundation.MABEAM.Core.register_orchestration_variable(variable)
Foundation.MABEAM.Coordination.run_auction(auction_spec, agents, context)
""" @spec coordinate_resource_allocation([atom()], map()) :: {:ok, map()} | {:error, term()} def coordinate_resource_allocation(agent_ids, resources) when is_list(agent_ids) and is_map(resources) do Logger.debug(“Foundation.MABEAM.ProcessManager: Coordinating resource allocation”) Logger.debug(" Agents: #{inspect(agent_ids)}") Logger.debug(" Resources: #{inspect(resources)}") Logger.debug(" Using MABEAM.Core: #{@mabeam_core}") Logger.debug(" Using MABEAM.Coordination: #{@mabeam_coordination}")
# LEAKY: We show exactly what orchestration variable we create
variable = %{
id: :resource_allocation,
type: :auction,
agents: agent_ids,
coordination_fn: &auction_coordination_fn/3,
constraints: [
total_resources: resources,
max_allocation_time: 30_000
]
}
Logger.debug(" Orchestration variable: #{inspect(variable)}")
# LEAKY: We show the exact MABEAM calls we make
with {:ok, _} <- @mabeam_core.register_orchestration_variable(variable),
{:ok, result} <- @mabeam_coordination.run_auction(
%{
type: :sealed_bid,
resources: resources,
payment_rule: :second_price
},
agent_ids,
%{timeout: 30_000}
) do
Logger.info("Resource allocation coordination completed successfully")
{:ok, result}
else
{:error, reason} = error ->
Logger.error("Resource allocation coordination failed: #{inspect(reason)}")
Logger.error("You can call Foundation.MABEAM.Coordination.run_auction/3 directly for custom auctions")
error
end
end
# LEAKY: Coordination function is visible and modifiable
defp auction_coordination_fn(agents, resources, context) do
  # Simple auction coordination logic
  # This is completely visible and can be customized
  Logger.debug(" Running auction coordination with #{length(agents)} agents")
# Collect bids from all agents
bids = Enum.map(agents, fn agent_id ->
case GenServer.call(agent_id, {:bid_for_resources, resources, context}, 5000) do
{:ok, bid} -> {agent_id, bid}
_ -> {agent_id, %{bid: 0, priority: 0}}
end
end)
# Simple highest-bid-wins allocation
winner = bids
|> Enum.max_by(fn {_agent, bid} -> bid.bid end)
|> elem(0)
%{
winner: winner,
allocation: resources,
payment: calculate_second_price_payment(bids),
auction_type: :sealed_bid
}
end
defp calculate_second_price_payment(bids) do
  sorted_bids = bids |> Enum.map(fn {_agent, bid} -> bid.bid end) |> Enum.sort(:desc)
case sorted_bids do
[_highest, second | _] -> second
[only_bid] -> only_bid
[] -> 0
end
end
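# Worked example (illustrative): with bids of 10, 7 and 3, the highest bidder
# wins the allocation but pays the second price, 7; with a single bid it pays
# its own bid; with no bids the payment is 0.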
@doc """ Coordinates consensus decision among agents.
Leaky Implementation
This shows exactly how consensus coordination works and allows you to drop
down to direct MABEAM APIs when needed.
"""
@spec coordinate_consensus([atom()], term()) :: {:ok, term()} | {:error, term()}
def coordinate_consensus(agent_ids, proposal) do
  Logger.debug("Foundation.MABEAM.ProcessManager: Coordinating consensus")
  Logger.debug(" Agents: #{inspect(agent_ids)}")
  Logger.debug(" Proposal: #{inspect(proposal)}")
# LEAKY: Direct call to MABEAM.Coordination with full transparency
case @mabeam_coordination.coordinate_consensus(agent_ids, proposal, %{
consensus_type: :majority,
timeout: 15_000,
required_votes: length(agent_ids)
}) do
{:ok, result} ->
Logger.info("Consensus achieved: #{inspect(result.consensus)}")
{:ok, result}
{:error, reason} = error ->
Logger.error("Consensus failed: #{inspect(reason)}")
Logger.error("Try Foundation.MABEAM.Coordination.coordinate_consensus/3 for custom consensus")
error
end
end
end
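Usage sketch (illustrative; the agent modules and ids are placeholders): the MABEAM facade in action, with the direct MABEAM calls available when the defaults are not enough.

    # Start two coordination agents (MyApp.GpuAgent / MyApp.CpuAgent are hypothetical)
    {:ok, _} =
      Foundation.MABEAM.ProcessManager.start_coordination_agent(
        MyApp.GpuAgent, [], name: :gpu_agent, capabilities: [:auction_bidding]
      )

    {:ok, _} =
      Foundation.MABEAM.ProcessManager.start_coordination_agent(
        MyApp.CpuAgent, [], name: :cpu_agent, capabilities: [:auction_bidding, :consensus_voting]
      )

    # Facade: auction-based allocation and majority consensus
    {:ok, allocation} =
      Foundation.MABEAM.ProcessManager.coordinate_resource_allocation(
        [:gpu_agent, :cpu_agent],
        %{gpu_hours: 10}
      )

    {:ok, decision} =
      Foundation.MABEAM.ProcessManager.coordinate_consensus([:gpu_agent, :cpu_agent], {:scale_up, 2})

    # Direct access when you need custom auctions (as the moduledoc shows)
    Foundation.MABEAM.Coordination.run_auction(
      %{type: :sealed_bid, resources: %{gpu_hours: 10}},
      [:gpu_agent, :cpu_agent],
      %{timeout: 60_000}
    )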