← Back to 002 foundation enhancements

010 9of10

Documentation for 010_9of10 from the Ds ex repository.

Of course. Here is a detailed technical document for the ninth proposed feature enhancement: “First-Class Support for Timeouts in ConnectionManager.”


Technical Specification: First-Class Timeout Management in ConnectionManager

Document Version: 1.0 Author: AI Assistant Status: PROPOSED

1. Overview

This document specifies an enhancement to the Foundation.Infrastructure.ConnectionManager to provide first-class, end-to-end timeout management for operations executed with a pooled worker.

The current implementation of with_connection/3 includes a timeout for checking out a worker from the pool, which is a critical feature provided by poolboy. However, once a worker is checked out, there is no built-in mechanism to enforce a timeout on the work performed by that worker. If the user-provided function hangs (e.g., due to a non-responsive external API), it can hold the worker indefinitely, leading to pool starvation and cascading failures.

This enhancement will introduce a :timeout option directly into with_connection/3 that applies to the entire operation, from checkout to the completion of the user’s function. This will be implemented using OTP’s native Task and Task.await capabilities, making the ConnectionManager significantly more robust and predictable.


2. Problem Statement & Use Case

A DSPEx.Client.LM uses the ConnectionManager to manage a pool of HttpWorker processes that make calls to an LLM provider.

The Failure Scenario:

  1. A DSPEx.Predict program calls the LM.Client.
  2. The LM.Client uses ConnectionManager.with_connection/3 to check out an HttpWorker. The checkout succeeds quickly.
  3. The HttpWorker makes an HTTP request to the LLM API.
  4. The LLM provider’s server accepts the connection but never sends a response back (it “hangs”).
  5. The HttpWorker process is now blocked indefinitely, waiting for a TCP response that will never arrive.
  6. Because the worker is never checked back in, the connection pool now has one less available worker. If this happens repeatedly, the entire pool will be exhausted, and the application will become unresponsive.

The current with_connection timeout does not protect against this scenario, as the timeout only applies to the checkout step. We need a timeout that covers the entire “lease” of the worker.


3. Proposed API and Configuration Changes

The public API of Foundation.Infrastructure.ConnectionManager will be updated. The change is subtle but powerful: the existing :timeout option in with_connection/3 will have its meaning expanded.

3.1. with_connection/3 (Enhanced Semantics)

New Signature (Semantics Change):

# in foundation/infrastructure/connection_manager.ex

@spec with_connection(
  pool_name :: pool_name(), 
  fun :: (pid() -> term()), 
  timeout_ms :: timeout()
) :: {:ok, term()} | {:error, term()}
  • timeout_ms: This parameter will now represent the total maximum time allowed for the entire operation, including both checking out the worker and executing the provided function fun.

Example Usage (Unchanged for the user, but with new guarantees):

# This call now guarantees that the whole operation, including the `HttpWorker.post`
# call, will not take longer than 15 seconds.
ConnectionManager.with_connection(:http_pool, fn worker ->
  HttpWorker.post(worker, "/v1/chat/completions", body)
end, 15_000)

4. Internal Implementation Details

The core change is within the private do_with_connection/4 helper function. It will now use Task.async and Task.await to wrap the execution of the user’s function.

4.1. do_with_connection/4 (New Implementation)
# in foundation/infrastructure/connection_manager.ex

defp do_with_connection(pool_name, pool_pid, fun, timeout) do
  # The overall timeout for the entire operation.
  total_timeout = timeout
  
  # Start timing the checkout process.
  checkout_start_time = System.monotonic_time()

  # 1. Checkout the worker from the pool.
  #    We use a timeout here as well to avoid getting stuck forever if the pool is exhausted.
  case :poolboy.transaction(pool_pid, fn worker -> 
    # `transaction` handles checkout/checkin, but we need more control.
    # So we'll use a lower-level approach inside `try`.
    :not_used_in_this_pattern
  end, timeout) do
    # This is a bit of a trick. We're using the `transaction` block's timeout
    # feature but we won't actually run the code there.
    # The actual pattern is more complex. Let's correct this to a more robust pattern.
  end
end


# === CORRECTED AND FINAL IMPLEMENTATION ===

defp do_with_connection(pool_name, pool_pid, fun, timeout) do
  # Record the deadline for the entire operation.
  deadline = System.monotonic_time() + System.convert_time_unit(timeout, :millisecond, :native)

  worker = nil
  try do
    # 1. Checkout the worker.
    #    Calculate the remaining time for the checkout operation.
    checkout_timeout = remaining_time_ms(deadline)
    worker = :poolboy.checkout(pool_pid, true, checkout_timeout)

    # Telemetry for successful checkout.
    emit_telemetry(:checkout, %{}, %{pool_name: pool_name})

    # 2. Execute the user's function inside a supervised Task.
    #    Calculate the remaining time for the function execution.
    operation_timeout = remaining_time_ms(deadline)

    # The task must trap exits to be supervised correctly by Task.await
    task = Task.Supervisor.async_nolink(Foundation.TaskSupervisor, fn ->
      fun.(worker)
    end)

    # 3. Await the result with the remaining timeout.
    case Task.await(task, operation_timeout) do
      result ->
        {:ok, result}
    end
  
  # --- CATCH BLOCKS for various failures ---
  catch
    :exit, {:timeout, {Task, :await, _}} ->
      # This catches the timeout from Task.await
      Logger.warn("Operation in pool #{pool_name} timed out after #{timeout}ms.")
      emit_telemetry(:operation_timeout, %{timeout: timeout}, %{pool_name: pool_name})
      {:error, :operation_timeout}

    :exit, {:timeout, _} ->
      # This catches the timeout from :poolboy.checkout
      Logger.warn("Checkout from pool #{pool_name} timed out after #{timeout}ms.")
      emit_telemetry(:checkout_timeout, %{timeout: timeout}, %{pool_name: pool_name})
      {:error, :checkout_timeout}

  rescue
    exception ->
      Logger.error("Function execution error in pool #{pool_name}: #{inspect exception}")
      {:error, {:execution_error, exception}}
  after
    # 4. CRITICAL: Ensure the worker is always checked back in.
    if worker do
      :poolboy.checkin(pool_pid, worker)
      emit_telemetry(:checkin, %{}, %{pool_name: pool_name})
    end
  end
end

defp remaining_time_ms(deadline) do
  remaining_native = deadline - System.monotonic_time()
  if remaining_native > 0 do
    System.convert_time_unit(remaining_native, :native, :millisecond)
  else
    0
  end
end

Key Elements of the New Implementation:

  1. Deadline Management: Instead of a simple timeout value, the function calculates an absolute deadline for the entire operation. This ensures that time spent in the checkout phase is subtracted from the time available for the operation phase.
  2. Task.Supervisor.async_nolink: The user’s function is executed within a Task process. We use async_nolink because we don’t want a crash inside the user’s function to crash the ConnectionManager itself. The supervision is handled by Task.await.
  3. Task.await/2 with Timeout: This is the core of the timeout mechanism. Task.await will wait for the task to complete, but for no longer than the operation_timeout. If the timeout is exceeded, it raises an exit signal which is caught.
  4. Robust after Block: The try...after block is essential. It guarantees that poolboy.checkin is called, returning the worker to the pool, regardless of whether the operation succeeded, failed with an exception, or timed out. This prevents worker leakage.
  5. Specific Timeout Errors: The code can now distinguish between a :checkout_timeout (the pool was busy) and an :operation_timeout (the user’s code was slow), providing much better observability.

5. Telemetry Enhancements

To support this new functionality, we will introduce a new telemetry event.

  • [:foundation, :connection_pool, :operation_timeout] (New Event)
    • Measurements: %{timeout: integer()} (the configured timeout in ms).
    • Metadata: %{pool_name: atom()}.
    • Purpose: This allows operators to specifically monitor and alert on operations that are timing out, as distinct from failures or pool contention.

6. Conclusion

Adding first-class, end-to-end timeout support to the ConnectionManager is a critical enhancement for production-grade reliability. It protects the system from a common and dangerous failure mode: non-responsive external services.

For DSPEx, this feature is non-negotiable. It ensures that a single hanging LLM API call cannot bring down the entire application by starving the connection pool. By building this logic directly into the foundation library, we provide this powerful guarantee to all users of ConnectionManager automatically, simplifying client-side code and dramatically improving the robustness of the entire system.