Technical Specification: Enhanced Foundation.Infrastructure.RateLimiter
Document Version: 1.0
Author: AI Assistant
Status: PROPOSED
1. Overview
This document details the design and implementation of an enhanced RateLimiter module for the foundation library. The current implementation is effective for simple rate limits (e.g., N requests per time window) but needs to evolve to support the complex, multi-dimensional rate-limiting schemes common in modern cloud services and AI APIs.
The primary driver for this enhancement is the DSPEx project, which must interact with LLM providers that enforce simultaneous limits on Requests Per Minute (RPM) and Tokens Per Minute (TPM).
This enhancement will introduce two key capabilities:
- Multi-Bucket Rate Limiting: The ability to check and update multiple rate limit buckets (e.g., one for requests, one for tokens) in a single, atomic operation.
- Backpressure Mechanism: A new check_and_wait_rate/5 function that, instead of failing immediately when a limit is exceeded, waits until the limit has reset, providing automatic backpressure for high-throughput clients.
2. Problem Statement & Use Case
A DSPEx.Client.LM must manage requests to an OpenAI endpoint which has the following limits for a given model:
- RPM Limit: 10,000 requests per minute.
- TPM Limit: 2,000,000 tokens per minute.
When the client is about to send a new request, it must check whether this single request would violate either limit. The request has a cost in both dimensions: 1 request and N tokens (e.g., 1500 tokens).
- If (current_requests + 1) > 10,000, the request should be delayed.
- If (current_tokens + 1500) > 2,000,000, the request should be delayed.
The current RateLimiter.check_rate/5 can only check one of these dimensions at a time, leading to potential race conditions or complex, inefficient client-side logic that involves checking one limit, then the other.
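The two-dimensional check described above can be sketched as a pure function. This is only an illustration of the arithmetic, not part of the proposed API: the module name CostCheckSketch and the function exceeded_buckets/3 are hypothetical, and the current usage is passed in as plain data instead of being read from a backend.

```elixir
defmodule CostCheckSketch do
  @moduledoc "Hypothetical helper; illustrates the per-dimension arithmetic only."

  # Returns the buckets whose limit would be exceeded if `costs` were added
  # to the `current` usage. All three arguments are keyword lists keyed by
  # bucket name; `limits` holds the maximum count per window.
  def exceeded_buckets(current, costs, limits) do
    for {bucket, cost} <- costs,
        Keyword.get(current, bucket, 0) + cost > Keyword.fetch!(limits, bucket),
        do: bucket
  end
end

limits = [rpm: 10_000, tpm: 2_000_000]
cost = [rpm: 1, tpm: 1500]

# Plenty of headroom in both dimensions.
CostCheckSketch.exceeded_buckets([rpm: 500, tpm: 100_000], cost, limits)
# => []

# Requests are fine, but 1_999_000 + 1500 tokens would exceed the TPM limit.
CostCheckSketch.exceeded_buckets([rpm: 500, tpm: 1_999_000], cost, limits)
# => [:tpm]
```

Both dimensions have to be evaluated for the same request, which is why checking one limit and then the other in separate calls is not enough.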
3. Proposed API Changes
The public-facing API in Foundation.Infrastructure.RateLimiter will be updated.
3.1. check_rate/5 (Enhanced)
The function signature will be modified to accept a list of costs and limits.
New Signature:
    @spec check_rate(
            entity_id :: entity_id(),
            operation :: operation(),
            costs :: [{atom(), pos_integer()}],
            limits :: [{atom(), {rate_limit(), time_window()}}],
            metadata :: map()
          ) :: :ok | {:error, Error.t()}
- costs: A keyword list specifying the “cost” of the current operation in each dimension. The key is the name of the bucket (e.g., :rpm, :tpm), and the value is the amount to increment by.
- limits: A keyword list defining the rules for each bucket. The key matches the bucket name, and the value is a {limit, window_ms} tuple.
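Because the two keyword lists are matched by bucket name, a caller can verify up front that every cost has a corresponding limit. The following is a minimal sketch of such a check; BucketArgsSketch and validate/2 are hypothetical names used for illustration and are not part of the proposed API.

```elixir
defmodule BucketArgsSketch do
  # Hypothetical validation helper: every bucket named in `costs` needs a
  # matching entry in `limits`. Extra limits without a cost are allowed.
  def validate(costs, limits) do
    case Keyword.keys(costs) -- Keyword.keys(limits) do
      [] -> :ok
      missing -> {:error, {:no_limit_defined, missing}}
    end
  end
end

BucketArgsSketch.validate(
  [rpm: 1, tpm: 1500],
  [rpm: {10_000, 60_000}, tpm: {2_000_000, 60_000}]
)
# => :ok

BucketArgsSketch.validate([rpm: 1, tpm: 1500], [rpm: {10_000, 60_000}])
# => {:error, {:no_limit_defined, [:tpm]}}
```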
Example Usage (from DSPEx):
    request_cost = [rpm: 1, tpm: 1500]
    provider_limits = [rpm: {10_000, 60_000}, tpm: {2_000_000, 60_000}]

    case RateLimiter.check_rate("my_openai_key", :api_call, request_cost, provider_limits) do
      :ok ->
        # Proceed with the API call here.
        :ok

      {:error, %Error{error_type: :rate_limit_exceeded, context: ctx}} ->
        # Handle the failure; ctx will contain %{bucket: :tpm, ...}
        {:error, ctx}
    end
3.2. check_and_wait_rate/5 (New Function)
This new function provides the backpressure mechanism. It has the same signature as the enhanced check_rate/5.
New Signature:
    @spec check_and_wait_rate(
            entity_id :: entity_id(),
            operation :: operation(),
            costs :: [{atom(), pos_integer()}],
            limits :: [{atom(), {rate_limit(), time_window()}}],
            metadata :: map()
          ) :: :ok | {:error, Error.t()}
Behavior:
- If all limits are clear, it returns :ok immediately.
- If one or more limits are exceeded, it does not return an error. Instead, it calculates the longest time it needs to wait for all relevant buckets to have sufficient capacity.
- It then blocks (using Process.sleep/1) for that duration and returns :ok.
- It only returns {:error, ...} if there is an underlying system failure (e.g., the Hammer backend is down).
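The blocking behavior can be sketched independently of any backend. The sketch below is a variation on the behavior listed above rather than the proposed implementation: it re-checks after every sleep, on the assumption that other processes may consume the freed capacity while the caller is asleep. BackpressureSketch, wait_until_allowed/2, and the {:wait, ms} return shape of try_fun are all hypothetical.

```elixir
defmodule BackpressureSketch do
  # `try_fun` is a hypothetical zero-arity function that returns :ok when the
  # request is admitted, {:wait, ms} when a bucket is full, or
  # {:error, reason} on a system failure.
  # `sleep_fun` defaults to Process.sleep/1 and can be replaced in tests.
  def wait_until_allowed(try_fun, sleep_fun \\ &Process.sleep/1) do
    case try_fun.() do
      :ok ->
        :ok

      {:wait, ms} ->
        # Block the caller, then try again instead of assuming success.
        sleep_fun.(ms)
        wait_until_allowed(try_fun, sleep_fun)

      {:error, _reason} = error ->
        # System failures are returned immediately, never waited on.
        error
    end
  end
end
```

Injecting the sleep function keeps the loop testable without real delays; a caller in production would rely on the default.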
Example Usage (from DSPEx.Evaluate):
    # Inside a Task.async_stream worker
    def evaluate_example(program, example) do
      request_cost = # ... calculate token cost ...
      provider_limits = # ... get from config ...

      # This call will automatically block if we're sending too many requests,
      # providing backpressure to the entire evaluation stream.
      :ok = RateLimiter.check_and_wait_rate(
        "my_api_key", :evaluation, request_cost, provider_limits
      )

      # Now we can safely make the API call
      program.forward(example)
    end
4. Internal Implementation Details
To support these new APIs, the underlying implementation needs significant changes.
4.1. Multi-Bucket Hammer Backend
The Hammer library is designed around a single bucket per key. To support multi-dimensional limits, we will namespace the keys.
A private function, build_rate_key/3, will be introduced:
    # in foundation/infrastructure/rate_limiter.ex
    defp build_rate_key(entity_id, operation, bucket_name) do
      # e.g., "foundation:my_openai_key:api_call:rpm"
      "foundation:#{entity_id}:#{operation}:#{bucket_name}"
    end
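To make the namespacing concrete, the snippet below builds the keys for the two buckets of a single request. RateKeySketch is a hypothetical module that exposes a public copy of the proposed private helper purely for illustration.

```elixir
defmodule RateKeySketch do
  # Public copy of the proposed private helper, for illustration only.
  def build_rate_key(entity_id, operation, bucket_name) do
    "foundation:#{entity_id}:#{operation}:#{bucket_name}"
  end
end

for bucket <- [:rpm, :tpm] do
  RateKeySketch.build_rate_key("my_openai_key", :api_call, bucket)
end
# => ["foundation:my_openai_key:api_call:rpm", "foundation:my_openai_key:api_call:tpm"]
```

One design consideration: because ":" is the separator, an entity_id that itself contains ":" could produce the same key as a different entity/operation pair, so callers may want to restrict or escape that character.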
The check_rate and check_and_wait_rate functions will iterate through the provided costs and limits, checking each bucket individually.
4.2. check_rate Logic
    # in foundation/infrastructure/rate_limiter.ex
    def check_rate(entity_id, operation, costs, limits, metadata \\ %{}) do
      # Use Enum.reduce_while to check each bucket and stop on the first failure.
      # Caveat: buckets that were already allowed stay incremented when a later
      # bucket denies, so the check is atomic per bucket, not across buckets.
      Enum.reduce_while(costs, :ok, fn {bucket, cost}, _acc ->
        case Keyword.get(limits, bucket) do
          {limit, window} ->
            key = build_rate_key(entity_id, operation, bucket)

            # We need a new Hammer function: check_and_incr
            # It checks if `cost` can be added without exceeding `limit`.
            # If yes, it increments and returns {:allow, new_count}.
            # If no, it returns {:deny, current_count}.
            case HammerBackend.check_and_incr(key, window, limit, cost) do
              {:allow, _count} ->
                # Continue to check the next bucket
                {:cont, :ok}

              {:deny, _count} ->
                # Stop and return the error
                error = Error.new(:rate_limit_exceeded, "...", context: %{bucket: bucket})
                {:halt, {:error, error}}
            end

          nil ->
            # Cost specified for a bucket with no defined limit
            error = Error.new(:invalid_argument, "No limit defined for bucket: #{bucket}")
            {:halt, {:error, error}}
        end
      end)
    end
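Note: This requires a new function in the HammerBackend that can atomically check-and-increment by a variable amount. If Hammer doesn’t support this, we would need to implement it ourselves, for example on top of ETS with a compare-and-swap built from :ets.select_replace/2 and a match spec (:ets.update_counter always applies an update, so it cannot deny an increment by itself), or by serializing the read-increment-write sequence through a GenServer.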
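One possible shape for the ETS fallback mentioned in the note is a compare-and-swap loop built on :ets.select_replace/2, which matches and replaces a row atomically. The sketch below is an assumption-laden illustration, not the Hammer API: CheckAndIncrSketch, its table name, and the three-argument check_and_incr/3 are hypothetical, keys are assumed to be plain strings, and window expiry is left out entirely.

```elixir
defmodule CheckAndIncrSketch do
  @moduledoc """
  Hypothetical ETS-backed check-and-increment. Window expiry is deliberately
  omitted; a real bucket would also need to reset when its window ends.
  """

  @table :check_and_incr_sketch

  # Creates the table once; safe to call repeatedly from the same process.
  def init do
    if :ets.whereis(@table) == :undefined do
      :ets.new(@table, [:named_table, :public, :set])
    end

    :ok
  end

  def check_and_incr(key, limit, cost) do
    # Create the counter row if it is missing; a no-op when it exists.
    :ets.insert_new(@table, {key, 0})
    [{^key, count}] = :ets.lookup(@table, key)

    cond do
      count + cost > limit ->
        {:deny, count}

      compare_and_swap(key, count, count + cost) ->
        {:allow, count + cost}

      true ->
        # Another process changed the counter between lookup and swap; retry.
        check_and_incr(key, limit, cost)
    end
  end

  # Replaces {key, old} with {key, new} only if the row still holds `old`.
  defp compare_and_swap(key, old, new) do
    match_spec = [{{key, old}, [], [{:const, {key, new}}]}]
    :ets.select_replace(@table, match_spec) == 1
  end
end

CheckAndIncrSketch.init()
CheckAndIncrSketch.check_and_incr("demo:tpm", 2_000, 1_500)
# => {:allow, 1500}
CheckAndIncrSketch.check_and_incr("demo:tpm", 2_000, 1_500)
# => {:deny, 1500}
CheckAndIncrSketch.check_and_incr("demo:tpm", 2_000, 500)
# => {:allow, 2000}
```

The denied call leaves the counter untouched, which is the property the plain increment primitives do not give on their own.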
4.3. check_and_wait_rate Logic
This is the more complex implementation.
    # in foundation/infrastructure/rate_limiter.ex (the module must `require Logger`)
    def check_and_wait_rate(entity_id, operation, costs, limits, metadata \\ %{}) do
      # 1. Get the status of all relevant buckets. Stop at the first missing
      #    limit or backend failure so the caller gets {:error, ...}, not a crash.
      result =
        Enum.reduce_while(costs, {:ok, []}, fn {bucket, cost}, {:ok, acc} ->
          key = build_rate_key(entity_id, operation, bucket)

          # We need a new Hammer function: get_bucket_status
          # It should return {:ok, {current_count, time_until_reset_ms}}
          with {limit, window} <- Keyword.get(limits, bucket),
               {:ok, {count, ttl_ms}} <- HammerBackend.get_bucket_status(key, window) do
            status = %{bucket: bucket, key: key, cost: cost, limit: limit,
                       window: window, count: count, ttl_ms: ttl_ms}
            {:cont, {:ok, [status | acc]}}
          else
            nil ->
              error = Error.new(:invalid_argument, "No limit defined for bucket: #{bucket}")
              {:halt, {:error, error}}

            {:error, _reason} = error ->
              {:halt, error}
          end
        end)

      with {:ok, statuses} <- result do
        # 2. Calculate the required wait time for each bucket.
        wait_times =
          Enum.map(statuses, fn status ->
            # Over the limit: wait for the bucket to reset. Otherwise no wait.
            if status.count + status.cost > status.limit, do: status.ttl_ms, else: 0
          end)

        # 3. Find the longest required wait time (0 when `costs` is empty).
        longest_wait_ms = Enum.max([0 | wait_times])

        # 4. Perform the wait (backpressure).
        if longest_wait_ms > 0 do
          Logger.debug("Rate limit backpressure applied. Waiting for #{longest_wait_ms}ms.")

          Telemetry.emit_counter(
            [:foundation, :rate_limiter, :backpressure_wait],
            %{duration_ms: longest_wait_ms}
          )

          Process.sleep(longest_wait_ms)
        end

        # 5. Increment every bucket counter *after* the wait. Each increment is
        #    atomic on its own, but the status read, the sleep, and the
        #    increments are not one atomic step: concurrent callers that wake
        #    together can overshoot the limit.
        Enum.each(statuses, fn status ->
          # A function that just increments
          HammerBackend.incr(status.key, status.window, status.limit, status.cost)
        end)

        :ok
      end
    end
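Steps 2 and 3 above are pure arithmetic and can be isolated for testing. The sketch below assumes status maps shaped like the ones built in step 1; WaitTimeSketch and longest_wait_ms/1 are hypothetical names, and the sample counts and TTL values are made up for illustration.

```elixir
defmodule WaitTimeSketch do
  # Each status is a map with at least :count, :cost, :limit and :ttl_ms.
  # A bucket contributes its remaining TTL only if adding the cost would
  # push it over its limit; otherwise it contributes 0.
  def longest_wait_ms(statuses) do
    waits =
      Enum.map(statuses, fn %{count: count, cost: cost, limit: limit, ttl_ms: ttl_ms} ->
        if count + cost > limit, do: ttl_ms, else: 0
      end)

    # Prepending 0 keeps Enum.max/1 safe for an empty list.
    Enum.max([0 | waits])
  end
end

statuses = [
  # 9_999 + 1 = 10_000, which does not exceed the limit: no wait.
  %{bucket: :rpm, count: 9_999, cost: 1, limit: 10_000, ttl_ms: 12_000},
  # 1_999_000 + 1_500 exceeds 2_000_000: wait for this bucket to reset.
  %{bucket: :tpm, count: 1_999_000, cost: 1_500, limit: 2_000_000, ttl_ms: 30_000}
]

WaitTimeSketch.longest_wait_ms(statuses)
# => 30000
```

Note that a request whose cost alone is larger than the bucket limit would still exceed the limit after a reset, so waiting cannot help in that case and it is probably better rejected up front.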
New HammerBackend Functions Required:
The logic above assumes the existence of new, more powerful functions in our Hammer wrapper.
- check_and_incr(key, window, limit, cost):
  - Atomically checks if current_count + cost <= limit.
  - If true, increments the counter by cost and returns {:allow, new_count}.
  - If false, does nothing and returns {:deny, current_count}.
  - This is the core of the enhanced check_rate.
- get_bucket_status(key, window):
  - Returns {:ok, {count, ttl_ms}} where ttl_ms is the time remaining until the current bucket expires.
  - This is essential for the check_and_wait_rate backpressure calculation.
- incr(key, window, limit, cost):
  - Unconditionally increments the bucket by cost. Used by check_and_wait_rate after the wait has been performed.
5. Telemetry and Observability
The enhanced RateLimiter will emit more detailed telemetry events.
- [:foundation, :rate_limiter, :request_denied]
  - Metadata: Will now include a :bucket key (e.g., :tpm) to identify which specific limit was exceeded.
- [:foundation, :rate_limiter, :backpressure_wait] (New Event)
  - Measurements: %{duration_ms: integer()}
  - Metadata: %{entity_id: ..., operation: ...}
  - This metric is critical for monitoring system throughput and identifying when backpressure is being applied. High values or frequent occurrences of this event indicate the system is consistently trying to exceed API limits.
6. Conclusion
This enhancement turns the RateLimiter from a simple request counter into a multi-bucket traffic-shaping and backpressure mechanism. It addresses the simultaneous RPM and TPM limits enforced by modern AI APIs and supports the high-throughput optimizers planned for DSPEx. With automatic waiting (check_and_wait_rate), clients no longer need their own delay logic around rate-limited calls, which makes client code simpler and the overall system more predictable and easier to manage. This is a high-value investment for the foundation library.