← Back to DocsRebuild20250712

PHOENIX COMMUNICATION PROTOCOLS

Documentation for PHOENIX_COMMUNICATION_PROTOCOLS from the Foundation repository.

Phoenix: Communication Protocols and Message Patterns

Date: 2025-07-12
Version: 1.0
Series: Distributed Agent System - Part 2 (Communication)

Executive Summary

This document specifies the communication protocols and message patterns for the Phoenix distributed agent system. Drawing from modern distributed systems research, BEAM ecosystem capabilities, and battle-tested production patterns, Phoenix implements a multi-layered communication architecture that adapts to different network conditions, consistency requirements, and performance objectives.

Key Innovation: Phoenix uses adaptive protocol selection where the system automatically chooses the optimal communication pattern based on message characteristics, network topology, and consistency requirements.

Table of Contents

  1. Communication Architecture Overview
  2. Message Format Specification
  3. Transport Layer Protocols
  4. Communication Patterns
  5. Delivery Guarantees
  6. Flow Control and Backpressure
  7. Security and Authentication
  8. Performance Optimization

Communication Architecture Overview

Multi-Layer Protocol Stack

Phoenix Communication Stack
┌─────────────────────────────────────────────────────┐
│ Application Layer                                   │
│ ├── Agent-to-Agent Messages                        │
│ ├── Control Plane Commands                         │
│ └── Observability Events                           │
├─────────────────────────────────────────────────────┤
│ Routing Layer                                       │
│ ├── Message Router (topology-aware)                │
│ ├── Load Balancer (adaptive)                       │
│ └── Circuit Breaker (fault isolation)              │
├─────────────────────────────────────────────────────┤
│ Protocol Adaptation Layer                           │
│ ├── Protocol Selector (automatic)                  │
│ ├── Format Converter (serialization)               │
│ └── Compression Manager (efficiency)               │
├─────────────────────────────────────────────────────┤
│ Transport Layer                                     │
│ ├── Distributed Erlang (local cluster)             │
│ ├── Partisan (large scale)                         │
│ ├── HTTP/2 (cross-DC)                              │
│ └── QUIC (low latency)                             │
├─────────────────────────────────────────────────────┤
│ Network Layer                                       │
│ ├── TCP/IP (reliable)                              │
│ ├── UDP (unreliable, fast)                         │
│ └── TLS (secure)                                   │
└─────────────────────────────────────────────────────┘

Protocol Selection Matrix

ScenarioDistanceSizeConsistencyProtocolTransport
Local ClusterSame DCAnyAnyDistributed ErlangTCP
Large ClusterSame DCAnyAnyPartisanTCP
Cross-DCMulti-DCAnyAnyHTTP/2TLS/TCP
Mobile/EdgeAnywhereSmallEventualQUICUDP
Real-timeSame RegionSmallCausalCustom UDPUDP
Bulk TransferAnywhereLargeEventualHTTP/2 StreamingTCP

Communication Patterns

defmodule Phoenix.Communication.Patterns do
  @moduledoc """
  Fundamental communication patterns in Phoenix:
  
  1. Point-to-Point: Direct agent communication
  2. Publish-Subscribe: Event distribution
  3. Request-Response: Synchronous interaction
  4. Pipeline: Ordered processing chains
  5. Gossip: Eventually consistent broadcast
  6. Consensus: Strongly consistent coordination
  """
  
  @type pattern :: 
    :point_to_point | 
    :publish_subscribe | 
    :request_response | 
    :pipeline | 
    :gossip | 
    :consensus
    
  @type message_characteristics :: %{
    size: :small | :medium | :large,
    priority: :low | :normal | :high | :critical,
    consistency: :eventual | :causal | :strong,
    ordering: :none | :causal | :total,
    delivery: :at_most_once | :at_least_once | :exactly_once
  }
end

Message Format Specification

Universal Message Format

Phoenix extends CloudEvents v1.0.2 with distributed systems extensions:

defmodule Phoenix.Message do
  @moduledoc """
  Universal message format for Phoenix distributed agents.
  
  Based on CloudEvents v1.0.2 with Phoenix extensions for:
  - Distributed tracing and correlation
  - Causal consistency tracking
  - Delivery guarantee specifications
  - Routing and placement hints
  """
  
  use TypedStruct
  
  typedstruct do
    # CloudEvents v1.0.2 Core Fields
    field :specversion, String.t(), default: "1.0.2"
    field :id, String.t()
    field :source, String.t()
    field :type, String.t()
    field :subject, String.t()
    field :time, DateTime.t()
    field :datacontenttype, String.t(), default: "application/x-erlang-term"
    field :dataschema, String.t()
    field :data, term()
    
    # Phoenix Distributed Extensions
    field :phoenix_trace, Phoenix.Trace.Context.t()
    field :phoenix_causality, Phoenix.VectorClock.t()
    field :phoenix_delivery, Phoenix.Delivery.Guarantee.t()
    field :phoenix_routing, Phoenix.Routing.Metadata.t()
    field :phoenix_security, Phoenix.Security.Context.t()
    field :phoenix_qos, Phoenix.QoS.Requirements.t()
  end
  
  def new(data, opts \\ []) do
    %__MODULE__{
      id: generate_message_id(),
      source: Keyword.get(opts, :source, Phoenix.Node.local_id()),
      type: Keyword.get(opts, :type, "phoenix.agent.message"),
      time: DateTime.utc_now(),
      data: data,
      phoenix_trace: Phoenix.Trace.current_context(),
      phoenix_causality: Phoenix.VectorClock.tick(Phoenix.VectorClock.local()),
      phoenix_delivery: Keyword.get(opts, :delivery, default_delivery_guarantee()),
      phoenix_routing: build_routing_metadata(opts),
      phoenix_security: Phoenix.Security.current_context(),
      phoenix_qos: Keyword.get(opts, :qos, default_qos_requirements())
    }
  end
end

Message Type Taxonomy

defmodule Phoenix.Message.Types do
  @moduledoc """
  Standardized message type taxonomy for Phoenix agents.
  
  Type format: phoenix.{domain}.{entity}.{action}[.{qualifier}]
  """
  
  # Control Plane Messages
  @control_plane [
    "phoenix.cluster.node.join",
    "phoenix.cluster.node.leave", 
    "phoenix.cluster.topology.update",
    "phoenix.agent.placement.request",
    "phoenix.agent.placement.response",
    "phoenix.agent.migration.start",
    "phoenix.agent.migration.complete"
  ]
  
  # Data Plane Messages  
  @data_plane [
    "phoenix.agent.message.send",
    "phoenix.agent.state.update",
    "phoenix.agent.action.execute",
    "phoenix.agent.action.result",
    "phoenix.agent.event.publish",
    "phoenix.agent.heartbeat.ping"
  ]
  
  # Observability Messages
  @observability [
    "phoenix.telemetry.metric.update",
    "phoenix.telemetry.trace.span",
    "phoenix.telemetry.log.entry",
    "phoenix.health.status.report",
    "phoenix.performance.measurement"
  ]
  
  def message_characteristics(message_type) do
    cond do
      message_type in @control_plane -> %{
        priority: :high,
        consistency: :strong,
        ordering: :total,
        delivery: :exactly_once
      }
      
      message_type in @data_plane -> %{
        priority: :normal,
        consistency: :causal,
        ordering: :causal,
        delivery: :at_least_once
      }
      
      message_type in @observability -> %{
        priority: :low,
        consistency: :eventual,
        ordering: :none,
        delivery: :at_most_once
      }
      
      true -> default_characteristics()
    end
  end
end

Serialization and Compression

defmodule Phoenix.Message.Serialization do
  @moduledoc """
  Multi-format serialization with automatic format selection.
  
  Formats:
  - Erlang Term: Zero-copy within BEAM, type preservation
  - MessagePack: Compact binary, cross-language
  - Protocol Buffers: Schema evolution, efficient
  - JSON: Human readable, debugging
  """
  
  @type format :: :erlang_term | :msgpack | :protobuf | :json
  
  def serialize(message, format \\ :auto) do
    selected_format = select_format(message, format)
    
    case selected_format do
      :erlang_term -> 
        compressed_term = :erlang.term_to_binary(message, [:compressed])
        {:ok, compressed_term, :erlang_term}
        
      :msgpack -> 
        case Msgpax.pack(message) do
          {:ok, binary} -> {:ok, binary, :msgpack}
          {:error, reason} -> {:error, reason}
        end
        
      :protobuf -> 
        case PhoenixProto.Message.encode(message) do
          {:ok, binary} -> {:ok, binary, :protobuf}
          {:error, reason} -> {:error, reason}
        end
        
      :json -> 
        case Jason.encode(message) do
          {:ok, json} -> {:ok, json, :json}
          {:error, reason} -> {:error, reason}
        end
    end
  end
  
  defp select_format(message, :auto) do
    cond do
      # Local BEAM communication - use Erlang terms
      same_beam_cluster?(message.phoenix_routing.target) -> 
        :erlang_term
        
      # Cross-language communication - use MessagePack
      cross_language?(message.phoenix_routing.target) -> 
        :msgpack
        
      # Schema evolution important - use Protocol Buffers
      schema_evolution_required?(message.type) -> 
        :protobuf
        
      # Debugging or development - use JSON
      debug_mode?() -> 
        :json
        
      # Default to MessagePack for efficiency
      true -> 
        :msgpack
    end
  end
  
  defp select_format(_message, format) when format in [:erlang_term, :msgpack, :protobuf, :json] do
    format
  end
end

Transport Layer Protocols

1. Distributed Erlang Transport

defmodule Phoenix.Transport.DistributedErlang do
  @moduledoc """
  Distributed Erlang transport for local cluster communication.
  
  Advantages:
  - Zero-copy message passing
  - Automatic process monitoring
  - Built-in clustering
  - Type preservation
  
  Use Cases:
  - Single datacenter clusters
  - Development environments
  - Small to medium scale (< 50 nodes)
  """
  
  @behaviour Phoenix.Transport
  
  def send_message(target, message, opts \\ []) do
    case resolve_target(target) do
      {:ok, {:local, pid}} -> 
        send(pid, message)
        {:ok, :sent}
        
      {:ok, {:remote, node, pid}} -> 
        case Node.ping(node) do
          :pong -> 
            send({pid, node}, message)
            {:ok, :sent}
          :pang -> 
            {:error, :node_unreachable}
        end
        
      {:error, reason} -> 
        {:error, reason}
    end
  end
  
  def broadcast_message(targets, message, opts \\ []) do
    delivery_mode = Keyword.get(opts, :delivery_mode, :async)
    
    case delivery_mode do
      :async -> 
        # Fire and forget to all targets
        Enum.each(targets, fn target ->
          send_message(target, message, opts)
        end)
        {:ok, :broadcast}
        
      :sync -> 
        # Wait for acknowledgment from all
        sync_broadcast(targets, message, opts)
    end
  end
  
  defp sync_broadcast(targets, message, opts) do
    timeout = Keyword.get(opts, :timeout, 5000)
    
    # Send to all targets with correlation ID
    correlation_id = generate_correlation_id()
    ack_message = add_ack_request(message, correlation_id)
    
    Enum.each(targets, fn target ->
      send_message(target, ack_message, opts)
    end)
    
    # Wait for acknowledgments
    wait_for_acks(targets, correlation_id, timeout)
  end
end

2. Partisan Transport

defmodule Phoenix.Transport.Partisan do
  @moduledoc """
  Partisan transport for large-scale cluster communication.
  
  Advantages:
  - Scales beyond Distributed Erlang limits
  - Pluggable overlay networks
  - Advanced failure detection
  - Message batching and compression
  
  Use Cases:
  - Large clusters (100+ nodes)
  - Cross-datacenter communication
  - High-throughput scenarios
  """
  
  @behaviour Phoenix.Transport
  
  def send_message(target, message, opts \\ []) do
    # Use Partisan's overlay network
    partisan_message = wrap_for_partisan(message, opts)
    
    case Partisan.send_message(target, partisan_message) do
      :ok -> {:ok, :sent}
      {:error, reason} -> {:error, reason}
    end
  end
  
  def configure_overlay(overlay_type, opts \\ []) do
    case overlay_type do
      :hyparview -> 
        # HyParView: Probabilistic broadcast
        Partisan.configure_overlay(:hyparview, [
          fanout: Keyword.get(opts, :fanout, 6),
          active_view_size: Keyword.get(opts, :active_view_size, 6),
          passive_view_size: Keyword.get(opts, :passive_view_size, 30)
        ])
        
      :client_server -> 
        # Client-Server: Traditional hub-and-spoke
        Partisan.configure_overlay(:client_server, [
          servers: Keyword.get(opts, :servers, []),
          connection_pool_size: Keyword.get(opts, :pool_size, 10)
        ])
        
      :tree -> 
        # Tree: Hierarchical topology
        Partisan.configure_overlay(:tree, [
          fanout: Keyword.get(opts, :tree_fanout, 4),
          depth: Keyword.get(opts, :max_depth, 6)
        ])
    end
  end
  
  defp wrap_for_partisan(message, opts) do
    %Partisan.Message{
      payload: message,
      metadata: %{
        priority: get_priority(opts),
        compression: should_compress?(message, opts),
        encryption: should_encrypt?(message, opts)
      }
    }
  end
end

3. HTTP/2 Transport

defmodule Phoenix.Transport.HTTP2 do
  @moduledoc """
  HTTP/2 transport for cross-datacenter and hybrid cloud communication.
  
  Advantages:
  - Works across firewalls and NAT
  - Built-in TLS encryption
  - Stream multiplexing
  - Standardized protocol
  
  Use Cases:
  - Cross-datacenter communication
  - Hybrid cloud deployments
  - Internet-facing endpoints
  """
  
  @behaviour Phoenix.Transport
  
  def send_message(target, message, opts \\ []) do
    url = build_target_url(target)
    headers = build_headers(message, opts)
    body = Phoenix.Message.Serialization.serialize(message, :json)
    
    case Finch.request(
      Finch.build(:post, url, headers, body),
      PhoenixHTTP2Pool,
      receive_timeout: Keyword.get(opts, :timeout, 30_000)
    ) do
      {:ok, %Finch.Response{status: 200}} -> 
        {:ok, :sent}
      {:ok, %Finch.Response{status: status}} -> 
        {:error, {:http_error, status}}
      {:error, reason} -> 
        {:error, reason}
    end
  end
  
  def start_http2_server(port, opts \\ []) do
    # Phoenix HTTP/2 server for receiving messages
    Phoenix.HTTP2Server.start_link([
      port: port,
      transport_options: [
        max_connections: Keyword.get(opts, :max_connections, 1000),
        num_acceptors: Keyword.get(opts, :num_acceptors, 100)
      ],
      protocol_options: [
        max_frame_size: Keyword.get(opts, :max_frame_size, 16_384),
        max_concurrent_streams: Keyword.get(opts, :max_streams, 100)
      ]
    ])
  end
  
  defp build_headers(message, opts) do
    base_headers = [
      {"content-type", "application/json"},
      {"user-agent", "Phoenix-Agent/1.0"},
      {"x-phoenix-message-id", message.id},
      {"x-phoenix-message-type", message.type}
    ]
    
    # Add tracing headers
    trace_headers = Phoenix.Trace.to_headers(message.phoenix_trace)
    
    # Add authentication headers
    auth_headers = Phoenix.Auth.to_headers(message.phoenix_security)
    
    base_headers ++ trace_headers ++ auth_headers
  end
end

4. QUIC Transport

defmodule Phoenix.Transport.QUIC do
  @moduledoc """
  QUIC transport for low-latency and mobile communication.
  
  Advantages:
  - Reduced connection establishment time
  - Built-in multiplexing
  - Connection migration support
  - Optimized for mobile networks
  
  Use Cases:
  - Mobile agent clients
  - Edge computing scenarios
  - Low-latency requirements
  - Unreliable networks
  """
  
  @behaviour Phoenix.Transport
  
  def send_message(target, message, opts \\ []) do
    connection = get_or_create_connection(target, opts)
    
    # Serialize message for QUIC
    serialized = Phoenix.Message.Serialization.serialize(message, :msgpack)
    
    case QUICEx.send_datagram(connection, serialized) do
      :ok -> {:ok, :sent}
      {:error, reason} -> {:error, reason}
    end
  end
  
  def configure_quic_connection(target, opts \\ []) do
    connection_opts = [
      # Connection settings
      idle_timeout: Keyword.get(opts, :idle_timeout, 30_000),
      keep_alive: Keyword.get(opts, :keep_alive, 15_000),
      
      # Performance settings
      initial_rtt: Keyword.get(opts, :initial_rtt, 100),
      congestion_control: Keyword.get(opts, :congestion_control, :cubic),
      
      # Security settings
      verify_peer: Keyword.get(opts, :verify_peer, true),
      alpn_protocols: ["phoenix-agent/1.0"]
    ]
    
    QUICEx.connect(target, connection_opts)
  end
  
  defp get_or_create_connection(target, opts) do
    case Phoenix.ConnectionPool.get(target) do
      {:ok, connection} -> connection
      {:error, :not_found} -> 
        {:ok, connection} = configure_quic_connection(target, opts)
        Phoenix.ConnectionPool.put(target, connection)
        connection
    end
  end
end

Communication Patterns

1. Point-to-Point Communication

defmodule Phoenix.Patterns.PointToPoint do
  @moduledoc """
  Direct agent-to-agent communication pattern.
  
  Characteristics:
  - Low latency
  - Direct routing
  - Strong delivery guarantees
  - Optional encryption
  """
  
  def send_direct(source_agent, target_agent, message, opts \\ []) do
    # Build direct message with routing metadata
    direct_message = Phoenix.Message.new(message, [
      type: "phoenix.agent.direct.send",
      source: source_agent,
      subject: target_agent,
      delivery: Keyword.get(opts, :delivery, :at_least_once),
      routing: %Phoenix.Routing.Metadata{
        pattern: :point_to_point,
        target: target_agent,
        priority: Keyword.get(opts, :priority, :normal)
      }
    ])
    
    # Route directly to target
    Phoenix.MessageRouter.route_message(direct_message, opts)
  end
  
  def request_response(source_agent, target_agent, request, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 5000)
    correlation_id = generate_correlation_id()
    
    # Send request with correlation ID
    request_message = Phoenix.Message.new(request, [
      type: "phoenix.agent.request.send",
      source: source_agent,
      subject: target_agent,
      correlation_id: correlation_id
    ])
    
    # Set up response handler
    response_handler = start_response_handler(correlation_id, timeout)
    
    # Send request
    case Phoenix.MessageRouter.route_message(request_message, opts) do
      {:ok, :sent} -> 
        wait_for_response(response_handler, timeout)
      {:error, reason} -> 
        cleanup_response_handler(response_handler)
        {:error, reason}
    end
  end
end

2. Publish-Subscribe Pattern

defmodule Phoenix.Patterns.PubSub do
  @moduledoc """
  Event-driven publish-subscribe communication pattern.
  
  Features:
  - Topic-based routing
  - Pattern matching subscriptions
  - Message fanout
  - Backpressure handling
  """
  
  def publish(topic, event, opts \\ []) do
    # Create publication message
    pub_message = Phoenix.Message.new(event, [
      type: "phoenix.pubsub.publish",
      subject: topic,
      routing: %Phoenix.Routing.Metadata{
        pattern: :publish_subscribe,
        topic: topic,
        fanout_strategy: Keyword.get(opts, :fanout, :immediate)
      }
    ])
    
    # Route to all subscribers
    Phoenix.PubSub.publish(topic, pub_message, opts)
  end
  
  def subscribe(agent_id, topic_pattern, opts \\ []) do
    subscription = %Phoenix.PubSub.Subscription{
      subscriber: agent_id,
      topic_pattern: topic_pattern,
      message_filter: Keyword.get(opts, :filter),
      delivery_guarantee: Keyword.get(opts, :delivery, :at_most_once),
      backpressure_strategy: Keyword.get(opts, :backpressure, :drop_oldest)
    }
    
    Phoenix.PubSub.subscribe(subscription)
  end
  
  def unsubscribe(agent_id, topic_pattern) do
    Phoenix.PubSub.unsubscribe(agent_id, topic_pattern)
  end
end

3. Pipeline Pattern

defmodule Phoenix.Patterns.Pipeline do
  @moduledoc """
  Ordered processing pipeline pattern.
  
  Features:
  - Sequential processing stages
  - Ordered message delivery
  - Stage-specific error handling
  - Dynamic pipeline reconfiguration
  """
  
  defstruct [
    :id,
    :stages,
    :error_strategy,
    :backpressure_strategy,
    :monitoring
  ]
  
  def create_pipeline(stages, opts \\ []) do
    pipeline = %__MODULE__{
      id: generate_pipeline_id(),
      stages: stages,
      error_strategy: Keyword.get(opts, :error_strategy, :retry),
      backpressure_strategy: Keyword.get(opts, :backpressure, :buffer),
      monitoring: Keyword.get(opts, :monitoring, true)
    }
    
    # Start pipeline coordinator
    {:ok, coordinator} = Phoenix.Pipeline.Coordinator.start_link(pipeline)
    
    # Register pipeline stages
    Enum.each(stages, fn stage ->
      Phoenix.Pipeline.register_stage(coordinator, stage)
    end)
    
    {:ok, pipeline}
  end
  
  def send_to_pipeline(pipeline_id, message, opts \\ []) do
    pipeline_message = Phoenix.Message.new(message, [
      type: "phoenix.pipeline.process",
      routing: %Phoenix.Routing.Metadata{
        pattern: :pipeline,
        pipeline_id: pipeline_id,
        ordering: :sequential
      }
    ])
    
    Phoenix.Pipeline.send_message(pipeline_id, pipeline_message, opts)
  end
end

4. Gossip Protocol

defmodule Phoenix.Patterns.Gossip do
  @moduledoc """
  Eventually consistent gossip communication pattern.
  
  Features:
  - Epidemic-style message propagation
  - Redundant transmission for reliability
  - Convergence guarantees
  - Network partition tolerance
  """
  
  defstruct [
    :protocol_version,
    :fanout,
    :round_duration,
    :message_ttl,
    :compression_enabled
  ]
  
  def start_gossip_protocol(opts \\ []) do
    config = %__MODULE__{
      protocol_version: Keyword.get(opts, :version, "1.0"),
      fanout: Keyword.get(opts, :fanout, 3),
      round_duration: Keyword.get(opts, :round_duration, 1000),
      message_ttl: Keyword.get(opts, :message_ttl, 30_000),
      compression_enabled: Keyword.get(opts, :compression, true)
    }
    
    Phoenix.Gossip.Protocol.start_link(config)
  end
  
  def gossip_message(message, opts \\ []) do
    gossip_envelope = %Phoenix.Gossip.Envelope{
      id: generate_gossip_id(),
      payload: message,
      ttl: calculate_ttl(opts),
      hop_count: 0,
      origin_node: Phoenix.Node.local_id(),
      timestamp: Phoenix.VectorClock.now()
    }
    
    Phoenix.Gossip.initiate_gossip(gossip_envelope, opts)
  end
  
  def handle_gossip_reception(envelope) do
    # Check if message already seen
    case Phoenix.Gossip.MessageCache.seen?(envelope.id) do
      true -> 
        # Already processed, ignore
        :ignored
        
      false -> 
        # New message, process and forward
        Phoenix.Gossip.MessageCache.mark_seen(envelope.id)
        process_gossip_message(envelope)
        forward_gossip_message(envelope)
        :processed
    end
  end
end

5. Consensus Protocol

defmodule Phoenix.Patterns.Consensus do
  @moduledoc """
  Strongly consistent consensus communication pattern.
  
  Features:
  - Raft consensus algorithm
  - Leader election
  - Log replication
  - Strong consistency guarantees
  """
  
  def propose_value(value, opts \\ []) do
    # Create consensus proposal
    proposal = %Phoenix.Consensus.Proposal{
      id: generate_proposal_id(),
      value: value,
      proposer: Phoenix.Node.local_id(),
      timestamp: DateTime.utc_now()
    }
    
    # Submit to Raft cluster
    case Phoenix.Raft.propose(proposal, opts) do
      {:ok, :committed} -> {:ok, proposal.id}
      {:ok, :queued} -> {:ok, :queued, proposal.id}
      {:error, :not_leader} -> {:error, :not_leader}
      {:error, reason} -> {:error, reason}
    end
  end
  
  def read_consensus_value(key, opts \\ []) do
    consistency = Keyword.get(opts, :consistency, :strong)
    
    case consistency do
      :strong -> 
        # Read from current leader
        Phoenix.Raft.read_from_leader(key, opts)
        
      :eventual -> 
        # Read from any replica
        Phoenix.Raft.read_from_replica(key, opts)
        
      :session -> 
        # Read your own writes
        Phoenix.Raft.read_with_session_consistency(key, opts)
    end
  end
end

Delivery Guarantees

Guarantee Specifications

defmodule Phoenix.Delivery.Guarantee do
  @moduledoc """
  Configurable delivery guarantee system for Phoenix messages.
  
  Guarantees:
  - at_most_once: Fire-and-forget (best performance)
  - at_least_once: Retry until success (reliability)
  - exactly_once: Idempotent delivery (correctness)
  - causal_order: Respects causal dependencies
  - total_order: Global ordering (expensive)
  """
  
  use TypedStruct
  
  typedstruct do
    field :delivery_mode, atom(), default: :at_least_once
    field :timeout, pos_integer(), default: 5000
    field :retry_strategy, atom(), default: :exponential_backoff
    field :max_retries, pos_integer(), default: 3
    field :ordering_requirement, atom(), default: :none
    field :acknowledgment_required, boolean(), default: true
    field :duplicate_detection, boolean(), default: false
  end
  
  def build(delivery_mode, opts \\ []) do
    %__MODULE__{
      delivery_mode: delivery_mode,
      timeout: Keyword.get(opts, :timeout, 5000),
      retry_strategy: Keyword.get(opts, :retry_strategy, :exponential_backoff),
      max_retries: Keyword.get(opts, :max_retries, 3),
      ordering_requirement: Keyword.get(opts, :ordering, :none),
      acknowledgment_required: Keyword.get(opts, :ack_required, true),
      duplicate_detection: Keyword.get(opts, :duplicate_detection, false)
    }
  end
end

At-Most-Once Delivery

defmodule Phoenix.Delivery.AtMostOnce do
  @moduledoc """
  Fire-and-forget delivery for high-performance scenarios.
  
  Characteristics:
  - Lowest latency
  - Highest throughput
  - No delivery guarantees
  - No message ordering
  """
  
  def deliver(target, message, opts \\ []) do
    # Send message without waiting for acknowledgment
    Phoenix.Transport.send_message(target, message, [
      delivery_mode: :fire_and_forget,
      timeout: Keyword.get(opts, :timeout, 1000)
    ])
    
    # Return immediately
    {:ok, :sent}
  end
end

At-Least-Once Delivery

defmodule Phoenix.Delivery.AtLeastOnce do
  @moduledoc """
  Reliable delivery with retry logic.
  
  Characteristics:
  - Guaranteed delivery (if target eventually available)
  - Possible duplicates
  - Configurable retry strategies
  - Exponential backoff with jitter
  """
  
  def deliver(target, message, guarantee) do
    message_id = message.id
    
    # Add delivery tracking
    tracked_message = add_delivery_tracking(message, guarantee)
    
    # Start retry loop
    retry_delivery(target, tracked_message, guarantee, 0)
  end
  
  defp retry_delivery(target, message, guarantee, attempt) do
    case Phoenix.Transport.send_message(target, message) do
      {:ok, :sent} -> 
        # Wait for acknowledgment
        case wait_for_ack(message.id, guarantee.timeout) do
          {:ok, :ack_received} -> 
            {:ok, :delivered}
          {:error, :timeout} -> 
            handle_retry(target, message, guarantee, attempt)
        end
        
      {:error, reason} -> 
        handle_retry(target, message, guarantee, attempt)
    end
  end
  
  defp handle_retry(target, message, guarantee, attempt) do
    if attempt >= guarantee.max_retries do
      {:error, :max_retries_exceeded}
    else
      backoff_time = calculate_backoff(guarantee.retry_strategy, attempt)
      :timer.sleep(backoff_time)
      retry_delivery(target, message, guarantee, attempt + 1)
    end
  end
end

Exactly-Once Delivery

defmodule Phoenix.Delivery.ExactlyOnce do
  @moduledoc """
  Idempotent delivery using distributed coordination.
  
  Characteristics:
  - No duplicates
  - Requires coordination overhead
  - Uses distributed locks or consensus
  - Higher latency
  """
  
  def deliver(target, message, guarantee) do
    delivery_id = generate_delivery_id(message)
    
    # Acquire distributed lock for delivery
    case Phoenix.DistributedLock.acquire(delivery_id, guarantee.timeout) do
      {:ok, lock} -> 
        try do
          perform_exactly_once_delivery(target, message, delivery_id, guarantee)
        after
          Phoenix.DistributedLock.release(lock)
        end
        
      {:error, :lock_timeout} -> 
        {:error, :coordination_timeout}
        
      {:error, :already_delivered} -> 
        {:ok, :already_delivered}
    end
  end
  
  defp perform_exactly_once_delivery(target, message, delivery_id, guarantee) do
    # Check if already delivered
    case Phoenix.DeliveryLog.check_delivery(delivery_id) do
      {:ok, :already_delivered} -> 
        {:ok, :already_delivered}
        
      {:ok, :not_delivered} -> 
        # Perform delivery
        case Phoenix.Transport.send_message(target, message) do
          {:ok, :sent} -> 
            # Record successful delivery
            Phoenix.DeliveryLog.record_delivery(delivery_id)
            {:ok, :delivered}
            
          {:error, reason} -> 
            {:error, reason}
        end
    end
  end
end

Flow Control and Backpressure

Adaptive Flow Control

defmodule Phoenix.FlowControl do
  @moduledoc """
  Adaptive flow control system preventing message queue overflow.
  
  Strategies:
  - Token bucket: Rate limiting
  - Sliding window: Adaptive throttling
  - Circuit breaker: Failure isolation
  - Backpressure propagation: System-wide coordination
  """
  
  defstruct [
    :strategy,
    :max_queue_size,
    :high_watermark,
    :low_watermark,
    :backpressure_strategy
  ]
  
  def apply_flow_control(target, message, flow_control) do
    current_queue_size = get_queue_size(target)
    
    cond do
      current_queue_size >= flow_control.max_queue_size -> 
        handle_queue_overflow(target, message, flow_control)
        
      current_queue_size >= flow_control.high_watermark -> 
        apply_backpressure(target, message, flow_control)
        
      current_queue_size <= flow_control.low_watermark -> 
        release_backpressure(target)
        send_message(target, message)
        
      true -> 
        send_message(target, message)
    end
  end
  
  defp handle_queue_overflow(target, message, flow_control) do
    case flow_control.backpressure_strategy do
      :drop_oldest -> 
        drop_oldest_message(target)
        send_message(target, message)
        
      :drop_newest -> 
        {:error, :queue_full}
        
      :reject -> 
        {:error, :backpressure_active}
        
      :spill_to_disk -> 
        spill_message_to_disk(target, message)
        
      :redirect -> 
        redirect_to_alternative_target(message, flow_control)
    end
  end
end

Token Bucket Rate Limiting

defmodule Phoenix.RateLimit.TokenBucket do
  @moduledoc """
  Token bucket rate limiting for message sending.
  
  Features:
  - Configurable bucket size
  - Configurable refill rate
  - Burst allowance
  - Distributed token sharing
  """
  
  defstruct [
    :bucket_id,
    :capacity,
    :current_tokens,
    :refill_rate,
    :last_refill,
    :distributed
  ]
  
  def create_bucket(bucket_id, capacity, refill_rate, opts \\ []) do
    bucket = %__MODULE__{
      bucket_id: bucket_id,
      capacity: capacity,
      current_tokens: capacity,
      refill_rate: refill_rate,
      last_refill: System.monotonic_time(:millisecond),
      distributed: Keyword.get(opts, :distributed, false)
    }
    
    # Register bucket
    Phoenix.RateLimit.Registry.register_bucket(bucket)
    
    {:ok, bucket}
  end
  
  def acquire_tokens(bucket_id, tokens_needed) do
    case Phoenix.RateLimit.Registry.get_bucket(bucket_id) do
      {:ok, bucket} -> 
        # Refill tokens based on elapsed time
        updated_bucket = refill_tokens(bucket)
        
        if updated_bucket.current_tokens >= tokens_needed do
          # Grant tokens
          new_bucket = %{updated_bucket | 
            current_tokens: updated_bucket.current_tokens - tokens_needed
          }
          Phoenix.RateLimit.Registry.update_bucket(bucket_id, new_bucket)
          {:ok, :tokens_acquired}
        else
          # Insufficient tokens
          {:error, :insufficient_tokens, updated_bucket.current_tokens}
        end
        
      {:error, :not_found} -> 
        {:error, :bucket_not_found}
    end
  end
  
  defp refill_tokens(bucket) do
    now = System.monotonic_time(:millisecond)
    elapsed = now - bucket.last_refill
    
    # Calculate tokens to add
    tokens_to_add = (elapsed * bucket.refill_rate) / 1000
    new_token_count = min(
      bucket.capacity, 
      bucket.current_tokens + tokens_to_add
    )
    
    %{bucket | 
      current_tokens: new_token_count,
      last_refill: now
    }
  end
end

Security and Authentication

Message-Level Security

defmodule Phoenix.Security do
  @moduledoc """
  Message-level security for Phoenix communication.
  
  Features:
  - Message authentication
  - Payload encryption
  - Digital signatures
  - Key rotation
  """
  
  def secure_message(message, security_context) do
    # Add authentication
    authenticated_message = add_authentication(message, security_context)
    
    # Encrypt payload if required
    encrypted_message = maybe_encrypt_payload(authenticated_message, security_context)
    
    # Add digital signature
    signed_message = add_digital_signature(encrypted_message, security_context)
    
    {:ok, signed_message}
  end
  
  def verify_message(message, security_context) do
    with {:ok, _} <- verify_digital_signature(message, security_context),
         {:ok, decrypted_message} <- maybe_decrypt_payload(message, security_context),
         {:ok, _} <- verify_authentication(decrypted_message, security_context) do
      {:ok, decrypted_message}
    else
      {:error, reason} -> {:error, reason}
    end
  end
  
  defp add_authentication(message, context) do
    auth_header = %{
      agent_id: context.agent_id,
      timestamp: DateTime.utc_now(),
      nonce: generate_nonce(),
      signature: calculate_hmac(message, context.auth_key)
    }
    
    put_in(message.phoenix_security.authentication, auth_header)
  end
  
  defp add_digital_signature(message, context) do
    message_hash = calculate_message_hash(message)
    signature = :crypto.sign(:ecdsa, :sha256, message_hash, context.private_key)
    
    put_in(message.phoenix_security.signature, signature)
  end
end

Transport-Level Security

defmodule Phoenix.Security.Transport do
  @moduledoc """
  Transport-level security configuration.
  
  Features:
  - TLS configuration
  - Certificate management
  - Mutual authentication
  - Perfect forward secrecy
  """
  
  def configure_tls(transport, opts \\ []) do
    tls_config = [
      # Certificate configuration
      certfile: Keyword.get(opts, :certfile, default_cert_path()),
      keyfile: Keyword.get(opts, :keyfile, default_key_path()),
      cacertfile: Keyword.get(opts, :cacertfile, default_ca_path()),
      
      # Security settings
      verify: Keyword.get(opts, :verify, :verify_peer),
      versions: Keyword.get(opts, :versions, [:"tlsv1.3", :"tlsv1.2"]),
      ciphers: Keyword.get(opts, :ciphers, secure_ciphers()),
      
      # Perfect forward secrecy
      honor_cipher_order: true,
      secure_renegotiate: true,
      
      # ALPN for protocol negotiation
      alpn_preferred_protocols: ["phoenix-agent/1.0"]
    ]
    
    Phoenix.Transport.configure_security(transport, tls_config)
  end
  
  defp secure_ciphers do
    [
      # TLS 1.3 cipher suites
      "TLS_AES_256_GCM_SHA384",
      "TLS_CHACHA20_POLY1305_SHA256",
      "TLS_AES_128_GCM_SHA256",
      
      # TLS 1.2 cipher suites (fallback)
      "ECDHE-ECDSA-AES256-GCM-SHA384",
      "ECDHE-RSA-AES256-GCM-SHA384",
      "ECDHE-ECDSA-CHACHA20-POLY1305",
      "ECDHE-RSA-CHACHA20-POLY1305"
    ]
  end
end

Performance Optimization

Message Batching

defmodule Phoenix.Optimization.Batching do
  @moduledoc """
  Message batching for improved throughput.
  
  Strategies:
  - Time-based batching
  - Size-based batching
  - Priority-aware batching
  - Adaptive batching
  """
  
  defstruct [
    :batch_id,
    :messages,
    :max_batch_size,
    :max_wait_time,
    :created_at,
    :priority_distribution
  ]
  
  def start_batching(target, opts \\ []) do
    batch_config = %__MODULE__{
      batch_id: generate_batch_id(),
      messages: [],
      max_batch_size: Keyword.get(opts, :max_size, 100),
      max_wait_time: Keyword.get(opts, :max_wait, 10),
      created_at: System.monotonic_time(:millisecond),
      priority_distribution: %{}
    }
    
    Phoenix.Batcher.start_batch(target, batch_config)
  end
  
  def add_to_batch(target, message, opts \\ []) do
    case Phoenix.Batcher.add_message(target, message) do
      {:ok, :added} -> 
        {:ok, :batched}
        
      {:ok, :batch_full} -> 
        Phoenix.Batcher.flush_batch(target)
        Phoenix.Batcher.add_message(target, message)
        
      {:error, reason} -> 
        {:error, reason}
    end
  end
  
  def flush_batch(target, reason \\ :manual) do
    case Phoenix.Batcher.get_batch(target) do
      {:ok, batch} when length(batch.messages) > 0 -> 
        # Create batch message
        batch_message = create_batch_message(batch, reason)
        
        # Send batch
        Phoenix.Transport.send_message(target, batch_message)
        
        # Clear batch
        Phoenix.Batcher.clear_batch(target)
        
      {:ok, _empty_batch} -> 
        :ok
        
      {:error, reason} -> 
        {:error, reason}
    end
  end
end

Connection Pooling

defmodule Phoenix.ConnectionPool do
  @moduledoc """
  Connection pooling for transport efficiency.
  
  Features:
  - Per-target connection pools
  - Connection health monitoring
  - Automatic connection recycling
  - Load balancing across connections
  """
  
  def get_connection(target, opts \\ []) do
    pool_name = build_pool_name(target)
    
    case :poolboy.checkout(pool_name, true, 5000) do
      :full -> 
        {:error, :pool_exhausted}
      connection -> 
        case verify_connection_health(connection) do
          :ok -> {:ok, connection}
          {:error, reason} -> 
            :poolboy.checkin(pool_name, connection)
            create_new_connection(target, opts)
        end
    end
  end
  
  def return_connection(target, connection) do
    pool_name = build_pool_name(target)
    :poolboy.checkin(pool_name, connection)
  end
  
  def start_pool(target, opts \\ []) do
    pool_config = [
      name: {:local, build_pool_name(target)},
      worker_module: Phoenix.Connection.Worker,
      size: Keyword.get(opts, :pool_size, 10),
      max_overflow: Keyword.get(opts, :max_overflow, 5)
    ]
    
    worker_args = [
      target: target,
      connect_opts: Keyword.get(opts, :connect_opts, [])
    ]
    
    :poolboy.start_link(pool_config, worker_args)
  end
end

Summary

Phoenix’s communication infrastructure provides a comprehensive foundation for distributed agent systems:

Key Features Delivered

  1. Multi-Protocol Transport: Automatic protocol selection for optimal performance
  2. Flexible Delivery Guarantees: From fire-and-forget to exactly-once delivery
  3. Advanced Communication Patterns: Point-to-point, pub-sub, pipelines, gossip, consensus
  4. Flow Control: Adaptive backpressure and rate limiting
  5. Security: Message-level and transport-level protection
  6. Performance Optimization: Batching, pooling, and adaptive algorithms

Next Documents

  1. CRDT Integration and State Management - Detailed conflict-free state handling
  2. Fault Tolerance and Partition Handling - Comprehensive resilience strategies
  3. Performance Optimization and Scaling - Advanced performance engineering
  4. Implementation Roadmap - Concrete development plan

This communication layer enables Phoenix agents to interact efficiently and reliably across distributed environments while maintaining strong consistency guarantees where needed and optimal performance where possible.


Document Version: 1.0
Next Review: 2025-07-19
Implementation Priority: High
Dependencies: Phoenix Distributed Agent Architecture