Phoenix: Communication Protocols and Message Patterns
Date: 2025-07-12
Version: 1.0
Series: Distributed Agent System - Part 2 (Communication)
Executive Summary
This document specifies the communication protocols and message patterns for the Phoenix distributed agent system. Drawing from modern distributed systems research, BEAM ecosystem capabilities, and battle-tested production patterns, Phoenix implements a multi-layered communication architecture that adapts to different network conditions, consistency requirements, and performance objectives.
Key Innovation: Phoenix uses adaptive protocol selection where the system automatically chooses the optimal communication pattern based on message characteristics, network topology, and consistency requirements.
Table of Contents
- Communication Architecture Overview
- Message Format Specification
- Transport Layer Protocols
- Communication Patterns
- Delivery Guarantees
- Flow Control and Backpressure
- Security and Authentication
- Performance Optimization
Communication Architecture Overview
Multi-Layer Protocol Stack
Phoenix Communication Stack
┌─────────────────────────────────────────────────────┐
│ Application Layer │
│ ├── Agent-to-Agent Messages │
│ ├── Control Plane Commands │
│ └── Observability Events │
├─────────────────────────────────────────────────────┤
│ Routing Layer │
│ ├── Message Router (topology-aware) │
│ ├── Load Balancer (adaptive) │
│ └── Circuit Breaker (fault isolation) │
├─────────────────────────────────────────────────────┤
│ Protocol Adaptation Layer │
│ ├── Protocol Selector (automatic) │
│ ├── Format Converter (serialization) │
│ └── Compression Manager (efficiency) │
├─────────────────────────────────────────────────────┤
│ Transport Layer │
│ ├── Distributed Erlang (local cluster) │
│ ├── Partisan (large scale) │
│ ├── HTTP/2 (cross-DC) │
│ └── QUIC (low latency) │
├─────────────────────────────────────────────────────┤
│ Network Layer │
│ ├── TCP/IP (reliable) │
│ ├── UDP (unreliable, fast) │
│ └── TLS (secure) │
└─────────────────────────────────────────────────────┘
Protocol Selection Matrix
Scenario | Distance | Size | Consistency | Protocol | Transport |
---|---|---|---|---|---|
Local Cluster | Same DC | Any | Any | Distributed Erlang | TCP |
Large Cluster | Same DC | Any | Any | Partisan | TCP |
Cross-DC | Multi-DC | Any | Any | HTTP/2 | TLS/TCP |
Mobile/Edge | Anywhere | Small | Eventual | QUIC | UDP |
Real-time | Same Region | Small | Causal | Custom UDP | UDP |
Bulk Transfer | Anywhere | Large | Eventual | HTTP/2 Streaming | TCP |
Communication Patterns
defmodule Phoenix.Communication.Patterns do
@moduledoc """
Fundamental communication patterns in Phoenix:
1. Point-to-Point: Direct agent communication
2. Publish-Subscribe: Event distribution
3. Request-Response: Synchronous interaction
4. Pipeline: Ordered processing chains
5. Gossip: Eventually consistent broadcast
6. Consensus: Strongly consistent coordination
"""
@type pattern ::
:point_to_point |
:publish_subscribe |
:request_response |
:pipeline |
:gossip |
:consensus
@type message_characteristics :: %{
size: :small | :medium | :large,
priority: :low | :normal | :high | :critical,
consistency: :eventual | :causal | :strong,
ordering: :none | :causal | :total,
delivery: :at_most_once | :at_least_once | :exactly_once
}
end
Message Format Specification
Universal Message Format
Phoenix extends CloudEvents v1.0.2 with distributed systems extensions:
defmodule Phoenix.Message do
@moduledoc """
Universal message format for Phoenix distributed agents.
Based on CloudEvents v1.0.2 with Phoenix extensions for:
- Distributed tracing and correlation
- Causal consistency tracking
- Delivery guarantee specifications
- Routing and placement hints
"""
use TypedStruct
typedstruct do
# CloudEvents v1.0.2 Core Fields
field :specversion, String.t(), default: "1.0.2"
field :id, String.t()
field :source, String.t()
field :type, String.t()
field :subject, String.t()
field :time, DateTime.t()
field :datacontenttype, String.t(), default: "application/x-erlang-term"
field :dataschema, String.t()
field :data, term()
# Phoenix Distributed Extensions
field :phoenix_trace, Phoenix.Trace.Context.t()
field :phoenix_causality, Phoenix.VectorClock.t()
field :phoenix_delivery, Phoenix.Delivery.Guarantee.t()
field :phoenix_routing, Phoenix.Routing.Metadata.t()
field :phoenix_security, Phoenix.Security.Context.t()
field :phoenix_qos, Phoenix.QoS.Requirements.t()
end
def new(data, opts \\ []) do
%__MODULE__{
id: generate_message_id(),
source: Keyword.get(opts, :source, Phoenix.Node.local_id()),
type: Keyword.get(opts, :type, "phoenix.agent.message"),
time: DateTime.utc_now(),
data: data,
phoenix_trace: Phoenix.Trace.current_context(),
phoenix_causality: Phoenix.VectorClock.tick(Phoenix.VectorClock.local()),
phoenix_delivery: Keyword.get(opts, :delivery, default_delivery_guarantee()),
phoenix_routing: build_routing_metadata(opts),
phoenix_security: Phoenix.Security.current_context(),
phoenix_qos: Keyword.get(opts, :qos, default_qos_requirements())
}
end
end
Message Type Taxonomy
defmodule Phoenix.Message.Types do
@moduledoc """
Standardized message type taxonomy for Phoenix agents.
Type format: phoenix.{domain}.{entity}.{action}[.{qualifier}]
"""
# Control Plane Messages
@control_plane [
"phoenix.cluster.node.join",
"phoenix.cluster.node.leave",
"phoenix.cluster.topology.update",
"phoenix.agent.placement.request",
"phoenix.agent.placement.response",
"phoenix.agent.migration.start",
"phoenix.agent.migration.complete"
]
# Data Plane Messages
@data_plane [
"phoenix.agent.message.send",
"phoenix.agent.state.update",
"phoenix.agent.action.execute",
"phoenix.agent.action.result",
"phoenix.agent.event.publish",
"phoenix.agent.heartbeat.ping"
]
# Observability Messages
@observability [
"phoenix.telemetry.metric.update",
"phoenix.telemetry.trace.span",
"phoenix.telemetry.log.entry",
"phoenix.health.status.report",
"phoenix.performance.measurement"
]
def message_characteristics(message_type) do
cond do
message_type in @control_plane -> %{
priority: :high,
consistency: :strong,
ordering: :total,
delivery: :exactly_once
}
message_type in @data_plane -> %{
priority: :normal,
consistency: :causal,
ordering: :causal,
delivery: :at_least_once
}
message_type in @observability -> %{
priority: :low,
consistency: :eventual,
ordering: :none,
delivery: :at_most_once
}
true -> default_characteristics()
end
end
end
Serialization and Compression
defmodule Phoenix.Message.Serialization do
@moduledoc """
Multi-format serialization with automatic format selection.
Formats:
- Erlang Term: Zero-copy within BEAM, type preservation
- MessagePack: Compact binary, cross-language
- Protocol Buffers: Schema evolution, efficient
- JSON: Human readable, debugging
"""
@type format :: :erlang_term | :msgpack | :protobuf | :json
def serialize(message, format \\ :auto) do
selected_format = select_format(message, format)
case selected_format do
:erlang_term ->
compressed_term = :erlang.term_to_binary(message, [:compressed])
{:ok, compressed_term, :erlang_term}
:msgpack ->
case Msgpax.pack(message) do
{:ok, binary} -> {:ok, binary, :msgpack}
{:error, reason} -> {:error, reason}
end
:protobuf ->
case PhoenixProto.Message.encode(message) do
{:ok, binary} -> {:ok, binary, :protobuf}
{:error, reason} -> {:error, reason}
end
:json ->
case Jason.encode(message) do
{:ok, json} -> {:ok, json, :json}
{:error, reason} -> {:error, reason}
end
end
end
defp select_format(message, :auto) do
cond do
# Local BEAM communication - use Erlang terms
same_beam_cluster?(message.phoenix_routing.target) ->
:erlang_term
# Cross-language communication - use MessagePack
cross_language?(message.phoenix_routing.target) ->
:msgpack
# Schema evolution important - use Protocol Buffers
schema_evolution_required?(message.type) ->
:protobuf
# Debugging or development - use JSON
debug_mode?() ->
:json
# Default to MessagePack for efficiency
true ->
:msgpack
end
end
defp select_format(_message, format) when format in [:erlang_term, :msgpack, :protobuf, :json] do
format
end
end
Transport Layer Protocols
1. Distributed Erlang Transport
defmodule Phoenix.Transport.DistributedErlang do
@moduledoc """
Distributed Erlang transport for local cluster communication.
Advantages:
- Zero-copy message passing
- Automatic process monitoring
- Built-in clustering
- Type preservation
Use Cases:
- Single datacenter clusters
- Development environments
- Small to medium scale (< 50 nodes)
"""
@behaviour Phoenix.Transport
def send_message(target, message, opts \\ []) do
case resolve_target(target) do
{:ok, {:local, pid}} ->
send(pid, message)
{:ok, :sent}
{:ok, {:remote, node, pid}} ->
case Node.ping(node) do
:pong ->
send({pid, node}, message)
{:ok, :sent}
:pang ->
{:error, :node_unreachable}
end
{:error, reason} ->
{:error, reason}
end
end
def broadcast_message(targets, message, opts \\ []) do
delivery_mode = Keyword.get(opts, :delivery_mode, :async)
case delivery_mode do
:async ->
# Fire and forget to all targets
Enum.each(targets, fn target ->
send_message(target, message, opts)
end)
{:ok, :broadcast}
:sync ->
# Wait for acknowledgment from all
sync_broadcast(targets, message, opts)
end
end
defp sync_broadcast(targets, message, opts) do
timeout = Keyword.get(opts, :timeout, 5000)
# Send to all targets with correlation ID
correlation_id = generate_correlation_id()
ack_message = add_ack_request(message, correlation_id)
Enum.each(targets, fn target ->
send_message(target, ack_message, opts)
end)
# Wait for acknowledgments
wait_for_acks(targets, correlation_id, timeout)
end
end
2. Partisan Transport
defmodule Phoenix.Transport.Partisan do
@moduledoc """
Partisan transport for large-scale cluster communication.
Advantages:
- Scales beyond Distributed Erlang limits
- Pluggable overlay networks
- Advanced failure detection
- Message batching and compression
Use Cases:
- Large clusters (100+ nodes)
- Cross-datacenter communication
- High-throughput scenarios
"""
@behaviour Phoenix.Transport
def send_message(target, message, opts \\ []) do
# Use Partisan's overlay network
partisan_message = wrap_for_partisan(message, opts)
case Partisan.send_message(target, partisan_message) do
:ok -> {:ok, :sent}
{:error, reason} -> {:error, reason}
end
end
def configure_overlay(overlay_type, opts \\ []) do
case overlay_type do
:hyparview ->
# HyParView: Probabilistic broadcast
Partisan.configure_overlay(:hyparview, [
fanout: Keyword.get(opts, :fanout, 6),
active_view_size: Keyword.get(opts, :active_view_size, 6),
passive_view_size: Keyword.get(opts, :passive_view_size, 30)
])
:client_server ->
# Client-Server: Traditional hub-and-spoke
Partisan.configure_overlay(:client_server, [
servers: Keyword.get(opts, :servers, []),
connection_pool_size: Keyword.get(opts, :pool_size, 10)
])
:tree ->
# Tree: Hierarchical topology
Partisan.configure_overlay(:tree, [
fanout: Keyword.get(opts, :tree_fanout, 4),
depth: Keyword.get(opts, :max_depth, 6)
])
end
end
defp wrap_for_partisan(message, opts) do
%Partisan.Message{
payload: message,
metadata: %{
priority: get_priority(opts),
compression: should_compress?(message, opts),
encryption: should_encrypt?(message, opts)
}
}
end
end
3. HTTP/2 Transport
defmodule Phoenix.Transport.HTTP2 do
@moduledoc """
HTTP/2 transport for cross-datacenter and hybrid cloud communication.
Advantages:
- Works across firewalls and NAT
- Built-in TLS encryption
- Stream multiplexing
- Standardized protocol
Use Cases:
- Cross-datacenter communication
- Hybrid cloud deployments
- Internet-facing endpoints
"""
@behaviour Phoenix.Transport
def send_message(target, message, opts \\ []) do
url = build_target_url(target)
headers = build_headers(message, opts)
body = Phoenix.Message.Serialization.serialize(message, :json)
case Finch.request(
Finch.build(:post, url, headers, body),
PhoenixHTTP2Pool,
receive_timeout: Keyword.get(opts, :timeout, 30_000)
) do
{:ok, %Finch.Response{status: 200}} ->
{:ok, :sent}
{:ok, %Finch.Response{status: status}} ->
{:error, {:http_error, status}}
{:error, reason} ->
{:error, reason}
end
end
def start_http2_server(port, opts \\ []) do
# Phoenix HTTP/2 server for receiving messages
Phoenix.HTTP2Server.start_link([
port: port,
transport_options: [
max_connections: Keyword.get(opts, :max_connections, 1000),
num_acceptors: Keyword.get(opts, :num_acceptors, 100)
],
protocol_options: [
max_frame_size: Keyword.get(opts, :max_frame_size, 16_384),
max_concurrent_streams: Keyword.get(opts, :max_streams, 100)
]
])
end
defp build_headers(message, opts) do
base_headers = [
{"content-type", "application/json"},
{"user-agent", "Phoenix-Agent/1.0"},
{"x-phoenix-message-id", message.id},
{"x-phoenix-message-type", message.type}
]
# Add tracing headers
trace_headers = Phoenix.Trace.to_headers(message.phoenix_trace)
# Add authentication headers
auth_headers = Phoenix.Auth.to_headers(message.phoenix_security)
base_headers ++ trace_headers ++ auth_headers
end
end
4. QUIC Transport
defmodule Phoenix.Transport.QUIC do
@moduledoc """
QUIC transport for low-latency and mobile communication.
Advantages:
- Reduced connection establishment time
- Built-in multiplexing
- Connection migration support
- Optimized for mobile networks
Use Cases:
- Mobile agent clients
- Edge computing scenarios
- Low-latency requirements
- Unreliable networks
"""
@behaviour Phoenix.Transport
def send_message(target, message, opts \\ []) do
connection = get_or_create_connection(target, opts)
# Serialize message for QUIC
serialized = Phoenix.Message.Serialization.serialize(message, :msgpack)
case QUICEx.send_datagram(connection, serialized) do
:ok -> {:ok, :sent}
{:error, reason} -> {:error, reason}
end
end
def configure_quic_connection(target, opts \\ []) do
connection_opts = [
# Connection settings
idle_timeout: Keyword.get(opts, :idle_timeout, 30_000),
keep_alive: Keyword.get(opts, :keep_alive, 15_000),
# Performance settings
initial_rtt: Keyword.get(opts, :initial_rtt, 100),
congestion_control: Keyword.get(opts, :congestion_control, :cubic),
# Security settings
verify_peer: Keyword.get(opts, :verify_peer, true),
alpn_protocols: ["phoenix-agent/1.0"]
]
QUICEx.connect(target, connection_opts)
end
defp get_or_create_connection(target, opts) do
case Phoenix.ConnectionPool.get(target) do
{:ok, connection} -> connection
{:error, :not_found} ->
{:ok, connection} = configure_quic_connection(target, opts)
Phoenix.ConnectionPool.put(target, connection)
connection
end
end
end
Communication Patterns
1. Point-to-Point Communication
defmodule Phoenix.Patterns.PointToPoint do
@moduledoc """
Direct agent-to-agent communication pattern.
Characteristics:
- Low latency
- Direct routing
- Strong delivery guarantees
- Optional encryption
"""
def send_direct(source_agent, target_agent, message, opts \\ []) do
# Build direct message with routing metadata
direct_message = Phoenix.Message.new(message, [
type: "phoenix.agent.direct.send",
source: source_agent,
subject: target_agent,
delivery: Keyword.get(opts, :delivery, :at_least_once),
routing: %Phoenix.Routing.Metadata{
pattern: :point_to_point,
target: target_agent,
priority: Keyword.get(opts, :priority, :normal)
}
])
# Route directly to target
Phoenix.MessageRouter.route_message(direct_message, opts)
end
def request_response(source_agent, target_agent, request, opts \\ []) do
timeout = Keyword.get(opts, :timeout, 5000)
correlation_id = generate_correlation_id()
# Send request with correlation ID
request_message = Phoenix.Message.new(request, [
type: "phoenix.agent.request.send",
source: source_agent,
subject: target_agent,
correlation_id: correlation_id
])
# Set up response handler
response_handler = start_response_handler(correlation_id, timeout)
# Send request
case Phoenix.MessageRouter.route_message(request_message, opts) do
{:ok, :sent} ->
wait_for_response(response_handler, timeout)
{:error, reason} ->
cleanup_response_handler(response_handler)
{:error, reason}
end
end
end
2. Publish-Subscribe Pattern
defmodule Phoenix.Patterns.PubSub do
@moduledoc """
Event-driven publish-subscribe communication pattern.
Features:
- Topic-based routing
- Pattern matching subscriptions
- Message fanout
- Backpressure handling
"""
def publish(topic, event, opts \\ []) do
# Create publication message
pub_message = Phoenix.Message.new(event, [
type: "phoenix.pubsub.publish",
subject: topic,
routing: %Phoenix.Routing.Metadata{
pattern: :publish_subscribe,
topic: topic,
fanout_strategy: Keyword.get(opts, :fanout, :immediate)
}
])
# Route to all subscribers
Phoenix.PubSub.publish(topic, pub_message, opts)
end
def subscribe(agent_id, topic_pattern, opts \\ []) do
subscription = %Phoenix.PubSub.Subscription{
subscriber: agent_id,
topic_pattern: topic_pattern,
message_filter: Keyword.get(opts, :filter),
delivery_guarantee: Keyword.get(opts, :delivery, :at_most_once),
backpressure_strategy: Keyword.get(opts, :backpressure, :drop_oldest)
}
Phoenix.PubSub.subscribe(subscription)
end
def unsubscribe(agent_id, topic_pattern) do
Phoenix.PubSub.unsubscribe(agent_id, topic_pattern)
end
end
3. Pipeline Pattern
defmodule Phoenix.Patterns.Pipeline do
@moduledoc """
Ordered processing pipeline pattern.
Features:
- Sequential processing stages
- Ordered message delivery
- Stage-specific error handling
- Dynamic pipeline reconfiguration
"""
defstruct [
:id,
:stages,
:error_strategy,
:backpressure_strategy,
:monitoring
]
def create_pipeline(stages, opts \\ []) do
pipeline = %__MODULE__{
id: generate_pipeline_id(),
stages: stages,
error_strategy: Keyword.get(opts, :error_strategy, :retry),
backpressure_strategy: Keyword.get(opts, :backpressure, :buffer),
monitoring: Keyword.get(opts, :monitoring, true)
}
# Start pipeline coordinator
{:ok, coordinator} = Phoenix.Pipeline.Coordinator.start_link(pipeline)
# Register pipeline stages
Enum.each(stages, fn stage ->
Phoenix.Pipeline.register_stage(coordinator, stage)
end)
{:ok, pipeline}
end
def send_to_pipeline(pipeline_id, message, opts \\ []) do
pipeline_message = Phoenix.Message.new(message, [
type: "phoenix.pipeline.process",
routing: %Phoenix.Routing.Metadata{
pattern: :pipeline,
pipeline_id: pipeline_id,
ordering: :sequential
}
])
Phoenix.Pipeline.send_message(pipeline_id, pipeline_message, opts)
end
end
4. Gossip Protocol
defmodule Phoenix.Patterns.Gossip do
@moduledoc """
Eventually consistent gossip communication pattern.
Features:
- Epidemic-style message propagation
- Redundant transmission for reliability
- Convergence guarantees
- Network partition tolerance
"""
defstruct [
:protocol_version,
:fanout,
:round_duration,
:message_ttl,
:compression_enabled
]
def start_gossip_protocol(opts \\ []) do
config = %__MODULE__{
protocol_version: Keyword.get(opts, :version, "1.0"),
fanout: Keyword.get(opts, :fanout, 3),
round_duration: Keyword.get(opts, :round_duration, 1000),
message_ttl: Keyword.get(opts, :message_ttl, 30_000),
compression_enabled: Keyword.get(opts, :compression, true)
}
Phoenix.Gossip.Protocol.start_link(config)
end
def gossip_message(message, opts \\ []) do
gossip_envelope = %Phoenix.Gossip.Envelope{
id: generate_gossip_id(),
payload: message,
ttl: calculate_ttl(opts),
hop_count: 0,
origin_node: Phoenix.Node.local_id(),
timestamp: Phoenix.VectorClock.now()
}
Phoenix.Gossip.initiate_gossip(gossip_envelope, opts)
end
def handle_gossip_reception(envelope) do
# Check if message already seen
case Phoenix.Gossip.MessageCache.seen?(envelope.id) do
true ->
# Already processed, ignore
:ignored
false ->
# New message, process and forward
Phoenix.Gossip.MessageCache.mark_seen(envelope.id)
process_gossip_message(envelope)
forward_gossip_message(envelope)
:processed
end
end
end
5. Consensus Protocol
defmodule Phoenix.Patterns.Consensus do
@moduledoc """
Strongly consistent consensus communication pattern.
Features:
- Raft consensus algorithm
- Leader election
- Log replication
- Strong consistency guarantees
"""
def propose_value(value, opts \\ []) do
# Create consensus proposal
proposal = %Phoenix.Consensus.Proposal{
id: generate_proposal_id(),
value: value,
proposer: Phoenix.Node.local_id(),
timestamp: DateTime.utc_now()
}
# Submit to Raft cluster
case Phoenix.Raft.propose(proposal, opts) do
{:ok, :committed} -> {:ok, proposal.id}
{:ok, :queued} -> {:ok, :queued, proposal.id}
{:error, :not_leader} -> {:error, :not_leader}
{:error, reason} -> {:error, reason}
end
end
def read_consensus_value(key, opts \\ []) do
consistency = Keyword.get(opts, :consistency, :strong)
case consistency do
:strong ->
# Read from current leader
Phoenix.Raft.read_from_leader(key, opts)
:eventual ->
# Read from any replica
Phoenix.Raft.read_from_replica(key, opts)
:session ->
# Read your own writes
Phoenix.Raft.read_with_session_consistency(key, opts)
end
end
end
Delivery Guarantees
Guarantee Specifications
defmodule Phoenix.Delivery.Guarantee do
@moduledoc """
Configurable delivery guarantee system for Phoenix messages.
Guarantees:
- at_most_once: Fire-and-forget (best performance)
- at_least_once: Retry until success (reliability)
- exactly_once: Idempotent delivery (correctness)
- causal_order: Respects causal dependencies
- total_order: Global ordering (expensive)
"""
use TypedStruct
typedstruct do
field :delivery_mode, atom(), default: :at_least_once
field :timeout, pos_integer(), default: 5000
field :retry_strategy, atom(), default: :exponential_backoff
field :max_retries, pos_integer(), default: 3
field :ordering_requirement, atom(), default: :none
field :acknowledgment_required, boolean(), default: true
field :duplicate_detection, boolean(), default: false
end
def build(delivery_mode, opts \\ []) do
%__MODULE__{
delivery_mode: delivery_mode,
timeout: Keyword.get(opts, :timeout, 5000),
retry_strategy: Keyword.get(opts, :retry_strategy, :exponential_backoff),
max_retries: Keyword.get(opts, :max_retries, 3),
ordering_requirement: Keyword.get(opts, :ordering, :none),
acknowledgment_required: Keyword.get(opts, :ack_required, true),
duplicate_detection: Keyword.get(opts, :duplicate_detection, false)
}
end
end
At-Most-Once Delivery
defmodule Phoenix.Delivery.AtMostOnce do
@moduledoc """
Fire-and-forget delivery for high-performance scenarios.
Characteristics:
- Lowest latency
- Highest throughput
- No delivery guarantees
- No message ordering
"""
def deliver(target, message, opts \\ []) do
# Send message without waiting for acknowledgment
Phoenix.Transport.send_message(target, message, [
delivery_mode: :fire_and_forget,
timeout: Keyword.get(opts, :timeout, 1000)
])
# Return immediately
{:ok, :sent}
end
end
At-Least-Once Delivery
defmodule Phoenix.Delivery.AtLeastOnce do
@moduledoc """
Reliable delivery with retry logic.
Characteristics:
- Guaranteed delivery (if target eventually available)
- Possible duplicates
- Configurable retry strategies
- Exponential backoff with jitter
"""
def deliver(target, message, guarantee) do
message_id = message.id
# Add delivery tracking
tracked_message = add_delivery_tracking(message, guarantee)
# Start retry loop
retry_delivery(target, tracked_message, guarantee, 0)
end
defp retry_delivery(target, message, guarantee, attempt) do
case Phoenix.Transport.send_message(target, message) do
{:ok, :sent} ->
# Wait for acknowledgment
case wait_for_ack(message.id, guarantee.timeout) do
{:ok, :ack_received} ->
{:ok, :delivered}
{:error, :timeout} ->
handle_retry(target, message, guarantee, attempt)
end
{:error, reason} ->
handle_retry(target, message, guarantee, attempt)
end
end
defp handle_retry(target, message, guarantee, attempt) do
if attempt >= guarantee.max_retries do
{:error, :max_retries_exceeded}
else
backoff_time = calculate_backoff(guarantee.retry_strategy, attempt)
:timer.sleep(backoff_time)
retry_delivery(target, message, guarantee, attempt + 1)
end
end
end
Exactly-Once Delivery
defmodule Phoenix.Delivery.ExactlyOnce do
@moduledoc """
Idempotent delivery using distributed coordination.
Characteristics:
- No duplicates
- Requires coordination overhead
- Uses distributed locks or consensus
- Higher latency
"""
def deliver(target, message, guarantee) do
delivery_id = generate_delivery_id(message)
# Acquire distributed lock for delivery
case Phoenix.DistributedLock.acquire(delivery_id, guarantee.timeout) do
{:ok, lock} ->
try do
perform_exactly_once_delivery(target, message, delivery_id, guarantee)
after
Phoenix.DistributedLock.release(lock)
end
{:error, :lock_timeout} ->
{:error, :coordination_timeout}
{:error, :already_delivered} ->
{:ok, :already_delivered}
end
end
defp perform_exactly_once_delivery(target, message, delivery_id, guarantee) do
# Check if already delivered
case Phoenix.DeliveryLog.check_delivery(delivery_id) do
{:ok, :already_delivered} ->
{:ok, :already_delivered}
{:ok, :not_delivered} ->
# Perform delivery
case Phoenix.Transport.send_message(target, message) do
{:ok, :sent} ->
# Record successful delivery
Phoenix.DeliveryLog.record_delivery(delivery_id)
{:ok, :delivered}
{:error, reason} ->
{:error, reason}
end
end
end
end
Flow Control and Backpressure
Adaptive Flow Control
defmodule Phoenix.FlowControl do
@moduledoc """
Adaptive flow control system preventing message queue overflow.
Strategies:
- Token bucket: Rate limiting
- Sliding window: Adaptive throttling
- Circuit breaker: Failure isolation
- Backpressure propagation: System-wide coordination
"""
defstruct [
:strategy,
:max_queue_size,
:high_watermark,
:low_watermark,
:backpressure_strategy
]
def apply_flow_control(target, message, flow_control) do
current_queue_size = get_queue_size(target)
cond do
current_queue_size >= flow_control.max_queue_size ->
handle_queue_overflow(target, message, flow_control)
current_queue_size >= flow_control.high_watermark ->
apply_backpressure(target, message, flow_control)
current_queue_size <= flow_control.low_watermark ->
release_backpressure(target)
send_message(target, message)
true ->
send_message(target, message)
end
end
defp handle_queue_overflow(target, message, flow_control) do
case flow_control.backpressure_strategy do
:drop_oldest ->
drop_oldest_message(target)
send_message(target, message)
:drop_newest ->
{:error, :queue_full}
:reject ->
{:error, :backpressure_active}
:spill_to_disk ->
spill_message_to_disk(target, message)
:redirect ->
redirect_to_alternative_target(message, flow_control)
end
end
end
Token Bucket Rate Limiting
defmodule Phoenix.RateLimit.TokenBucket do
@moduledoc """
Token bucket rate limiting for message sending.
Features:
- Configurable bucket size
- Configurable refill rate
- Burst allowance
- Distributed token sharing
"""
defstruct [
:bucket_id,
:capacity,
:current_tokens,
:refill_rate,
:last_refill,
:distributed
]
def create_bucket(bucket_id, capacity, refill_rate, opts \\ []) do
bucket = %__MODULE__{
bucket_id: bucket_id,
capacity: capacity,
current_tokens: capacity,
refill_rate: refill_rate,
last_refill: System.monotonic_time(:millisecond),
distributed: Keyword.get(opts, :distributed, false)
}
# Register bucket
Phoenix.RateLimit.Registry.register_bucket(bucket)
{:ok, bucket}
end
def acquire_tokens(bucket_id, tokens_needed) do
case Phoenix.RateLimit.Registry.get_bucket(bucket_id) do
{:ok, bucket} ->
# Refill tokens based on elapsed time
updated_bucket = refill_tokens(bucket)
if updated_bucket.current_tokens >= tokens_needed do
# Grant tokens
new_bucket = %{updated_bucket |
current_tokens: updated_bucket.current_tokens - tokens_needed
}
Phoenix.RateLimit.Registry.update_bucket(bucket_id, new_bucket)
{:ok, :tokens_acquired}
else
# Insufficient tokens
{:error, :insufficient_tokens, updated_bucket.current_tokens}
end
{:error, :not_found} ->
{:error, :bucket_not_found}
end
end
defp refill_tokens(bucket) do
now = System.monotonic_time(:millisecond)
elapsed = now - bucket.last_refill
# Calculate tokens to add
tokens_to_add = (elapsed * bucket.refill_rate) / 1000
new_token_count = min(
bucket.capacity,
bucket.current_tokens + tokens_to_add
)
%{bucket |
current_tokens: new_token_count,
last_refill: now
}
end
end
Security and Authentication
Message-Level Security
defmodule Phoenix.Security do
@moduledoc """
Message-level security for Phoenix communication.
Features:
- Message authentication
- Payload encryption
- Digital signatures
- Key rotation
"""
def secure_message(message, security_context) do
# Add authentication
authenticated_message = add_authentication(message, security_context)
# Encrypt payload if required
encrypted_message = maybe_encrypt_payload(authenticated_message, security_context)
# Add digital signature
signed_message = add_digital_signature(encrypted_message, security_context)
{:ok, signed_message}
end
def verify_message(message, security_context) do
with {:ok, _} <- verify_digital_signature(message, security_context),
{:ok, decrypted_message} <- maybe_decrypt_payload(message, security_context),
{:ok, _} <- verify_authentication(decrypted_message, security_context) do
{:ok, decrypted_message}
else
{:error, reason} -> {:error, reason}
end
end
defp add_authentication(message, context) do
auth_header = %{
agent_id: context.agent_id,
timestamp: DateTime.utc_now(),
nonce: generate_nonce(),
signature: calculate_hmac(message, context.auth_key)
}
put_in(message.phoenix_security.authentication, auth_header)
end
defp add_digital_signature(message, context) do
message_hash = calculate_message_hash(message)
signature = :crypto.sign(:ecdsa, :sha256, message_hash, context.private_key)
put_in(message.phoenix_security.signature, signature)
end
end
Transport-Level Security
defmodule Phoenix.Security.Transport do
@moduledoc """
Transport-level security configuration.
Features:
- TLS configuration
- Certificate management
- Mutual authentication
- Perfect forward secrecy
"""
def configure_tls(transport, opts \\ []) do
tls_config = [
# Certificate configuration
certfile: Keyword.get(opts, :certfile, default_cert_path()),
keyfile: Keyword.get(opts, :keyfile, default_key_path()),
cacertfile: Keyword.get(opts, :cacertfile, default_ca_path()),
# Security settings
verify: Keyword.get(opts, :verify, :verify_peer),
versions: Keyword.get(opts, :versions, [:"tlsv1.3", :"tlsv1.2"]),
ciphers: Keyword.get(opts, :ciphers, secure_ciphers()),
# Perfect forward secrecy
honor_cipher_order: true,
secure_renegotiate: true,
# ALPN for protocol negotiation
alpn_preferred_protocols: ["phoenix-agent/1.0"]
]
Phoenix.Transport.configure_security(transport, tls_config)
end
defp secure_ciphers do
[
# TLS 1.3 cipher suites
"TLS_AES_256_GCM_SHA384",
"TLS_CHACHA20_POLY1305_SHA256",
"TLS_AES_128_GCM_SHA256",
# TLS 1.2 cipher suites (fallback)
"ECDHE-ECDSA-AES256-GCM-SHA384",
"ECDHE-RSA-AES256-GCM-SHA384",
"ECDHE-ECDSA-CHACHA20-POLY1305",
"ECDHE-RSA-CHACHA20-POLY1305"
]
end
end
Performance Optimization
Message Batching
defmodule Phoenix.Optimization.Batching do
@moduledoc """
Message batching for improved throughput.
Strategies:
- Time-based batching
- Size-based batching
- Priority-aware batching
- Adaptive batching
"""
defstruct [
:batch_id,
:messages,
:max_batch_size,
:max_wait_time,
:created_at,
:priority_distribution
]
def start_batching(target, opts \\ []) do
batch_config = %__MODULE__{
batch_id: generate_batch_id(),
messages: [],
max_batch_size: Keyword.get(opts, :max_size, 100),
max_wait_time: Keyword.get(opts, :max_wait, 10),
created_at: System.monotonic_time(:millisecond),
priority_distribution: %{}
}
Phoenix.Batcher.start_batch(target, batch_config)
end
def add_to_batch(target, message, opts \\ []) do
case Phoenix.Batcher.add_message(target, message) do
{:ok, :added} ->
{:ok, :batched}
{:ok, :batch_full} ->
Phoenix.Batcher.flush_batch(target)
Phoenix.Batcher.add_message(target, message)
{:error, reason} ->
{:error, reason}
end
end
def flush_batch(target, reason \\ :manual) do
case Phoenix.Batcher.get_batch(target) do
{:ok, batch} when length(batch.messages) > 0 ->
# Create batch message
batch_message = create_batch_message(batch, reason)
# Send batch
Phoenix.Transport.send_message(target, batch_message)
# Clear batch
Phoenix.Batcher.clear_batch(target)
{:ok, _empty_batch} ->
:ok
{:error, reason} ->
{:error, reason}
end
end
end
Connection Pooling
defmodule Phoenix.ConnectionPool do
@moduledoc """
Connection pooling for transport efficiency.
Features:
- Per-target connection pools
- Connection health monitoring
- Automatic connection recycling
- Load balancing across connections
"""
def get_connection(target, opts \\ []) do
pool_name = build_pool_name(target)
case :poolboy.checkout(pool_name, true, 5000) do
:full ->
{:error, :pool_exhausted}
connection ->
case verify_connection_health(connection) do
:ok -> {:ok, connection}
{:error, reason} ->
:poolboy.checkin(pool_name, connection)
create_new_connection(target, opts)
end
end
end
def return_connection(target, connection) do
pool_name = build_pool_name(target)
:poolboy.checkin(pool_name, connection)
end
def start_pool(target, opts \\ []) do
pool_config = [
name: {:local, build_pool_name(target)},
worker_module: Phoenix.Connection.Worker,
size: Keyword.get(opts, :pool_size, 10),
max_overflow: Keyword.get(opts, :max_overflow, 5)
]
worker_args = [
target: target,
connect_opts: Keyword.get(opts, :connect_opts, [])
]
:poolboy.start_link(pool_config, worker_args)
end
end
Summary
Phoenix’s communication infrastructure provides a comprehensive foundation for distributed agent systems:
Key Features Delivered
- Multi-Protocol Transport: Automatic protocol selection for optimal performance
- Flexible Delivery Guarantees: From fire-and-forget to exactly-once delivery
- Advanced Communication Patterns: Point-to-point, pub-sub, pipelines, gossip, consensus
- Flow Control: Adaptive backpressure and rate limiting
- Security: Message-level and transport-level protection
- Performance Optimization: Batching, pooling, and adaptive algorithms
Next Documents
- CRDT Integration and State Management - Detailed conflict-free state handling
- Fault Tolerance and Partition Handling - Comprehensive resilience strategies
- Performance Optimization and Scaling - Advanced performance engineering
- Implementation Roadmap - Concrete development plan
This communication layer enables Phoenix agents to interact efficiently and reliably across distributed environments while maintaining strong consistency guarantees where needed and optimal performance where possible.
Document Version: 1.0
Next Review: 2025-07-19
Implementation Priority: High
Dependencies: Phoenix Distributed Agent Architecture