← Back to 002 foundation enhancements

002 1of10

Documentation for 002_1of10 from the Ds ex repository.

Of course. Here is a detailed technical document for the “Enhanced RateLimiter with Multiple Buckets & Backpressure” feature enhancement.


Technical Specification: Enhanced Foundation.Infrastructure.RateLimiter

Document Version: 1.0 Author: AI Assistant Status: PROPOSED

1. Overview

This document details the design and implementation of an enhanced RateLimiter module for the foundation library. The current implementation is effective for simple rate limits (e.g., N requests per time window) but needs to evolve to support the complex, multi-dimensional rate-limiting schemes common in modern cloud services and AI APIs.

The primary driver for this enhancement is the DSPEx project, which must interact with LLM providers that enforce simultaneous limits on Requests Per Minute (RPM) and Tokens Per Minute (TPM).

This enhancement will introduce two key capabilities:

  1. Multi-Bucket Rate Limiting: The ability to check and update multiple rate limit buckets (e.g., one for requests, one for tokens) in a single, atomic operation.
  2. Backpressure Mechanism: A new check_and_wait_rate/5 function that, instead of failing immediately when a limit is exceeded, will intelligently wait until the limit has reset, providing automatic backpressure for high-throughput clients.

2. Problem Statement & Use Case

A DSPEx.Client.LM must manage requests to an OpenAI endpoint which has the following limits for a given model:

  • RPM Limit: 10,000 requests per minute.
  • TPM Limit: 2,000,000 tokens per minute.

When the client is about to send a new request, it must check if this single request would violate either limit. The request has a cost in both dimensions: 1 request and N tokens (e.g., 1500 tokens).

  • If (current_requests + 1) > 10,000, the request should be delayed.
  • If (current_tokens + 1500) > 2,000,000, the request should be delayed.

The current RateLimiter.check_rate/5 can only check one of these dimensions at a time, leading to potential race conditions or complex, inefficient client-side logic that involves checking one limit, then the other.


3. Proposed API Changes

The public-facing API in Foundation.Infrastructure.RateLimiter will be updated.

3.1. check_rate/5 (Enhanced)

The function signature will be modified to accept a list of costs and limits.

New Signature:

@spec check_rate(
  entity_id :: entity_id(), 
  operation :: operation(), 
  costs :: [{atom(), pos_integer()}],
  limits :: [{atom(), {rate_limit(), time_window()}}],
  metadata :: map()
) :: :ok | {:error, Error.t()}
  • costs: A keyword list specifying the “cost” of the current operation in each dimension. The key is the name of the bucket (e.g., :rpm, :tpm), and the value is the amount to increment by.
  • limits: A keyword list defining the rules for each bucket. The key matches the bucket name, and the value is a {limit, window_ms} tuple.

Example Usage (from DSPEx):

request_cost = [rpm: 1, tpm: 1500]
provider_limits = [rpm: {10_000, 60_000}, tpm: {2_000_000, 60_000}]

case RateLimiter.check_rate("my_openai_key", :api_call, request_cost, provider_limits) do
  :ok ->
    # Proceed with API call
  {:error, %Error{error_type: :rate_limit_exceeded, context: ctx}} ->
    # Handle failure, ctx will contain %{bucket: :tpm, ...}
end
3.2. check_and_wait_rate/5 (New Function)

This new function provides the backpressure mechanism. It has the same signature as the enhanced check_rate/5.

New Signature:

@spec check_and_wait_rate(
  entity_id :: entity_id(), 
  operation :: operation(), 
  costs :: [{atom(), pos_integer()}],
  limits :: [{atom(), {rate_limit(), time_window()}}],
  metadata :: map()
) :: :ok | {:error, Error.t()}

Behavior:

  • If all limits are clear, it returns :ok immediately.
  • If one or more limits are exceeded, it does not return an error. Instead, it calculates the longest time it needs to wait for all relevant buckets to have sufficient capacity.
  • It then blocks (using Process.sleep/1) for that duration and returns :ok.
  • It only returns {:error, ...} if there is an underlying system failure (e.g., the Hammer backend is down).

Example Usage (from DSPEx.Evaluate):

# Inside a Task.async_stream worker
def evaluate_example(program, example) do
  request_cost = # ... calculate token cost ...
  provider_limits = # ... get from config ...

  # This call will automatically block if we're sending too many requests,
  # providing backpressure to the entire evaluation stream.
  :ok = RateLimiter.check_and_wait_rate(
    "my_api_key", :evaluation, request_cost, provider_limits
  )

  # Now we can safely make the API call
  program.forward(example)
end

4. Internal Implementation Details

To support these new APIs, the underlying implementation needs significant changes.

4.1. Multi-Bucket Hammer Backend

The Hammer library is designed around a single bucket per key. To support multi-dimensional limits, we will namespace the keys.

The build_rate_key/3 private function will be introduced:

# in foundation/infrastructure/rate_limiter.ex

defp build_rate_key(entity_id, operation, bucket_name) do
  # e.g., "foundation:my_openai_key:api_call:rpm"
  "foundation:#{entity_id}:#{operation}:#{bucket_name}"
end

The check_rate and check_and_wait_rate functions will iterate through the provided costs and limits, checking each bucket individually.

4.2. check_rate Logic
# in foundation/infrastructure/rate_limiter.ex

def check_rate(entity_id, operation, costs, limits, metadata \\ %{}) do
  # Use Enum.reduce_while to check each bucket and stop on the first failure.
  Enum.reduce_while(costs, :ok, fn {bucket, cost}, _acc ->
    case Keyword.get(limits, bucket) do
      {limit, window} ->
        key = build_rate_key(entity_id, operation, bucket)
        
        # We need a new Hammer function: check_and_incr
        # It checks if `cost` can be added without exceeding `limit`.
        # If yes, it increments and returns {:allow, new_count}.
        # If no, it returns {:deny, current_count}.
        case HammerBackend.check_and_incr(key, window, limit, cost) do
          {:allow, _count} ->
            {:cont, :ok} # Continue to check the next bucket
            
          {:deny, _count} ->
            error = Error.new(:rate_limit_exceeded, "...", context: %{bucket: bucket})
            {:halt, {:error, error}} # Stop and return error
        end
      
      nil ->
        # Cost specified for a bucket with no defined limit
        error = Error.new(:invalid_argument, "No limit defined for bucket: #{bucket}")
        {:halt, {:error, error}}
    end
  end)
end

Note: This requires a new function in the HammerBackend that can atomically check-and-increment by a variable amount. If Hammer doesn’t support this, we would need to implement it ourselves using an ETS update_counter with a match spec, or protect the read-increment-write sequence with a GenServer.

4.3. check_and_wait_rate Logic

This is the more complex implementation.

# in foundation/infrastructure/rate_limiter.ex

def check_and_wait_rate(entity_id, operation, costs, limits, metadata \\ %{}) do
  # 1. Get the status of all relevant buckets.
  bucket_statuses =
    costs
    |> Enum.map(fn {bucket, cost} ->
      {limit, window} = Keyword.get(limits, bucket, {nil, nil})
      key = build_rate_key(entity_id, operation, bucket)
      
      # We need a new Hammer function: get_bucket_status
      # It should return {current_count, time_until_reset_ms}
      {:ok, {count, ttl_ms}} = HammerBackend.get_bucket_status(key, window)
      
      %{bucket: bucket, cost: cost, limit: limit, count: count, ttl_ms: ttl_ms}
    end)
    
  # 2. Calculate the required wait time for each bucket.
  wait_times =
    bucket_statuses
    |> Enum.map(fn status ->
      if (status.count + status.cost) > status.limit do
        # We are over the limit, we must wait for the bucket to reset.
        status.ttl_ms
      else
        0 # No wait needed for this bucket
      end
    end)
    
  # 3. Find the longest required wait time.
  longest_wait_ms = Enum.max(wait_times ++ [0])
  
  # 4. Perform the wait (backpressure) and then increment the counters.
  if longest_wait_ms > 0 do
    Logger.debug("Rate limit backpressure applied. Waiting for #{longest_wait_ms}ms.")
    Telemetry.emit_counter([:foundation, :rate_limiter, :backpressure_wait], %{duration_ms: longest_wait_ms})
    Process.sleep(longest_wait_ms)
  end
  
  # 5. Atomically increment all bucket counters *after* the wait.
  Enum.each(costs, fn {bucket, cost} ->
    {limit, window} = Keyword.get(limits, bucket)
    key = build_rate_key(entity_id, operation, bucket)
    HammerBackend.incr(key, window, limit, cost) # A function that just increments
  end)
  
  :ok
end

New HammerBackend Functions Required:

The logic above assumes the existence of new, more powerful functions in our Hammer wrapper.

  1. check_and_incr(key, window, limit, cost):

    • Atomically checks if current_count + cost <= limit.
    • If true, increments the counter by cost and returns {:allow, new_count}.
    • If false, does nothing and returns {:deny, current_count}.
    • This is the core of the enhanced check_rate.
  2. get_bucket_status(key, window):

    • Returns {:ok, {count, ttl_ms}} where ttl_ms is the time remaining until the current bucket expires.
    • This is essential for the check_and_wait_rate backpressure calculation.
  3. incr(key, window, limit, cost):

    • Unconditionally increments the bucket by cost. Used by check_and_wait_rate after the wait has been performed.

5. Telemetry and Observability

The enhanced RateLimiter will emit more detailed telemetry events.

  • [:foundation, :rate_limiter, :request_denied]
    • Metadata: Will now include a :bucket key (e.g., :tpm) to identify which specific limit was exceeded.
  • [:foundation, :rate_limiter, :backpressure_wait] (New Event)
    • Measurements: %{duration_ms: integer()}
    • Metadata: %{entity_id: ..., operation: ...}
    • This metric is critical for monitoring system throughput and identifying when backpressure is being applied. High values or frequent occurrences of this event indicate the system is consistently trying to exceed API limits.

6. Conclusion

This enhancement transforms the RateLimiter from a simple request counter into a sophisticated traffic-shaping and backpressure mechanism. It directly addresses the complex requirements of modern AI APIs and provides an invaluable feature for the high-throughput optimizers planned for DSPEx. By implementing automatic waiting (check_and_wait_rate), we dramatically simplify the client-side code, making the entire system more robust, predictable, and easier to manage. This is a high-value investment for the foundation library.