Technical Specification: First-Class Timeout Management in ConnectionManager
Document Version: 1.0 Author: AI Assistant Status: PROPOSED
1. Overview
This document specifies an enhancement to Foundation.Infrastructure.ConnectionManager that provides first-class, end-to-end timeout management for operations executed with a pooled worker.
The current implementation of with_connection/3 includes a timeout for checking out a worker from the pool, a critical feature provided by poolboy. However, once a worker is checked out, there is no built-in mechanism to enforce a timeout on the work performed with that worker. If the user-provided function hangs (e.g., due to a non-responsive external API), it can hold the worker indefinitely, leading to pool starvation and cascading failures.
This enhancement will extend the :timeout option of with_connection/3 so that it applies to the entire operation, from checkout to the completion of the user’s function. It will be implemented with Elixir’s Task module, which lets the caller wait for a result under a deadline, making the ConnectionManager significantly more robust and predictable.
2. Problem Statement & Use Case
A DSPEx.Client.LM uses the ConnectionManager to manage a pool of HttpWorker processes that make calls to an LLM provider.
The Failure Scenario:
- A DSPEx.Predict program calls the LM.Client.
- The LM.Client uses ConnectionManager.with_connection/3 to check out an HttpWorker. The checkout succeeds quickly.
- The HttpWorker makes an HTTP request to the LLM API.
- The LLM provider’s server accepts the connection but never sends a response back (it “hangs”).
- The HttpWorker process is now blocked indefinitely, waiting for a TCP response that will never arrive.
- Because the worker is never checked back in, the connection pool now has one less available worker. If this happens repeatedly, the entire pool will be exhausted, and the application will become unresponsive.
The current with_connection timeout does not protect against this scenario, as the timeout only applies to the checkout step. We need a timeout that covers the entire “lease” of the worker.
3. Proposed API and Configuration Changes
The public API of Foundation.Infrastructure.ConnectionManager will be updated. The change is subtle but powerful: the existing :timeout option in with_connection/3 will have its meaning expanded.
3.1. with_connection/3 (Enhanced Semantics)
New Signature (Semantics Change):
# in foundation/infrastructure/connection_manager.ex
@spec with_connection(
        pool_name :: pool_name(),
        fun :: (pid() -> term()),
        timeout_ms :: timeout()
      ) :: {:ok, term()} | {:error, term()}
timeout_ms: This parameter will now represent the total maximum time allowed for the entire operation, including both checking out the worker and executing the provided function fun.
Example Usage (Unchanged for the user, but with new guarantees):
# This call now guarantees that the whole operation, including the `HttpWorker.post`
# call, will not take longer than 15 seconds.
ConnectionManager.with_connection(:http_pool, fn worker ->
  HttpWorker.post(worker, "/v1/chat/completions", body)
end, 15_000)
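Callers may want to branch on the two timeout reasons that the implementation in section 4 returns. The following is a minimal sketch, assuming a hypothetical ResultHandler module; the :retry_later, :give_up, :done, and :failed values are illustrative only and are not part of this specification.

```elixir
defmodule ResultHandler do
  # Hypothetical helper: maps a with_connection/3 result to a caller-side decision.

  # The operation finished within the deadline.
  def classify({:ok, result}), do: {:done, result}

  # The pool was busy: no worker became available before the deadline.
  def classify({:error, :checkout_timeout}), do: :retry_later

  # A worker was obtained, but the function did not finish in time.
  def classify({:error, :operation_timeout}), do: :give_up

  # Any other failure is passed through with its reason.
  def classify({:error, other}), do: {:failed, other}
end
```

Keeping the two timeout reasons separate lets a caller treat pool contention differently from a slow upstream service, for example by retrying only the former.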
4. Internal Implementation Details
The core change is within the private do_with_connection/4 helper function. It will now run the user’s function in a separate Task and wait for the result under a deadline.
4.1. do_with_connection/4 (New Implementation)
# in foundation/infrastructure/connection_manager.ex
# Requires `require Logger` at the top of the module.
# Assumes `timeout` is an integer number of milliseconds; `:infinity` would need separate handling.
defp do_with_connection(pool_name, pool_pid, fun, timeout) do
  # Record the deadline for the entire operation (checkout + execution).
  deadline = System.monotonic_time() + System.convert_time_unit(timeout, :millisecond, :native)

  # 1. Check out the worker, spending at most the full time budget.
  case checkout_worker(pool_pid, remaining_time_ms(deadline)) do
    {:ok, worker} ->
      emit_telemetry(:checkout, %{}, %{pool_name: pool_name})

      try do
        run_with_deadline(pool_name, worker, fun, deadline, timeout)
      after
        # 4. CRITICAL: Ensure the worker is always checked back in.
        # `worker` is bound outside the `try`, so it is visible here.
        :poolboy.checkin(pool_pid, worker)
        emit_telemetry(:checkin, %{}, %{pool_name: pool_name})
      end

    {:error, :checkout_timeout} = error ->
      Logger.warning("Checkout from pool #{pool_name} timed out after #{timeout}ms.")
      emit_telemetry(:checkout_timeout, %{timeout: timeout}, %{pool_name: pool_name})
      error
  end
end

# :poolboy.checkout/3 exits with {:timeout, _} when no worker becomes available in time.
defp checkout_worker(pool_pid, checkout_timeout) do
  {:ok, :poolboy.checkout(pool_pid, true, checkout_timeout)}
catch
  :exit, {:timeout, _} -> {:error, :checkout_timeout}
end

defp run_with_deadline(pool_name, worker, fun, deadline, timeout) do
  # 2. Execute the user's function in a supervised task. `async_nolink` does not link
  # the task to the caller, so a crash in `fun` cannot take the caller down.
  task = Task.Supervisor.async_nolink(Foundation.TaskSupervisor, fn -> fun.(worker) end)

  # 3. Wait for the result with the remaining budget. An unlinked task keeps running
  # after a timeout, so it is killed explicitly before the worker is checked back in.
  case Task.yield(task, remaining_time_ms(deadline)) || Task.shutdown(task, :brutal_kill) do
    {:ok, result} ->
      {:ok, result}

    {:exit, reason} ->
      # `fun` raised or exited inside the task.
      Logger.error("Function execution error in pool #{pool_name}: #{inspect(reason)}")
      {:error, {:execution_error, reason}}

    nil ->
      Logger.warning("Operation in pool #{pool_name} timed out after #{timeout}ms.")
      emit_telemetry(:operation_timeout, %{timeout: timeout}, %{pool_name: pool_name})
      {:error, :operation_timeout}
  end
end

defp remaining_time_ms(deadline) do
  remaining_native = deadline - System.monotonic_time()

  if remaining_native > 0 do
    System.convert_time_unit(remaining_native, :native, :millisecond)
  else
    0
  end
end
Key Elements of the New Implementation:
- Deadline Management: Instead of a simple timeout value, the function calculates an absolute deadline for the entire operation. This ensures that time spent in the checkout phase is subtracted from the time available for the operation phase.
- Task.Supervisor.async_nolink: The user’s function is executed within a Task process. We use async_nolink because we don’t want a crash inside the user’s function to crash the process that runs do_with_connection/4. The task is supervised by Foundation.TaskSupervisor; the caller only monitors it.
- Waiting with a Timeout: This is the core of the timeout mechanism. The caller waits for the task to complete, but for no longer than the time remaining before the deadline. Because a task started with async_nolink keeps running after the wait times out, it must be shut down explicitly (for example with Task.shutdown/2) before the operation is reported as timed out.
- Robust after Block: The try...after block is essential. It guarantees that :poolboy.checkin is called, returning the worker to the pool, regardless of whether the operation succeeded, failed with an exception, or timed out. This prevents worker leakage.
- Specific Timeout Errors: The code can now distinguish between a :checkout_timeout (the pool was busy) and an :operation_timeout (the user’s code was slow), providing much better observability.
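The deadline and bounded-wait elements can be tried in isolation, without poolboy. The following is a minimal sketch, assuming a hypothetical DeadlineSketch module and a Task.Supervisor started only for this example; it uses the Task.yield/2 plus Task.shutdown/2 idiom, which is one way (not necessarily the final one) to stop an unlinked task once its time budget is spent.

```elixir
defmodule DeadlineSketch do
  # Runs `fun` in an unlinked, supervised task and gives up once `timeout_ms` elapses.
  # `sup` is any running Task.Supervisor (an assumption of this sketch).
  def run(sup, fun, timeout_ms) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    task = Task.Supervisor.async_nolink(sup, fun)

    # Only the time left before the deadline may be spent waiting.
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)

    # Task.yield/2 returns nil on timeout; Task.shutdown/2 then kills the task
    # and returns a reply that arrived in the meantime, if any.
    case Task.yield(task, remaining) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:error, {:execution_error, reason}}
      nil -> {:error, :operation_timeout}
    end
  end
end

{:ok, sup} = Task.Supervisor.start_link()

# Finishes well inside its budget.
fast = DeadlineSketch.run(sup, fn -> :done end, 1_000)

# Never finishes; the task is killed when the 50 ms budget runs out.
slow = DeadlineSketch.run(sup, fn -> Process.sleep(:infinity) end, 50)
```

In this sketch the hanging function is terminated rather than left running, so nothing continues to use a worker after the caller has given up on it.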
5. Telemetry Enhancements
To support this new functionality, we will introduce a new telemetry event.
[:foundation, :connection_pool, :operation_timeout] (New Event)
- Measurements: %{timeout: integer()} (the configured timeout in ms).
- Metadata: %{pool_name: atom()}.
- Purpose: This allows operators to specifically monitor and alert on operations that are timing out, as distinct from failures or pool contention.
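An operator could subscribe to this event with a standard telemetry handler. The following is a minimal sketch, assuming the :telemetry package is available as a dependency; the TimeoutAlerts module name and the "log-operation-timeouts" handler id are hypothetical.

```elixir
defmodule TimeoutAlerts do
  require Logger

  @event [:foundation, :connection_pool, :operation_timeout]

  # Registers the handler. Assumes the :telemetry application is available.
  def attach do
    :telemetry.attach("log-operation-timeouts", @event, &__MODULE__.handle_event/4, nil)
  end

  # Telemetry handlers receive the event name, measurements, metadata, and config.
  def handle_event(@event, %{timeout: timeout}, %{pool_name: pool_name}, _config) do
    Logger.warning("pool #{inspect(pool_name)} hit the #{timeout}ms operation timeout")
  end
end
```

Passing the handler as a remote function capture rather than an anonymous function follows the recommendation in the telemetry documentation.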
6. Conclusion
Adding first-class, end-to-end timeout support to the ConnectionManager is a critical enhancement for production-grade reliability. It protects the system from a common and dangerous failure mode: non-responsive external services.
For DSPEx, this feature is non-negotiable. It ensures that a single hanging LLM API call cannot bring down the entire application by starving the connection pool. By building this logic directly into the foundation library, we provide this powerful guarantee to all users of ConnectionManager automatically, simplifying client-side code and dramatically improving the robustness of the entire system.