Rate Limiting

Understanding and managing API rate limits for optimal BroxiAI integration

Learn how to monitor and work within BroxiAI's API rate limits to keep your integration reliable and efficient.

Rate Limiting Overview

What is Rate Limiting?

Rate limiting controls the number of API requests you can make within a specific time period. This ensures fair usage across all users and maintains system stability and performance.

Benefits of Rate Limiting

  • Prevents system overload and ensures stability

  • Guarantees fair access for all users

  • Protects against abuse and misuse

  • Maintains consistent performance

  • Enables predictable cost management

Rate Limit Structure

Standard Rate Limits

Rate Limits by Plan:
  Free Tier:
    requests_per_minute: 20
    requests_per_hour: 1000
    requests_per_day: 10000
    concurrent_requests: 2

  Pro Plan:
    requests_per_minute: 100
    requests_per_hour: 6000
    requests_per_day: 100000
    concurrent_requests: 5

  Enterprise Plan:
    requests_per_minute: 500
    requests_per_hour: 30000
    requests_per_day: 1000000
    concurrent_requests: 20

  Custom Enterprise:
    requests_per_minute: "negotiable"
    requests_per_hour: "negotiable"
    requests_per_day: "negotiable"
    concurrent_requests: "negotiable"

Understanding Rate Limit Headers

Response Headers

Rate Limit Information Headers

HTTP/1.1 200 OK
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 95
X-RateLimit-Reset: 1642694400
X-RateLimit-Window: 60
Content-Type: application/json

Header Explanations

  • X-RateLimit-Limit: Maximum requests allowed in the current window

  • X-RateLimit-Remaining: Requests remaining in current window

  • X-RateLimit-Reset: Unix timestamp when the rate limit resets

  • X-RateLimit-Window: Rate limit window in seconds

  • X-RateLimit-Retry-After: Seconds to wait before retrying (when rate limited)
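
A minimal sketch of reading these headers in Python (the endpoint path is illustrative):

import requests

response = requests.get(
    "https://api.broxi.ai/v1/flows",
    headers={"Authorization": "Bearer your_api_token"}
)

remaining = int(response.headers.get("X-RateLimit-Remaining", 0))
reset_at = int(response.headers.get("X-RateLimit-Reset", 0))
print(f"{remaining} requests left in this window; resets at {reset_at}")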

Rate Limit Response

When Rate Limited (429 Status)

HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1642694460
X-RateLimit-Retry-After: 60
Content-Type: application/json

{
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "Rate limit exceeded. Please retry after 60 seconds.",
    "details": {
      "limit": 100,
      "window": "1 minute",
      "reset_time": "2024-01-20T10:31:00Z"
    }
  },
  "request_id": "req_abc123"
}

Rate Limit Implementation

Basic Rate Limit Handling

Python Implementation

import time
import requests
from datetime import datetime, timedelta

class BroxiRateLimiter:
    def __init__(self, api_token):
        self.api_token = api_token
        self.base_url = "https://api.broxi.ai/v1"
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json"
        })
        
        # Rate limit info parsed from the most recent response headers
        self.rate_limit_info = {}
    
    def make_request(self, method, endpoint, **kwargs):
        """Make API request with rate limit handling"""
        
        # Check if we need to wait
        self.check_rate_limit()
        
        url = f"{self.base_url}{endpoint}"
        response = self.session.request(method, url, **kwargs)
        
        # Update rate limit info from headers
        self.update_rate_limit_info(response.headers)
        
        # Handle rate limit exceeded
        if response.status_code == 429:
            retry_after = int(response.headers.get('X-RateLimit-Retry-After', 60))
            print(f"Rate limit exceeded. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            
            # Retry the request (production code should cap retries;
            # see the exponential backoff section below)
            return self.make_request(method, endpoint, **kwargs)
        
        response.raise_for_status()
        return response.json()
    
    def check_rate_limit(self):
        """Check if we need to wait before making request"""
        
        if not self.rate_limit_info:
            return  # No rate limit info yet
        
        remaining = self.rate_limit_info.get('remaining', 1)
        reset_time = self.rate_limit_info.get('reset', 0)
        
        if remaining <= 0:
            wait_time = reset_time - time.time()
            if wait_time > 0:
                print(f"Rate limit reached. Waiting {wait_time:.1f} seconds...")
                time.sleep(wait_time)
    
    def update_rate_limit_info(self, headers):
        """Update rate limit information from response headers"""
        
        self.rate_limit_info = {
            'limit': int(headers.get('X-RateLimit-Limit', 0)),
            'remaining': int(headers.get('X-RateLimit-Remaining', 0)),
            'reset': int(headers.get('X-RateLimit-Reset', 0)),
            'window': int(headers.get('X-RateLimit-Window', 60))
        }
    
    def get_rate_limit_status(self):
        """Get current rate limit status"""
        
        if not self.rate_limit_info:
            return "No rate limit information available"
        
        reset_time = datetime.fromtimestamp(self.rate_limit_info['reset'])
        
        return {
            "limit": self.rate_limit_info['limit'],
            "remaining": self.rate_limit_info['remaining'],
            "reset_time": reset_time.isoformat(),
            "window_seconds": self.rate_limit_info['window'],
            "usage_percentage": (
                (self.rate_limit_info['limit'] - self.rate_limit_info['remaining']) / 
                self.rate_limit_info['limit'] * 100
            )
        }

# Usage example
client = BroxiRateLimiter("your_api_token")

# Make requests with automatic rate limiting
result = client.make_request("POST", "/flows/my-workflow/run", json={
    "input": "Hello, world!"
})

# Check rate limit status
status = client.get_rate_limit_status()
print(f"Rate limit usage: {status['usage_percentage']:.1f}%")

JavaScript Implementation

class BroxiRateLimiter {
    constructor(apiToken) {
        this.apiToken = apiToken;
        this.baseURL = 'https://api.broxi.ai/v1';
        this.rateLimitInfo = {};
    }
    
    async makeRequest(method, endpoint, data = null) {
        // Check if we need to wait
        await this.checkRateLimit();
        
        const url = `${this.baseURL}${endpoint}`;
        const options = {
            method: method,
            headers: {
                'Authorization': `Bearer ${this.apiToken}`,
                'Content-Type': 'application/json'
            }
        };
        
        if (data) {
            options.body = JSON.stringify(data);
        }
        
        try {
            const response = await fetch(url, options);
            
            // Update rate limit info
            this.updateRateLimitInfo(response.headers);
            
            // Handle rate limiting
            if (response.status === 429) {
                const retryAfter = parseInt(response.headers.get('X-RateLimit-Retry-After') || '60');
                console.log(`Rate limit exceeded. Waiting ${retryAfter} seconds...`);
                
                await this.sleep(retryAfter * 1000);
                return this.makeRequest(method, endpoint, data);
            }
            
            if (!response.ok) {
                throw new Error(`HTTP ${response.status}: ${response.statusText}`);
            }
            
            return response.json();
        } catch (error) {
            console.error('API request failed:', error);
            throw error;
        }
    }
    
    async checkRateLimit() {
        // Check for undefined, not falsy: remaining === 0 is exactly
        // the case where we must wait
        if (this.rateLimitInfo.remaining === undefined) return;
        
        const { remaining, reset } = this.rateLimitInfo;
        
        if (remaining <= 0) {
            const waitTime = reset - Math.floor(Date.now() / 1000);
            if (waitTime > 0) {
                console.log(`Rate limit reached. Waiting ${waitTime} seconds...`);
                await this.sleep(waitTime * 1000);
            }
        }
    }
    
    updateRateLimitInfo(headers) {
        this.rateLimitInfo = {
            limit: parseInt(headers.get('X-RateLimit-Limit') || '0'),
            remaining: parseInt(headers.get('X-RateLimit-Remaining') || '0'),
            reset: parseInt(headers.get('X-RateLimit-Reset') || '0'),
            window: parseInt(headers.get('X-RateLimit-Window') || '60')
        };
    }
    
    getRateLimitStatus() {
        if (!this.rateLimitInfo.limit) {
            return "No rate limit information available";
        }
        
        const usagePercentage = 
            ((this.rateLimitInfo.limit - this.rateLimitInfo.remaining) / 
             this.rateLimitInfo.limit) * 100;
        
        return {
            limit: this.rateLimitInfo.limit,
            remaining: this.rateLimitInfo.remaining,
            resetTime: new Date(this.rateLimitInfo.reset * 1000).toISOString(),
            windowSeconds: this.rateLimitInfo.window,
            usagePercentage: usagePercentage.toFixed(1)
        };
    }
    
    sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }
}

// Usage
const client = new BroxiRateLimiter('your_api_token');

// Make requests with rate limiting
client.makeRequest('POST', '/flows/my-workflow/run', {
    input: 'Hello, world!'
}).then(result => {
    console.log('Result:', result);
    console.log('Rate limit status:', client.getRateLimitStatus());
}).catch(error => {
    console.error('Error:', error);
});

Advanced Rate Limiting Strategies

Exponential Backoff

Exponential Backoff Implementation

import random
import time

class RateLimitError(Exception):
    """Raised when the API returns HTTP 429 Too Many Requests."""
    pass

class ExponentialBackoffRateLimiter:
    def __init__(self, api_token, base_delay=1, max_delay=60, max_retries=5):
        self.api_token = api_token
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.max_retries = max_retries
    
    def make_request_with_backoff(self, request_func, *args, **kwargs):
        """Make request with exponential backoff on rate limit"""
        
        for attempt in range(self.max_retries):
            try:
                return request_func(*args, **kwargs)
            
            except RateLimitError as e:
                if attempt == self.max_retries - 1:
                    raise e  # Last attempt, re-raise the error
                
                # Calculate delay with jitter
                delay = min(
                    self.base_delay * (2 ** attempt) + random.uniform(0, 1),
                    self.max_delay
                )
                
                print(f"Rate limited (attempt {attempt + 1}). Retrying in {delay:.1f} seconds...")
                time.sleep(delay)
        
        raise Exception("Max retries exceeded")
    
    def adaptive_backoff(self, success_rate):
        """Adapt backoff strategy based on success rate"""
        
        if success_rate > 0.9:
            # High success rate, be more aggressive
            self.base_delay = max(0.5, self.base_delay * 0.9)
        elif success_rate < 0.7:
            # Low success rate, be more conservative
            self.base_delay = min(5.0, self.base_delay * 1.2)
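
A brief usage sketch: the request function below is a hypothetical stand-in, and your HTTP layer should raise RateLimitError on a 429 response for the backoff loop to engage.

limiter = ExponentialBackoffRateLimiter("your_api_token")

def call_api():
    # Hypothetical request function; replace with a real API call that
    # raises RateLimitError when the server returns HTTP 429
    return {"status": "ok"}

result = limiter.make_request_with_backoff(call_api)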

Request Queuing

Queue-Based Rate Limiting

import asyncio
import time
from collections import deque

class QueuedRateLimiter:
    def __init__(self, api_token, rate_limit=100, window=60):
        self.api_token = api_token
        self.rate_limit = rate_limit
        self.window = window
        self.request_queue = deque()
        self.request_times = deque()
        self.processing = False
    
    async def add_request(self, request_func, *args, **kwargs):
        """Add request to queue"""
        
        future = asyncio.get_running_loop().create_future()
        self.request_queue.append((request_func, args, kwargs, future))
        
        # Start processing if not already running
        if not self.processing:
            self.processing = True  # set before awaiting so only one worker starts
            asyncio.create_task(self.process_queue())
        
        return await future
    
    async def process_queue(self):
        """Process queued requests respecting rate limits"""
        
        self.processing = True
        
        while self.request_queue:
            # Check if we can make a request
            if not self.can_make_request():
                await asyncio.sleep(1)  # Wait and check again
                continue
            
            # Get next request
            request_func, args, kwargs, future = self.request_queue.popleft()
            
            try:
                # Make the request (request_func must be an async callable)
                result = await request_func(*args, **kwargs)
                future.set_result(result)
                
                # Record request time
                self.request_times.append(time.time())
                
            except Exception as e:
                future.set_exception(e)
        
        self.processing = False
    
    def can_make_request(self):
        """Check if we can make a request within rate limits"""
        
        current_time = time.time()
        
        # Remove old request times outside the window
        while (self.request_times and 
               current_time - self.request_times[0] > self.window):
            self.request_times.popleft()
        
        # Check if we're under the rate limit
        return len(self.request_times) < self.rate_limit
    
    def get_queue_status(self):
        """Get current queue status"""
        
        return {
            "queued_requests": len(self.request_queue),
            "requests_in_window": len(self.request_times),
            "rate_limit": self.rate_limit,
            "window_seconds": self.window,
            "processing": self.processing
        }
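
A short usage sketch, using a stand-in async request function:

import asyncio

async def run_workflow(input_text):
    # Stand-in for a real async API call
    await asyncio.sleep(0.1)
    return {"output": input_text}

async def main():
    limiter = QueuedRateLimiter("your_api_token", rate_limit=100, window=60)
    results = await asyncio.gather(
        *(limiter.add_request(run_workflow, f"message {i}") for i in range(5))
    )
    print(results)
    print(limiter.get_queue_status())

asyncio.run(main())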

Distributed Rate Limiting

Redis-Based Distributed Rate Limiting

import redis
import time

class DistributedRateLimiter:
    def __init__(self, api_token, redis_client, rate_limit=100, window=60):
        self.api_token = api_token
        self.redis_client = redis_client
        self.rate_limit = rate_limit
        self.window = window
        self.key_prefix = f"rate_limit:{api_token}"
    
    def can_make_request(self, identifier="default"):
        """Check if request can be made (sliding window)"""
        
        key = f"{self.key_prefix}:{identifier}"
        current_time = time.time()
        
        # Use Redis pipeline for atomic operations
        pipe = self.redis_client.pipeline()
        
        # Remove old entries
        pipe.zremrangebyscore(key, 0, current_time - self.window)
        
        # Count current requests
        pipe.zcard(key)
        
        # Add current request
        pipe.zadd(key, {str(current_time): current_time})
        
        # Set expiry
        pipe.expire(key, self.window)
        
        results = pipe.execute()
        current_count = results[1]
        
        if current_count >= self.rate_limit:
            # Over the limit: drop the entry we just added so denied
            # attempts do not consume the window
            self.redis_client.zrem(key, str(current_time))
            return False
        
        return True
    
    def record_request(self, identifier="default"):
        """Record a request manually (can_make_request already records
        allowed attempts, so this is only needed when bypassing it)"""
        
        key = f"{self.key_prefix}:{identifier}"
        current_time = time.time()
        
        self.redis_client.zadd(key, {str(current_time): current_time})
        self.redis_client.expire(key, self.window)
    
    def get_remaining_requests(self, identifier="default"):
        """Get remaining requests in current window"""
        
        key = f"{self.key_prefix}:{identifier}"
        current_time = time.time()
        
        # Clean old entries and count
        self.redis_client.zremrangebyscore(key, 0, current_time - self.window)
        current_count = self.redis_client.zcard(key)
        
        return max(0, self.rate_limit - current_count)
    
    def get_reset_time(self, identifier="default"):
        """Get time when rate limit resets"""
        
        key = f"{self.key_prefix}:{identifier}"
        
        # Get oldest request in window
        oldest_requests = self.redis_client.zrange(key, 0, 0, withscores=True)
        
        if oldest_requests:
            oldest_time = oldest_requests[0][1]
            return oldest_time + self.window
        
        return time.time()
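
A brief usage sketch, assuming a local Redis instance and the redis-py package:

import redis

r = redis.Redis(host="localhost", port=6379, db=0)
limiter = DistributedRateLimiter("your_api_token", r, rate_limit=100, window=60)

if limiter.can_make_request():
    # Allowed: the attempt has already been recorded in Redis
    print(f"{limiter.get_remaining_requests()} requests remaining")
else:
    print(f"Rate limited; window resets around {limiter.get_reset_time():.0f}")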

Rate Limit Monitoring

Real-Time Monitoring

Rate Limit Monitoring Dashboard

import time

class RateLimitMonitor:
    def __init__(self, api_token):
        self.api_token = api_token
        self.successful_requests = 0
        self.metrics = {
            "requests_made": 0,
            "rate_limited_requests": 0,
            "success_rate": 100.0,
            "average_response_time": 0
        }
        self.request_history = []
    
    def record_request(self, success, response_time, rate_limited=False):
        """Record request metrics"""
        
        self.metrics["requests_made"] += 1
        
        if rate_limited:
            self.metrics["rate_limited_requests"] += 1
        
        # Update success rate from explicit successes (rate-limited or
        # otherwise failed requests count against it)
        if success:
            self.successful_requests += 1
        self.metrics["success_rate"] = (
            self.successful_requests / self.metrics["requests_made"] * 100
        )
        
        # Update average response time
        total_time = (
            self.metrics["average_response_time"] * 
            (self.metrics["requests_made"] - 1) + response_time
        )
        self.metrics["average_response_time"] = (
            total_time / self.metrics["requests_made"]
        )
        
        # Record in history
        self.request_history.append({
            "timestamp": time.time(),
            "success": success,
            "response_time": response_time,
            "rate_limited": rate_limited
        })
        
        # Keep only last 1000 requests
        if len(self.request_history) > 1000:
            self.request_history.pop(0)
    
    def get_usage_patterns(self):
        """Analyze usage patterns"""
        
        if not self.request_history:
            return {}
        
        # Calculate patterns
        current_time = time.time()
        last_hour = [r for r in self.request_history 
                    if current_time - r["timestamp"] < 3600]
        last_minute = [r for r in self.request_history 
                      if current_time - r["timestamp"] < 60]
        
        return {
            "requests_last_hour": len(last_hour),
            "requests_last_minute": len(last_minute),
            "rate_limited_last_hour": sum(1 for r in last_hour if r["rate_limited"]),
            "average_response_time_hour": (
                sum(r["response_time"] for r in last_hour) / len(last_hour)
                if last_hour else 0
            ),
            "peak_minute_usage": max(
                len([r for r in self.request_history 
                    if abs(r["timestamp"] - t) < 60])
                for t in range(int(current_time - 3600), int(current_time), 60)
            ) if len(self.request_history) > 0 else 0
        }
    
    def generate_report(self):
        """Generate rate limit usage report"""
        
        patterns = self.get_usage_patterns()
        
        return {
            "summary": self.metrics,
            "patterns": patterns,
            "recommendations": self.get_recommendations(),
            "report_time": time.time()
        }
    
    def get_recommendations(self):
        """Get optimization recommendations"""
        
        recommendations = []
        
        if self.metrics["rate_limited_requests"] > 0:
            rate_limited_percentage = (
                self.metrics["rate_limited_requests"] / 
                self.metrics["requests_made"] * 100
            )
            
            if rate_limited_percentage > 10:
                recommendations.append(
                    "High rate limiting detected. Consider implementing "
                    "request queuing or upgrading your plan."
                )
            elif rate_limited_percentage > 5:
                recommendations.append(
                    "Moderate rate limiting detected. Consider adding "
                    "exponential backoff to your requests."
                )
        
        if self.metrics["average_response_time"] > 5:
            recommendations.append(
                "High average response time. Consider optimizing your "
                "requests or implementing caching."
            )
        
        patterns = self.get_usage_patterns()
        if patterns.get("peak_minute_usage", 0) > 80:  # 80% of typical limits
            recommendations.append(
                "Peak usage approaching limits. Consider smoothing "
                "request distribution or upgrading plan."
            )
        
        return recommendations
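
A quick usage sketch (the recorded timings are illustrative):

monitor = RateLimitMonitor("your_api_token")

monitor.record_request(success=True, response_time=0.8)
monitor.record_request(success=False, response_time=2.4, rate_limited=True)

report = monitor.generate_report()
print(report["summary"])
print(report["recommendations"])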

Alerting and Notifications

Rate Limit Alerting System

class RateLimitAlerting:
    def __init__(self, monitor, alert_thresholds=None):
        self.monitor = monitor
        self.alert_thresholds = alert_thresholds or {
            "rate_limited_percentage": 5.0,
            "requests_per_minute": 80,
            "response_time": 10.0
        }
        self.alert_channels = []
    
    def add_alert_channel(self, channel):
        """Add alert notification channel"""
        self.alert_channels.append(channel)
    
    def check_alerts(self):
        """Check for alert conditions"""
        
        metrics = self.monitor.metrics
        patterns = self.monitor.get_usage_patterns()
        alerts = []
        
        # Check rate limiting percentage
        if metrics["requests_made"] > 0:
            rate_limited_pct = (
                metrics["rate_limited_requests"] / 
                metrics["requests_made"] * 100
            )
            if rate_limited_pct > self.alert_thresholds["rate_limited_percentage"]:
                alerts.append({
                    "type": "rate_limiting",
                    "severity": "warning",
                    "message": f"Rate limiting at {rate_limited_pct:.1f}%",
                    "threshold": self.alert_thresholds["rate_limited_percentage"]
                })
        
        # Check request volume in the last minute
        requests_last_minute = patterns.get("requests_last_minute", 0)
        if requests_last_minute > self.alert_thresholds["requests_per_minute"]:
            alerts.append({
                "type": "high_usage",
                "severity": "warning",
                "message": f"High usage: {requests_last_minute} requests/minute",
                "threshold": self.alert_thresholds["requests_per_minute"]
            })
        
        # Check response time
        if metrics["average_response_time"] > self.alert_thresholds["response_time"]:
            alerts.append({
                "type": "slow_response",
                "severity": "warning",
                "message": f"Slow responses: {metrics['average_response_time']:.1f}s avg",
                "threshold": self.alert_thresholds["response_time"]
            })
        
        # Send alerts
        for alert in alerts:
            self.send_alert(alert)
        
        return alerts
    
    def send_alert(self, alert):
        """Send alert to configured channels"""
        
        for channel in self.alert_channels:
            try:
                channel.send(alert)
            except Exception as e:
                print(f"Failed to send alert via {channel}: {e}")

# Alert channel implementations
import requests

class SlackAlertChannel:
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url
    
    def send(self, alert):
        payload = {
            "text": f"⚠️ BroxiAI Rate Limit Alert",
            "attachments": [{
                "color": "warning",
                "fields": [
                    {"title": "Type", "value": alert["type"], "short": True},
                    {"title": "Severity", "value": alert["severity"], "short": True},
                    {"title": "Message", "value": alert["message"], "short": False}
                ]
            }]
        }
        
        requests.post(self.webhook_url, json=payload)

class EmailAlertChannel:
    def __init__(self, smtp_config, recipients):
        self.smtp_config = smtp_config
        self.recipients = recipients
    
    def send(self, alert):
        # Implementation for email alerts
        pass
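
Wiring the monitor and alerting together might look like this (the webhook URL is a placeholder):

monitor = RateLimitMonitor("your_api_token")
alerting = RateLimitAlerting(monitor)
alerting.add_alert_channel(
    SlackAlertChannel("https://hooks.slack.com/services/YOUR/WEBHOOK/URL")
)

# Call periodically, e.g. from a scheduler or background task
triggered = alerting.check_alerts()
print(f"{len(triggered)} alert(s) triggered")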

Optimization Strategies

Request Batching

Batch Request Implementation

import time

class BatchRequestManager:
    def __init__(self, api_client, batch_size=10, flush_interval=5):
        self.api_client = api_client
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.pending_requests = []
        self.last_flush = time.time()
    
    def add_request(self, workflow_id, input_data, callback=None):
        """Add request to batch"""
        
        request = {
            "workflow_id": workflow_id,
            "input": input_data,
            "callback": callback,
            "timestamp": time.time()
        }
        
        self.pending_requests.append(request)
        
        # Check if we should flush
        if (len(self.pending_requests) >= self.batch_size or
            time.time() - self.last_flush > self.flush_interval):
            self.flush_batch()
    
    def flush_batch(self):
        """Flush pending requests as batch"""
        
        if not self.pending_requests:
            return
        
        batch = self.pending_requests.copy()
        self.pending_requests.clear()
        self.last_flush = time.time()
        
        # Group by workflow_id for efficiency
        grouped_requests = {}
        for request in batch:
            workflow_id = request["workflow_id"]
            if workflow_id not in grouped_requests:
                grouped_requests[workflow_id] = []
            grouped_requests[workflow_id].append(request)
        
        # Process each group
        for workflow_id, group in grouped_requests.items():
            self.process_batch_group(workflow_id, group)
    
    def process_batch_group(self, workflow_id, batch_requests):
        """Process a group of requests for the same workflow"""
        
        try:
            # Prepare batch payload
            batch_inputs = [req["input"] for req in batch_requests]
            
            # Make batch API call (if supported)
            results = self.api_client.batch_run(workflow_id, batch_inputs)
            
            # Call callbacks with results
            for request, result in zip(batch_requests, results):
                if request["callback"]:
                    request["callback"](result)
                    
        except Exception as e:
            # Handle batch failure - fall back to individual requests
            print(f"Batch processing failed: {e}. Falling back to individual requests.")
            
            for request in batch_requests:
                try:
                    result = self.api_client.run(
                        request["workflow_id"], 
                        request["input"]
                    )
                    if request["callback"]:
                        request["callback"](result)
                except Exception as individual_error:
                    if request["callback"]:
                        request["callback"]({"error": str(individual_error)})

Caching Strategies

Intelligent Response Caching

import hashlib
import json
from datetime import datetime, timedelta

class ResponseCache:
    def __init__(self, ttl_seconds=3600, max_size=1000):
        self.cache = {}
        self.ttl_seconds = ttl_seconds
        self.max_size = max_size
        self.access_times = {}
    
    def get_cache_key(self, workflow_id, input_data, variables=None):
        """Generate cache key for request"""
        
        cache_data = {
            "workflow_id": workflow_id,
            "input": input_data,
            "variables": variables or {}
        }
        
        cache_string = json.dumps(cache_data, sort_keys=True)
        # MD5 is acceptable here: the digest is a cache key, not a security control
        return hashlib.md5(cache_string.encode()).hexdigest()
    
    def get(self, workflow_id, input_data, variables=None):
        """Get cached response if available"""
        
        cache_key = self.get_cache_key(workflow_id, input_data, variables)
        
        if cache_key in self.cache:
            cached_item = self.cache[cache_key]
            
            # Check if cache is still valid
            if datetime.now() < cached_item["expires_at"]:
                # Update access time
                self.access_times[cache_key] = datetime.now()
                return cached_item["response"]
            else:
                # Remove expired cache
                del self.cache[cache_key]
                if cache_key in self.access_times:
                    del self.access_times[cache_key]
        
        return None
    
    def set(self, workflow_id, input_data, response, variables=None):
        """Cache response"""
        
        cache_key = self.get_cache_key(workflow_id, input_data, variables)
        
        # Check cache size and evict if necessary
        if len(self.cache) >= self.max_size:
            self.evict_lru()
        
        self.cache[cache_key] = {
            "response": response,
            "created_at": datetime.now(),
            "expires_at": datetime.now() + timedelta(seconds=self.ttl_seconds)
        }
        self.access_times[cache_key] = datetime.now()
    
    def evict_lru(self):
        """Evict least recently used cache entry"""
        
        if not self.access_times:
            return
        
        # Find least recently used key
        lru_key = min(self.access_times.keys(), key=lambda k: self.access_times[k])
        
        # Remove from cache
        if lru_key in self.cache:
            del self.cache[lru_key]
        del self.access_times[lru_key]
    
    def clear_expired(self):
        """Clear all expired cache entries"""
        
        current_time = datetime.now()
        expired_keys = []
        
        for key, cached_item in self.cache.items():
            if current_time >= cached_item["expires_at"]:
                expired_keys.append(key)
        
        for key in expired_keys:
            del self.cache[key]
            if key in self.access_times:
                del self.access_times[key]
    
    def get_stats(self):
        """Get cache statistics"""
        
        total_entries = len(self.cache)
        expired_entries = sum(
            1 for item in self.cache.values() 
            if datetime.now() >= item["expires_at"]
        )
        
        return {
            "total_entries": total_entries,
            "valid_entries": total_entries - expired_entries,
            "expired_entries": expired_entries,
            "max_size": self.max_size,
            "utilization": total_entries / self.max_size * 100
        }
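
A sketch of putting the cache in front of API calls, where client is the BroxiRateLimiter from earlier:

cache = ResponseCache(ttl_seconds=3600, max_size=1000)

def cached_run(workflow_id, input_data):
    # Serve from cache when possible so repeat requests spend no rate limit budget
    cached = cache.get(workflow_id, input_data)
    if cached is not None:
        return cached
    
    result = client.make_request(
        "POST", f"/flows/{workflow_id}/run", json={"input": input_data}
    )
    cache.set(workflow_id, input_data, result)
    return result

result = cached_run("my-workflow", "Hello, world!")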

Best Practices

Rate Limit Best Practices

Application Design

  • Implement exponential backoff for retries

  • Use request queuing for high-volume applications

  • Cache responses when appropriate

  • Batch requests when possible

  • Monitor rate limit usage continuously

Error Handling

  • Always check rate limit headers

  • Implement graceful degradation

  • Provide user feedback for delays

  • Log rate limiting events for analysis

  • Have fallback mechanisms ready

Performance Optimization

  • Optimize request frequency

  • Use efficient data structures

  • Implement connection pooling (see the sketch below)

  • Consider async/parallel processing

  • Monitor performance regularly
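
For connection pooling with the requests library, a minimal sketch (pool sizes and the endpoint path are illustrative):

import requests
from requests.adapters import HTTPAdapter

session = requests.Session()
# Reuse TCP connections to the API host instead of opening one per request
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10)
session.mount("https://", adapter)

response = session.get(
    "https://api.broxi.ai/v1/flows",
    headers={"Authorization": "Bearer your_api_token"}
)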

Monitoring and Alerting

Key Metrics to Track

  • Requests per minute/hour/day

  • Rate limit hit percentage

  • Average response times

  • Success/failure rates

  • Queue depths and wait times

Alert Thresholds

  • Rate limiting > 5% of requests

  • Usage > 80% of limits

  • Response time > 10 seconds

  • Queue depth > 100 requests

  • Success rate < 95%
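
The first three thresholds map directly onto the RateLimitAlerting configuration shown earlier (queue depth and success rate checks would need to be added to that class):

alerting = RateLimitAlerting(monitor, alert_thresholds={
    "rate_limited_percentage": 5.0,  # rate limiting > 5% of requests
    "requests_per_minute": 80,       # ~80% of a 100/minute limit
    "response_time": 10.0            # response time > 10 seconds
})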

Troubleshooting

Common Rate Limiting Issues

Sudden Rate Limit Hits

def diagnose_rate_limiting():
    """Diagnose rate limiting issues.
    
    The checks below (high_burst_detected and friends) are placeholder
    helpers; wire them to your own metrics before using this function.
    """
    
    issues = []
    
    # Check request pattern
    if high_burst_detected():
        issues.append("High burst traffic detected - implement request smoothing")
    
    # Check concurrent requests
    if high_concurrency_detected():
        issues.append("Too many concurrent requests - implement connection pooling")
    
    # Check caching
    if low_cache_hit_rate():
        issues.append("Low cache hit rate - review caching strategy")
    
    # Check batching
    if individual_requests_high():
        issues.append("Many individual requests - consider batch processing")
    
    return issues

Performance Issues

  • Monitor request queuing delays

  • Check for memory leaks in rate limiters

  • Verify efficient data structures

  • Analyze request distribution patterns

Next Steps

After implementing rate limiting:

  1. Monitor Usage: Track rate limit metrics continuously

  2. Optimize Patterns: Adjust request patterns based on data

  3. Scale Planning: Plan for growth and usage increases

  4. Team Training: Educate team on rate limiting best practices

  5. Regular Review: Periodically review and optimize strategies

