Foundation.Distributed - Partisan-Powered Distribution Layer
lib/foundation/distributed.ex
defmodule Foundation.Distributed do
  @moduledoc """
  Foundation 2.0 Distributed Layer powered by Partisan.

  Provides revolutionary clustering capabilities:
  - Multi-channel communication
  - Dynamic topology switching
  - Intelligent service discovery
  - Partition-tolerant coordination
  """

  def available?() do
    Application.get_env(:foundation, :distributed, false) and Code.ensure_loaded?(:partisan)
  end

  defdelegate switch_topology(topology), to: Foundation.Distributed.Topology
  defdelegate send_message(channel, destination, message), to: Foundation.Distributed.Channels
  defdelegate discover_services(criteria), to: Foundation.Distributed.Discovery
  defdelegate coordinate_consensus(proposal), to: Foundation.Distributed.Consensus
end
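A minimal usage sketch of this facade (illustrative only: the node name, channel atom, and payloads are placeholders, and the delegated modules defined below must be running):

if Foundation.Distributed.available?() do
  # Delegates to the Topology, Channels, and Discovery modules defined below
  Foundation.Distributed.switch_topology(:hyparview)
  Foundation.Distributed.send_message(:coordination, :"worker@127.0.0.1", {:task, :rebalance})
  Foundation.Distributed.discover_services(capabilities: [:read_replica])
end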
Foundation.Distributed.Topology - Dynamic Network Management
lib/foundation/distributed/topology.ex
defmodule Foundation.Distributed.Topology do
  @moduledoc """
  Dynamic topology management with Partisan overlays.

  Automatically selects and switches between optimal topologies based on
  cluster size, workload patterns, and network conditions.
  """

  use GenServer
  require Logger

  defstruct [
    :current_topology,
    :overlay_config,
    :performance_metrics,
    :topology_history,
    :switch_cooldown
  ]
Public API
@doc """ Switches the cluster topology at runtime.
Supported Topologies:
:full_mesh
- Every node connected to every other (best for <10 nodes):hyparview
- Peer sampling with partial views (best for 10-1000 nodes):client_server
- Hierarchical with designated servers (best for >1000 nodes):pub_sub
- Event-driven with message brokers (best for event systems) """ def switch_topology(new_topology) do GenServer.call(MODULE, {:switch_topology, new_topology}) end
@doc """ Optimizes topology for specific workload patterns. """ def optimize_for_workload(workload_type) do GenServer.call(MODULE, {:optimize_for_workload, workload_type}) end
@doc """ Gets current topology information. """ def current_topology() do GenServer.call(MODULE, :get_current_topology) end
@doc """ Monitors topology performance and suggests optimizations. """ def analyze_performance() do GenServer.call(MODULE, :analyze_performance) end
GenServer Implementation
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    initial_topology = Keyword.get(opts, :initial_topology, :full_mesh)
state = %__MODULE__{
current_topology: initial_topology,
overlay_config: initialize_overlay_config(),
performance_metrics: %{},
topology_history: [],
switch_cooldown: 30_000 # 30 seconds minimum between switches
}
# Initialize Partisan with initial topology
initialize_partisan_topology(initial_topology)
# Start performance monitoring
schedule_performance_monitoring()
{:ok, state}
end
  @impl true
  def handle_call({:switch_topology, new_topology}, _from, state) do
    case can_switch_topology?(state) do
      true ->
        case perform_topology_switch(new_topology, state) do
          {:ok, new_state} ->
            Logger.info("Successfully switched topology from #{state.current_topology} to #{new_topology}")
            {:reply, :ok, new_state}

          {:error, reason} ->
            Logger.error("Failed to switch topology: #{reason}")
            {:reply, {:error, reason}, state}
        end

      false ->
        {:reply, {:error, :switch_cooldown_active}, state}
    end
  end
  @impl true
  def handle_call({:optimize_for_workload, workload_type}, _from, state) do
    optimal_topology = determine_optimal_topology(workload_type, state)
if optimal_topology != state.current_topology do
case perform_topology_switch(optimal_topology, state) do
{:ok, new_state} ->
Logger.info("Optimized topology for #{workload_type}: #{optimal_topology}")
{:reply, {:ok, optimal_topology}, new_state}
{:error, reason} ->
{:reply, {:error, reason}, state}
end
else
{:reply, {:already_optimal, optimal_topology}, state}
end
end
  @impl true
  def handle_call(:get_current_topology, _from, state) do
    topology_info = %{
      current: state.current_topology,
      node_count: get_cluster_size(),
      performance: get_current_performance_metrics(),
      last_switch: get_last_switch_time(state)
    }
{:reply, topology_info, state}
end
  @impl true
  def handle_call(:analyze_performance, _from, state) do
    analysis = analyze_topology_performance(state)
    {:reply, analysis, state}
  end
  @impl true
  def handle_info(:monitor_performance, state) do
    new_metrics = collect_performance_metrics()
    new_state = %{state | performance_metrics: new_metrics}
# Check if topology optimization is needed
case should_auto_optimize?(new_state) do
{:yes, recommended_topology} ->
Logger.info("Auto-optimization recommends switching to #{recommended_topology}")
case perform_topology_switch(recommended_topology, new_state) do
{:ok, optimized_state} ->
schedule_performance_monitoring()
{:noreply, optimized_state}
{:error, reason} ->
Logger.warning("Auto-optimization failed: #{reason}")
schedule_performance_monitoring()
{:noreply, new_state}
end
:no ->
schedule_performance_monitoring()
{:noreply, new_state}
end
end
Topology Management Implementation
  defp initialize_partisan_topology(topology) do
    overlay_config = get_overlay_config_for_topology(topology)
case Application.get_env(:partisan, :overlay, nil) do
nil ->
Application.put_env(:partisan, :overlay, overlay_config)
Logger.info("Initialized Partisan with #{topology} topology")
_ ->
Logger.info("Partisan already configured, updating overlay")
end
end
  defp get_overlay_config_for_topology(:full_mesh) do
    [{:overlay, :full_mesh, %{channels: [:coordination, :data]}}]
  end

  defp get_overlay_config_for_topology(:hyparview) do
    [{:overlay, :hyparview, %{
      channels: [:coordination, :data, :gossip],
      active_view_size: 6,
      passive_view_size: 30,
      arwl: 6
    }}]
  end

  defp get_overlay_config_for_topology(:client_server) do
    [{:overlay, :client_server, %{
      channels: [:coordination, :data],
      server_selection_strategy: :round_robin
    }}]
  end

  defp get_overlay_config_for_topology(:pub_sub) do
    [{:overlay, :plumtree, %{
      channels: [:events, :coordination],
      fanout: 5,
      lazy_push_probability: 0.25
    }}]
  end
  defp perform_topology_switch(new_topology, state) do
    old_topology = state.current_topology
try do
# Prepare new overlay configuration
new_overlay_config = get_overlay_config_for_topology(new_topology)
# Perform graceful transition
case graceful_topology_transition(old_topology, new_topology, new_overlay_config) do
:ok ->
new_state = %{state |
current_topology: new_topology,
overlay_config: new_overlay_config,
topology_history: [
%{
from: old_topology,
to: new_topology,
timestamp: :os.system_time(:millisecond),
reason: :manual_switch
} | state.topology_history
]
}
{:ok, new_state}
{:error, reason} ->
{:error, reason}
end
rescue
error ->
Logger.error("Topology switch error: #{inspect(error)}")
{:error, {:switch_failed, error}}
end
end
  defp graceful_topology_transition(old_topology, new_topology, new_config) do
    Logger.info("Starting graceful transition from #{old_topology} to #{new_topology}")
# Step 1: Announce topology change to cluster
announce_topology_change(new_topology)
# Step 2: Update configuration
Application.put_env(:partisan, :overlay, new_config)
# Step 3: Verify topology is working
case verify_topology_health() do
:ok ->
Logger.info("Topology transition completed successfully")
:ok
{:error, reason} ->
Logger.error("Topology verification failed: #{reason}")
{:error, {:verification_failed, reason}}
end
end
Performance Monitoring and Analysis
  defp collect_performance_metrics() do
    %{
      node_count: get_cluster_size(),
      message_latency: measure_message_latency(),
      connection_count: count_active_connections(),
      bandwidth_utilization: measure_bandwidth_utilization(),
      partition_events: count_recent_partitions(),
      timestamp: :os.system_time(:millisecond)
    }
  end

  defp determine_optimal_topology(workload_type, state) do
    cluster_size = get_cluster_size()
case {workload_type, cluster_size} do
{:low_latency, size} when size <= 10 ->
:full_mesh
{:high_throughput, size} when size <= 100 ->
:hyparview
{:massive_scale, size} when size > 100 ->
:client_server
{:event_driven, _size} ->
:pub_sub
{_workload, size} when size <= 5 ->
:full_mesh
{_workload, size} when size <= 50 ->
:hyparview
{_workload, _size} ->
:client_server
end
end
  defp should_auto_optimize?(state) do
    current_metrics = state.performance_metrics
    current_topology = state.current_topology
# Analyze performance indicators
issues = []
issues = if Map.get(current_metrics, :message_latency, 0) > 100 do
[:high_latency | issues]
else
issues
end
issues = if Map.get(current_metrics, :partition_events, 0) > 5 do
[:frequent_partitions | issues]
else
issues
end
issues = if Map.get(current_metrics, :bandwidth_utilization, 0) > 0.8 do
[:high_bandwidth | issues]
else
issues
end
# Determine if optimization is needed
  # Guard expressions cannot call local functions, so bind the cluster size first
  cluster_size = get_cluster_size()

  case {issues, current_topology} do
    {[], _topology} ->
      :no

    {[:high_latency], :hyparview} when cluster_size <= 10 ->
      {:yes, :full_mesh}

    {[:frequent_partitions], :full_mesh} when cluster_size > 20 ->
      {:yes, :hyparview}

    {[:high_bandwidth], topology} when topology != :client_server and cluster_size > 50 ->
{:yes, :client_server}
_other ->
:no
end
end
  defp analyze_topology_performance(state) do
    current_metrics = state.performance_metrics
    history = state.topology_history
%{
current_performance: current_metrics,
topology_efficiency: calculate_topology_efficiency(current_metrics, state.current_topology),
recommendations: generate_topology_recommendations(current_metrics, state.current_topology),
historical_performance: analyze_historical_performance(history),
cluster_health: assess_cluster_health()
}
end
Utility Functions
  defp initialize_overlay_config() do
    get_overlay_config_for_topology(:full_mesh)
  end

  defp can_switch_topology?(state) do
    last_switch_time = get_last_switch_time(state)
    current_time = :os.system_time(:millisecond)
case last_switch_time do
nil -> true
last_time -> (current_time - last_time) > state.switch_cooldown
end
end
  defp get_last_switch_time(state) do
    case state.topology_history do
      [] -> nil
      [latest | _] -> latest.timestamp
    end
  end

  defp schedule_performance_monitoring() do
    # Every 30 seconds
    Process.send_after(self(), :monitor_performance, 30_000)
  end

  defp get_cluster_size() do
    length([Node.self() | Node.list()])
  end

  defp announce_topology_change(new_topology) do
    message = {:topology_change_announcement, new_topology, Node.self()}
    # Would send via Partisan when available
    Logger.info("Announcing topology change to #{new_topology}")
  end

  defp verify_topology_health() do
    # Simple health check - in real implementation would verify Partisan connectivity
    :timer.sleep(1000)
    :ok
  end
Performance measurement stubs (would be implemented with actual metrics)
  defp measure_message_latency(), do: :rand.uniform(50) + 10
  defp count_active_connections(), do: get_cluster_size() - 1
  defp measure_bandwidth_utilization(), do: :rand.uniform(100) / 100
  defp count_recent_partitions(), do: 0
  defp get_current_performance_metrics(), do: %{}
  defp calculate_topology_efficiency(_metrics, _topology), do: 0.85
  defp generate_topology_recommendations(_metrics, _topology), do: []
  defp analyze_historical_performance(_history), do: %{}
  defp assess_cluster_health(), do: :healthy
end
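A brief usage sketch of the topology API above (illustrative; it assumes the GenServer is started under the application's supervision tree and that the 30-second cooldown permits the switch):

# Normally started by the application's supervisor
{:ok, _pid} = Foundation.Distributed.Topology.start_link(initial_topology: :full_mesh)

# Switch explicitly at runtime (subject to the switch cooldown) ...
Foundation.Distributed.Topology.switch_topology(:hyparview)

# ... or let the workload heuristic choose (:event_driven maps to :pub_sub above)
Foundation.Distributed.Topology.optimize_for_workload(:event_driven)

# Inspect the current topology and its performance analysis
Foundation.Distributed.Topology.current_topology()
Foundation.Distributed.Topology.analyze_performance()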
Foundation.Distributed.Channels - Multi-Channel Communication
lib/foundation/distributed/channels.ex
defmodule Foundation.Distributed.Channels do
  @moduledoc """
  Multi-channel communication system that eliminates head-of-line blocking.
Provides separate channels for different types of traffic:
- :coordination - High-priority cluster coordination messages
- :data - Application data transfer
- :gossip - Background gossip and maintenance
  - :events - Event streaming and notifications
  """

  use GenServer
  require Logger

  defstruct [
    :channel_registry,
    :routing_table,
    :performance_metrics,
    :channel_configs,
    :load_balancer
  ]
Public API
@doc """ Sends a message on a specific channel. """ def send_message(channel, destination, message, opts \ []) do GenServer.call(MODULE, {:send_message, channel, destination, message, opts}) end
@doc """ Broadcasts a message to all nodes on a specific channel. """ def broadcast(channel, message, opts \ []) do GenServer.call(MODULE, {:broadcast, channel, message, opts}) end
@doc """ Sets up message routing rules for intelligent channel selection. """ def configure_routing(rules) do GenServer.call(MODULE, {:configure_routing, rules}) end
@doc """ Monitors channel performance and utilization. """ def get_channel_metrics() do GenServer.call(MODULE, :get_channel_metrics) end
GenServer Implementation
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    state = %__MODULE__{
      channel_registry: initialize_channel_registry(),
      routing_table: initialize_routing_table(),
      performance_metrics: %{},
      channel_configs: initialize_channel_configs(),
      load_balancer: initialize_load_balancer()
    }
# Set up Partisan channels (when available)
setup_channels(state.channel_configs)
# Start performance monitoring
schedule_metrics_collection()
{:ok, state}
end
  @impl true
  def handle_call({:send_message, channel, destination, message, opts}, _from, state) do
    case route_message(channel, destination, message, opts, state) do
      {:ok, routing_info} ->
        result = execute_message_send(routing_info)
        update_metrics(channel, routing_info, result, state)
        {:reply, result, state}

      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  @impl true
  def handle_call({:broadcast, channel, message, opts}, _from, state) do
    case route_broadcast(channel, message, opts, state) do
      {:ok, routing_info} ->
        result = execute_broadcast(routing_info)
        update_metrics(channel, routing_info, result, state)
        {:reply, result, state}

      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  @impl true
  def handle_call({:configure_routing, rules}, _from, state) do
    new_routing_table = apply_routing_rules(state.routing_table, rules)
    new_state = %{state | routing_table: new_routing_table}
    {:reply, :ok, new_state}
  end

  @impl true
  def handle_call(:get_channel_metrics, _from, state) do
    metrics = compile_channel_metrics(state)
    {:reply, metrics, state}
  end

  @impl true
  def handle_info(:collect_metrics, state) do
    new_metrics = collect_channel_performance_metrics()
    new_state = %{state | performance_metrics: new_metrics}
# Check for channel optimization opportunities
optimize_channels_if_needed(new_state)
schedule_metrics_collection()
{:noreply, new_state}
end
Channel Setup and Configuration
  defp initialize_channel_configs() do
    %{
      coordination: %{
        priority: :high,
        reliability: :guaranteed,
        max_connections: 3,
        buffer_size: 1000,
        compression: false
      },
data: %{
priority: :medium,
reliability: :best_effort,
max_connections: 5,
buffer_size: 10000,
compression: true
},
gossip: %{
priority: :low,
reliability: :best_effort,
max_connections: 1,
buffer_size: 5000,
compression: true
},
events: %{
priority: :medium,
reliability: :at_least_once,
max_connections: 2,
buffer_size: 5000,
compression: false
}
}
end
  defp setup_channels(channel_configs) do
    Enum.each(channel_configs, fn {channel_name, config} ->
      # Would configure Partisan channels when available
      Logger.info("Setting up channel #{channel_name} with config: #{inspect(config)}")
    end)
  end
Message Routing Implementation
  defp route_message(channel, destination, message, opts, state) do
    # Determine optimal routing strategy
    routing_strategy = determine_routing_strategy(channel, destination, message, opts, state)
case routing_strategy do
{:direct, target_node} ->
{:ok, %{
strategy: :direct,
channel: channel,
destination: target_node,
message: message,
opts: opts
}}
{:load_balanced, candidate_nodes} ->
optimal_node = select_optimal_node(candidate_nodes, state.load_balancer)
{:ok, %{
strategy: :load_balanced,
channel: channel,
destination: optimal_node,
message: message,
opts: opts
}}
{:error, reason} ->
{:error, reason}
end
end
  defp determine_routing_strategy(channel, destination, message, opts, state) do
    case destination do
      # Direct node targeting (exclude :all so the broadcast clause below can match)
      node when is_atom(node) and node != :all ->
        # Node.alive?/0 takes no argument, so check cluster membership instead
        if node == Node.self() or node in Node.list() do
          {:direct, node}
        else
          {:error, {:node_not_available, node}}
        end
:all ->
# Broadcast to all nodes
{:broadcast_all, Node.list([:visible])}
{:service, service_name} ->
# Route to service instances
case find_service_instances(service_name) do
[] -> {:error, {:no_service_instances, service_name}}
instances -> {:load_balanced, instances}
end
{:capability, capability} ->
# Route to nodes with specific capability
case find_nodes_with_capability(capability) do
[] -> {:error, {:no_capable_nodes, capability}}
nodes -> {:load_balanced, nodes}
end
end
end
  defp execute_message_send(routing_info) do
    case routing_info.strategy do
      :direct ->
        # Would use Partisan when available; falls back to standard send
        send({:foundation_distributed, routing_info.destination}, routing_info.message)
        :ok
:load_balanced ->
send({:foundation_distributed, routing_info.destination}, routing_info.message)
:ok
end
end
  defp route_broadcast(channel, message, opts, state) do
    broadcast_strategy = determine_broadcast_strategy(channel, message, opts)
case broadcast_strategy do
:simple_broadcast ->
{:ok, %{
strategy: :simple_broadcast,
channel: channel,
message: message,
targets: Node.list([:visible])
}}
:staged_broadcast ->
{:ok, %{
strategy: :staged_broadcast,
channel: channel,
message: message,
stages: calculate_broadcast_stages(Node.list([:visible]))
}}
:gossip_broadcast ->
{:ok, %{
strategy: :gossip_broadcast,
channel: channel,
message: message,
fanout: Keyword.get(opts, :fanout, 3)
}}
end
end
  defp determine_broadcast_strategy(channel, message, opts) do
    message_size = estimate_message_size(message)
    urgency = Keyword.get(opts, :urgency, :normal)
cond do
urgency == :high and message_size < 1024 ->
:simple_broadcast
message_size > 10_240 ->
:staged_broadcast
channel == :gossip ->
:gossip_broadcast
true ->
:simple_broadcast
end
end
  defp execute_broadcast(routing_info) do
    case routing_info.strategy do
      :simple_broadcast ->
        Enum.each(routing_info.targets, fn node ->
          send({:foundation_distributed, node}, routing_info.message)
        end)
        :ok
:staged_broadcast ->
execute_staged_broadcast(routing_info)
:gossip_broadcast ->
execute_gossip_broadcast(routing_info)
end
end
Advanced Broadcasting Strategies
  defp execute_staged_broadcast(routing_info) do
    stages = routing_info.stages
# Send to first stage immediately
first_stage = hd(stages)
Enum.each(first_stage, fn node ->
send({:foundation_distributed, node}, routing_info.message)
end)
# Schedule remaining stages
remaining_stages = tl(stages)
Enum.with_index(remaining_stages, 1)
|> Enum.each(fn {stage, index} ->
delay = index * 100 # 100ms between stages
spawn(fn ->
:timer.sleep(delay)
Enum.each(stage, fn node ->
send({:foundation_distributed, node}, routing_info.message)
end)
end)
end)
:ok
end
  defp execute_gossip_broadcast(routing_info) do
    fanout = routing_info.fanout
    available_nodes = Node.list([:visible])
# Select random subset for gossip
gossip_targets = Enum.take_random(available_nodes, fanout)
gossip_message = %{
type: :gossip_broadcast,
original_message: routing_info.message,
ttl: 3, # Time to live
source: Node.self()
}
Enum.each(gossip_targets, fn node ->
send({:foundation_distributed, node}, gossip_message)
end)
:ok
end
Performance Monitoring and Optimization
  defp collect_channel_performance_metrics() do
    channels = [:coordination, :data, :gossip, :events]
Enum.into(channels, %{}, fn channel ->
metrics = %{
message_count: get_channel_message_count(channel),
bytes_transferred: get_channel_bytes_transferred(channel),
average_latency: get_channel_average_latency(channel),
error_rate: get_channel_error_rate(channel),
utilization: get_channel_utilization(channel)
}
{channel, metrics}
end)
end
  defp optimize_channels_if_needed(state) do
    # Analyze performance metrics and optimize channels
    Enum.each(state.performance_metrics, fn {channel, metrics} ->
      cond do
        metrics[:utilization] > 0.9 ->
          Logger.info("Channel #{channel} highly utilized, consider scaling")
          scale_channel_if_possible(channel)
metrics[:error_rate] > 0.1 ->
Logger.warning("Channel #{channel} has high error rate: #{metrics[:error_rate]}")
diagnose_channel_issues(channel)
true ->
:ok
end
end)
end
Utility Functions
  defp initialize_channel_registry() do
    :ets.new(:channel_registry, [:set, :protected])
  end

  defp initialize_routing_table() do
    %{
      default_routes: %{},
      service_routes: %{},
      capability_routes: %{}
    }
  end

  defp initialize_load_balancer() do
    %{
      strategy: :round_robin,
      node_weights: %{},
      health_status: %{}
    }
  end

  defp schedule_metrics_collection() do
    # Every 10 seconds
    Process.send_after(self(), :collect_metrics, 10_000)
  end

  defp find_service_instances(service_name) do
    # Would integrate with service registry
    case Foundation.ServiceRegistry.lookup(:default, service_name) do
      {:ok, _pid} -> [Node.self()]
      _ -> []
    end
  end

  defp find_nodes_with_capability(_capability) do
    # Would integrate with capability registry
    [Node.self()]
  end
  defp select_optimal_node(candidate_nodes, load_balancer) do
    case load_balancer.strategy do
      :round_robin ->
        # Simple round robin selection
        Enum.random(candidate_nodes)
:weighted ->
# Weighted selection based on node capacity
select_weighted_node(candidate_nodes, load_balancer.node_weights)
:health_based ->
# Select healthiest node
select_healthiest_node(candidate_nodes, load_balancer.health_status)
end
end
  defp calculate_broadcast_stages(nodes) do
    # Split nodes into stages for staged broadcasting
    stage_size = max(1, div(length(nodes), 3))
    Enum.chunk_every(nodes, stage_size)
  end

  defp estimate_message_size(message) do
    # Rough estimation without full serialization
    try do
      :erlang.external_size(message)
    rescue
      # Default estimate
      _ -> 1000
    end
  end
Performance measurement stubs (would be implemented with actual metrics)
  defp get_channel_message_count(_channel), do: :rand.uniform(1000)
  defp get_channel_bytes_transferred(_channel), do: :rand.uniform(1_000_000)
  defp get_channel_average_latency(_channel), do: :rand.uniform(50) + 5
  defp get_channel_error_rate(_channel), do: :rand.uniform(10) / 100
  defp get_channel_utilization(_channel), do: :rand.uniform(100) / 100

  defp scale_channel_if_possible(_channel), do: :ok
  defp diagnose_channel_issues(_channel), do: :ok
  defp select_weighted_node(nodes, _weights), do: Enum.random(nodes)
  defp select_healthiest_node(nodes, _health), do: Enum.random(nodes)
  defp apply_routing_rules(routing_table, _rules), do: routing_table
  defp compile_channel_metrics(state), do: state.performance_metrics
  defp update_metrics(_channel, _routing_info, _result, _state), do: :ok
end
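A short usage sketch of the channel API above (illustrative; the node name, service, capability, and payloads are placeholders and must exist in the registries the router consults):

{:ok, _pid} = Foundation.Distributed.Channels.start_link()

# Direct send to a connected node on the high-priority coordination channel
Foundation.Distributed.Channels.send_message(:coordination, :"node2@10.0.0.2", {:ping, Node.self()})

# Route by service or capability instead of a concrete node
Foundation.Distributed.Channels.send_message(:data, {:service, :user_store}, {:write, %{id: 1}})
Foundation.Distributed.Channels.send_message(:data, {:capability, :gpu}, {:infer, :model_a})

# Broadcast on the gossip channel; the :fanout option feeds the gossip strategy
Foundation.Distributed.Channels.broadcast(:gossip, {:heartbeat, Node.self()}, fanout: 3)

Foundation.Distributed.Channels.get_channel_metrics()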
Foundation.Distributed.Discovery - Intelligent Service Discovery
lib/foundation/distributed/discovery.ex
defmodule Foundation.Distributed.Discovery do
  @moduledoc """
  Intelligent service discovery that works across cluster topologies.

  Provides capability-based service matching, health-aware selection,
  and cross-cluster service federation.
  """

  use GenServer
  require Logger

  defstruct [
    :discovery_strategies,
    :service_cache,
    :health_monitors,
    :capability_index
  ]
Public API
@doc """ Discovers services based on criteria.
Examples
# Find all database services
Foundation.Distributed.Discovery.discover_services(service_type: :database)
# Find services with specific capabilities
Foundation.Distributed.Discovery.discover_services(
capabilities: [:read_replica, :high_availability]
)
# Find healthy services on specific nodes
Foundation.Distributed.Discovery.discover_services(
health_status: :healthy,
nodes: [:node1, :node2]
)
""" def discover_services(criteria \ []) do GenServer.call(MODULE, {:discover_services, criteria}) end
@doc """ Registers a service with capabilities. """ def register_service(service_name, pid, capabilities \ []) do GenServer.call(MODULE, {:register_service, service_name, pid, capabilities}) end
@doc """ Gets the health status of discovered services. """ def get_service_health(service_name) do GenServer.call(MODULE, {:get_service_health, service_name}) end
GenServer Implementation
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    state = %__MODULE__{
      discovery_strategies: [:local, :cluster, :external],
      service_cache: %{},
      health_monitors: %{},
      capability_index: %{}
    }
# Start health monitoring
start_health_monitoring()
{:ok, state}
end
  @impl true
  def handle_call({:discover_services, criteria}, _from, state) do
    case discover_with_criteria(criteria, state) do
      {:ok, services} ->
        {:reply, {:ok, services}, state}

      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  @impl true
  def handle_call({:register_service, service_name, pid, capabilities}, _from, state) do
    new_state = register_service_locally(service_name, pid, capabilities, state)
    {:reply, :ok, new_state}
  end

  @impl true
  def handle_call({:get_service_health, service_name}, _from, state) do
    health = get_health_status(service_name, state)
    {:reply, health, state}
  end
Service Discovery Implementation
  defp discover_with_criteria(criteria, state) do
    # Start with local services
    local_matches = find_local_matches(criteria, state)
# Expand to cluster if needed
cluster_matches = if :cluster in state.discovery_strategies do
find_cluster_matches(criteria)
else
[]
end
# Combine and filter results
all_matches = local_matches ++ cluster_matches
filtered_matches = apply_discovery_filters(all_matches, criteria)
{:ok, filtered_matches}
end
  defp find_local_matches(criteria, state) do
    state.service_cache
    |> Enum.filter(fn {_name, service} -> matches_discovery_criteria?(service, criteria) end)
    |> Enum.map(fn {_name, service} -> service end)
  end

  defp find_cluster_matches(_criteria) do
    # Would query other nodes in the cluster; for now, return an empty list
    []
  end

  defp matches_discovery_criteria?(service, criteria) do
    Enum.all?(criteria, fn {key, value} ->
      case key do
        :service_type ->
          Map.get(service, :service_type) == value
:capabilities ->
required_caps = List.wrap(value)
service_caps = Map.get(service, :capabilities, [])
Enum.all?(required_caps, &(&1 in service_caps))
:health_status ->
Map.get(service, :health_status) == value
:nodes ->
service_node = Map.get(service, :node)
service_node in List.wrap(value)
_ ->
true
end
end)
end
  defp register_service_locally(service_name, pid, capabilities, state) do
    service = %{
      name: service_name,
      pid: pid,
      node: Node.self(),
      capabilities: capabilities,
      health_status: :healthy,
      registered_at: :os.system_time(:millisecond)
    }
new_cache = Map.put(state.service_cache, service_name, service)
new_capability_index = update_capability_index(state.capability_index, service_name, capabilities)
%{state |
service_cache: new_cache,
capability_index: new_capability_index
}
end
  defp update_capability_index(index, service_name, capabilities) do
    Enum.reduce(capabilities, index, fn capability, acc ->
      services = Map.get(acc, capability, [])
      Map.put(acc, capability, [service_name | services])
    end)
  end

  defp apply_discovery_filters(services, _criteria) do
    # Apply additional filters like proximity, load, etc.
    services
  end

  defp get_health_status(service_name, state) do
    case Map.get(state.service_cache, service_name) do
      nil -> {:error, :service_not_found}
      service -> {:ok, Map.get(service, :health_status, :unknown)}
    end
  end

  defp start_health_monitoring() do
    spawn_link(fn -> health_monitoring_loop() end)
  end

  defp health_monitoring_loop() do
    # Would monitor service health
    :timer.sleep(30_000)
    health_monitoring_loop()
  end
end
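A round-trip usage sketch of the discovery API above (illustrative; the service name, pid, and capability atoms are placeholders):

{:ok, _pid} = Foundation.Distributed.Discovery.start_link()

# Register a local service together with its capabilities
db_pid = spawn(fn -> Process.sleep(:infinity) end)
:ok = Foundation.Distributed.Discovery.register_service(:primary_db, db_pid, [:read_replica, :high_availability])

# Discover it by capability and check its health
{:ok, [_service]} = Foundation.Distributed.Discovery.discover_services(capabilities: [:read_replica])
{:ok, :healthy} = Foundation.Distributed.Discovery.get_service_health(:primary_db)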