Authentication
Complete guide to BroxiAI API authentication methods and security
Learn how to securely authenticate with the BroxiAI API using various authentication methods and implement proper security practices.
Authentication Overview
Supported Authentication Methods
API Token Authentication (Recommended)
Bearer token authentication
Long-lived tokens for server-to-server
Fine-grained permissions
Easy token rotation
OAuth 2.0 (Enterprise)
Standard OAuth 2.0 flows
User delegation
Scope-based permissions
Integration with identity providers
JWT Tokens (Advanced)
Short-lived tokens
Stateless authentication
Custom claims support
High-security applications
API Token Authentication
Getting Your API Token
Step 1: Generate Token
Log into your BroxiAI dashboard
Navigate to Settings → API Keys
Click "Generate New Token"
Set token name and permissions
Copy and securely store the token
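One common way to handle the final step is to keep the token out of source code and read it from the environment at startup. The sketch below assumes the token has been exported as `BROXI_API_TOKEN`; the helper names `load_api_token` and `auth_headers` are illustrative and not part of any BroxiAI SDK.

```python
import os


def load_api_token(env_var: str = "BROXI_API_TOKEN") -> str:
    """Return the API token from the environment, failing loudly if absent."""
    token = os.environ.get(env_var, "").strip()
    if not token:
        raise RuntimeError(f"{env_var} is not set; export it before running")
    return token


def auth_headers(token: str) -> dict:
    """Build the headers used for bearer token authentication."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
```

Failing at startup when the variable is missing tends to be easier to debug than a 401 response later on.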
Token Properties
{
"token_id": "tk_abc123...",
"name": "Production API Token",
"permissions": ["workflows:read", "workflows:execute"],
"created_at": "2024-01-15T10:30:00Z",
"expires_at": "2025-01-15T10:30:00Z",
"last_used": "2024-01-20T14:22:00Z"
}
Using API Tokens
Basic Authentication
curl -X GET "https://api.broxi.ai/v1/flows" \
-H "Authorization: Bearer YOUR_API_TOKEN" \
-H "Content-Type: application/json"
Python Example
import requests


class BroxiClient:
    def __init__(self, api_token):
        self.api_token = api_token
        self.base_url = "https://api.broxi.ai/v1"
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
            "User-Agent": "BroxiAI-Python-Client/1.0"
        })

    def get_workflows(self):
        response = self.session.get(f"{self.base_url}/flows")
        response.raise_for_status()
        return response.json()

    def execute_workflow(self, workflow_id, input_data):
        payload = {"input": input_data}
        response = self.session.post(
            f"{self.base_url}/flows/{workflow_id}/run",
            json=payload
        )
        response.raise_for_status()
        return response.json()


# Usage
client = BroxiClient("your_api_token")
result = client.execute_workflow("flow_123", "Hello, world!")
JavaScript Example
class BroxiAPI {
  constructor(apiToken) {
    this.apiToken = apiToken;
    this.baseURL = 'https://api.broxi.ai/v1';
  }

  async makeRequest(endpoint, options = {}) {
    const url = `${this.baseURL}${endpoint}`;
    const headers = {
      'Authorization': `Bearer ${this.apiToken}`,
      'Content-Type': 'application/json',
      ...options.headers
    };
    const response = await fetch(url, {
      ...options,
      headers
    });
    if (!response.ok) {
      throw new Error(`API request failed: ${response.statusText}`);
    }
    return response.json();
  }

  async runWorkflow(workflowId, input) {
    return this.makeRequest(`/flows/${workflowId}/run`, {
      method: 'POST',
      body: JSON.stringify({ input })
    });
  }
}

// Usage
const client = new BroxiAPI('your_api_token');
client.runWorkflow('flow_123', 'Hello, world!')
  .then(result => console.log(result))
  .catch(error => console.error(error));
Token Management
Token Permissions
Permission Scopes:
workflows:read: "View workflow configurations"
workflows:execute: "Execute workflows"
workflows:write: "Create and modify workflows"
workflows:delete: "Delete workflows"
analytics:read: "Access usage analytics"
team:read: "View team information"
team:write: "Manage team members"
admin:full: "Full administrative access"
Token Security Best Practices
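One practice that follows from the scope list above is to check, before making a call, that a token holds the scopes the operation needs, and to issue tokens with only those scopes. This is a minimal sketch; the helper name `missing_scopes` is illustrative, and treating `admin:full` as implying every other scope is an assumption based on its description, not documented behavior.

```python
def missing_scopes(granted, required):
    """Return the required scopes that the token does not hold, sorted."""
    granted_set = set(granted)
    # Assumption: admin:full implies every other scope.
    if "admin:full" in granted_set:
        return []
    return sorted(set(required) - granted_set)


token_permissions = ["workflows:read", "workflows:execute"]
print(missing_scopes(token_permissions, ["workflows:execute"]))
# []
print(missing_scopes(token_permissions, ["workflows:write", "team:read"]))
# ['team:read', 'workflows:write']
```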
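import os
from cryptography.fernet import Fernet


class SecureTokenManager:
    def __init__(self):
        # ENCRYPTION_KEY must hold a key produced by Fernet.generate_key()
        encryption_key = os.getenv('ENCRYPTION_KEY')
        if not encryption_key:
            raise ValueError("ENCRYPTION_KEY environment variable not set")
        self.cipher = Fernet(encryption_key)
        # In-memory stand-in; replace with your real secure storage
        self._encrypted_token = None

    def store_token(self, token):
        """Encrypt and store API token"""
        self._encrypted_token = self.cipher.encrypt(token.encode())
        return self._encrypted_token

    def retrieve_token(self):
        """Retrieve and decrypt API token"""
        if self._encrypted_token is None:
            raise LookupError("No token has been stored")
        return self.cipher.decrypt(self._encrypted_token).decode()

    def rotate_token(self, old_token, new_token):
        """Safely rotate API tokens"""
        # test_token_validity is the helper shown under Token Validation
        if test_token_validity(new_token):
            self.store_token(new_token)
            # Revoke old token after a one-hour grace period
            self.schedule_token_revocation(old_token, delay=3600)
            return True
        return False

    def schedule_token_revocation(self, token, delay):
        """Application-specific: revoke `token` after `delay` seconds"""
        raise NotImplementedError("Wire this to your scheduler or dashboard")
OAuth 2.0 Authentication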
OAuth 2.0 Flow
Authorization Code Flow
sequenceDiagram
participant User
participant App
participant BroxiAI
participant API
User->>App: Initiate OAuth
App->>BroxiAI: Redirect to authorization
User->>BroxiAI: Login and authorize
BroxiAI->>App: Authorization code
App->>BroxiAI: Exchange code for token
BroxiAI->>App: Access token + refresh token
App->>API: API request with token
API->>App: API response
OAuth Configuration
Application Registration
{
"client_id": "broxi_app_12345",
"client_secret": "secret_abcdef...",
"redirect_uris": [
"https://yourapp.com/auth/callback",
"https://localhost:3000/auth/callback"
],
"scopes": [
"workflows:read",
"workflows:execute",
"user:profile"
],
"grant_types": [
"authorization_code",
"refresh_token"
]
}
OAuth Implementation Example
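To make the first leg of the authorization code flow concrete, the sketch below builds the redirect URL by hand from the registered values and checks the `state` value when the user returns. The query parameters are the standard OAuth 2.0 ones. The endpoint is the one used in this guide's OAuth example, and the function names are illustrative.

```python
import hmac
import secrets
from urllib.parse import urlencode

# Endpoint as used in this guide's OAuth example
AUTHORIZE_URL = "https://api.broxi.ai/oauth/authorize"


def build_authorization_url(client_id, redirect_uri, scopes):
    """Return (url, state); keep state in the user's session for later."""
    state = secrets.token_urlsafe(16)  # unguessable value against CSRF
    query = urlencode({
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),  # scopes are space-separated
        "state": state,
    })
    return f"{AUTHORIZE_URL}?{query}", state


def state_matches(expected_state, returned_state):
    """Compare the stored and returned state in constant time."""
    return hmac.compare_digest(expected_state, returned_state)
```

The `redirect_uri` passed here has to match one of the URIs registered for the application, otherwise the authorization server is expected to reject the request.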
from authlib.integrations.requests_client import OAuth2Session


class BroxiOAuthClient:
    def __init__(self, client_id, client_secret, redirect_uri):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.authorization_endpoint = "https://api.broxi.ai/oauth/authorize"
        self.token_endpoint = "https://api.broxi.ai/oauth/token"

    def _session(self, scopes=None):
        """Create a session that carries the client credentials"""
        return OAuth2Session(
            self.client_id,
            self.client_secret,
            redirect_uri=self.redirect_uri,
            scope=scopes
        )

    def get_authorization_url(self, scopes=None):
        """Generate authorization URL"""
        session = self._session(scopes)
        authorization_url, state = session.create_authorization_url(
            self.authorization_endpoint
        )
        return authorization_url, state

    def exchange_code_for_token(self, authorization_code):
        """Exchange authorization code for access token"""
        session = self._session()
        # Pass the bare code as `code`; `authorization_response` expects
        # the full callback URL instead
        token = session.fetch_token(
            self.token_endpoint,
            code=authorization_code
        )
        return token

    def refresh_access_token(self, refresh_token):
        """Refresh expired access token"""
        session = self._session()
        token = session.refresh_token(
            self.token_endpoint,
            refresh_token=refresh_token
        )
        return token
OAuth Token Management
Token Response Format
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "Bearer",
"expires_in": 3600,
"scope": "workflows:read workflows:execute user:profile",
"issued_at": 1642694400
}
Automatic Token Refresh
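The expiry arithmetic behind a refresh decision can be shown on its own. This sketch assumes `issued_at` is a Unix timestamp in seconds and `expires_in` a lifetime in seconds, as the token response above suggests. The function names and the 300-second buffer are illustrative choices.

```python
import time


def seconds_until_expiry(token, now=None):
    """Seconds left before the access token expires (negative if past)."""
    now = time.time() if now is None else now
    return token["issued_at"] + token["expires_in"] - now


def needs_refresh(token, buffer_seconds=300, now=None):
    """True once the token is within buffer_seconds of expiring."""
    return seconds_until_expiry(token, now) <= buffer_seconds


token = {"issued_at": 1642694400, "expires_in": 3600}
# 600 seconds left: outside the 5-minute buffer
print(needs_refresh(token, now=1642694400 + 3000))  # False
# 200 seconds left: inside the buffer
print(needs_refresh(token, now=1642694400 + 3400))  # True
```

Refreshing slightly early avoids sending a request with a token that expires while the request is in flight.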
import time
import threading


class TokenManager:
    def __init__(self, oauth_client):
        self.oauth_client = oauth_client
        self.current_token = None
        # Reentrant lock: refresh_token() runs while get_valid_token() holds it
        self.lock = threading.RLock()

    def set_token(self, token):
        """Set current token, recording the issue time if the server omitted it"""
        with self.lock:
            token = dict(token)
            token.setdefault('issued_at', time.time())
            self.current_token = token

    def get_valid_token(self):
        """Get current valid token, refresh if needed"""
        with self.lock:
            if self.is_token_expired():
                self.refresh_token()
            return self.current_token['access_token']

    def is_token_expired(self):
        """Check if current token is expired"""
        if not self.current_token:
            return True
        expires_at = (
            self.current_token['issued_at'] +
            self.current_token['expires_in']
        )
        return time.time() >= expires_at - 300  # 5min buffer

    def refresh_token(self):
        """Refresh the current token"""
        with self.lock:
            if not (self.current_token and 'refresh_token' in self.current_token):
                raise RuntimeError("No refresh token available; re-authorize")
            new_token = self.oauth_client.refresh_access_token(
                self.current_token['refresh_token']
            )
            self.set_token(new_token)
JWT Token Authentication
JWT Implementation
JWT Token Structure
{
"header": {
"alg": "HS256",
"typ": "JWT"
},
"payload": {
"sub": "user_123",
"iss": "broxi.ai",
"aud": "api.broxi.ai",
"exp": 1642694400,
"iat": 1642690800,
"scopes": ["workflows:execute"],
"team_id": "team_456"
},
"signature": "signature_hash"
}
JWT Generation Example
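To show how the three parts in the structure above fit together, here is a standard-library sketch of HS256 signing and verification. It is for illustration only: it does not validate `exp`, `aud`, or `iss`, and the function names are hypothetical. A maintained library such as PyJWT is the better choice in production.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(header: dict, payload: dict, secret: bytes) -> str:
    """Return header.payload.signature for the given claims."""
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: bytes) -> dict:
    """Check the signature and return the payload (claims are NOT validated)."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature)):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(signing_input.split(".")[1]))
```

The signature covers the encoded header and payload joined by a dot, which is why changing any claim invalidates the token.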
import jwt
from datetime import datetime, timedelta, timezone


class JWTManager:
    def __init__(self, secret_key, algorithm='HS256'):
        self.secret_key = secret_key
        self.algorithm = algorithm

    def generate_token(self, user_id, scopes, expires_in=3600):
        """Generate JWT token"""
        # Timezone-aware UTC; datetime.utcnow() is deprecated
        now = datetime.now(timezone.utc)
        payload = {
            'sub': user_id,
            'iss': 'your-app',
            'aud': 'api.broxi.ai',
            'exp': now + timedelta(seconds=expires_in),
            'iat': now,
            'scopes': scopes
        }
        token = jwt.encode(
            payload,
            self.secret_key,
            algorithm=self.algorithm
        )
        return token

    def verify_token(self, token):
        """Verify and decode JWT token"""
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                audience='api.broxi.ai'
            )
            return payload
        except jwt.ExpiredSignatureError:
            raise Exception("Token has expired")
        except jwt.InvalidTokenError:
            raise Exception("Invalid token")
Error Handling
Authentication Errors
Common Error Responses
{
"error": {
"code": "UNAUTHORIZED",
"message": "Invalid or expired API token",
"details": {
"reason": "token_expired",
"expires_at": "2024-01-15T10:30:00Z"
}
},
"request_id": "req_abc123"
}
Error Handling Implementation
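Before wiring up retries, it can help to turn the error body shown above into a typed object. This sketch relies only on the fields visible in the sample response. The class and function names are illustrative, and treating `token_expired` as the one reason worth a refresh and retry is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ParsedAuthError:
    code: str
    message: str
    reason: Optional[str]
    request_id: Optional[str]
    retryable: bool


def parse_auth_error(body: dict) -> ParsedAuthError:
    """Extract the fields of an authentication error response."""
    error = body.get("error", {})
    details = error.get("details") or {}
    reason = details.get("reason")
    return ParsedAuthError(
        code=error.get("code", "UNKNOWN"),
        message=error.get("message", ""),
        reason=reason,
        request_id=body.get("request_id"),
        # Assumption: only an expired token is worth a refresh and retry
        retryable=(reason == "token_expired"),
    )
```

Logging the `request_id` alongside the error makes it easier to correlate a failure with a support request.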
import requests


class AuthenticationError(Exception):
    def __init__(self, message, error_code=None, details=None):
        self.message = message
        self.error_code = error_code
        self.details = details
        super().__init__(self.message)


class BroxiAPIClient:
    def __init__(self, token_manager):
        self.token_manager = token_manager

    def make_authenticated_request(self, method, endpoint, **kwargs):
        """Make authenticated API request with error handling"""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                token = self.token_manager.get_valid_token()
                headers = kwargs.get('headers', {})
                headers['Authorization'] = f'Bearer {token}'
                kwargs['headers'] = headers
                response = requests.request(method, endpoint, **kwargs)
                if response.status_code == 401:
                    # Token expired or invalid
                    if attempt < max_retries - 1:
                        self.token_manager.refresh_token()
                        continue
                    else:
                        raise AuthenticationError(
                            "Authentication failed after retries",
                            "AUTH_FAILED"
                        )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 401:
                    error_data = e.response.json()
                    raise AuthenticationError(
                        error_data['error']['message'],
                        error_data['error']['code'],
                        error_data['error'].get('details')
                    )
                raise
Security Best Practices
Token Security
Secure Token Storage
import os

import keyring


class SecureTokenStorage:
    def __init__(self, service_name="broxi-api"):
        self.service_name = service_name

    def store_token(self, username, token):
        """Store token in system keychain"""
        keyring.set_password(self.service_name, username, token)

    def retrieve_token(self, username):
        """Retrieve token from system keychain"""
        return keyring.get_password(self.service_name, username)

    def delete_token(self, username):
        """Delete token from system keychain"""
        keyring.delete_password(self.service_name, username)


# Environment-based token management
class EnvironmentTokenManager:
    def __init__(self):
        self.token = os.getenv('BROXI_API_TOKEN')
        if not self.token:
            raise ValueError("BROXI_API_TOKEN environment variable not set")

    def get_token(self):
        return self.token
Token Validation
import requests


def validate_token_format(token):
    """Validate API token format"""
    if not token:
        raise ValueError("Token cannot be empty")
    if not token.startswith('sk-'):
        raise ValueError("Invalid token format")
    if len(token) < 32:
        raise ValueError("Token too short")
    return True


def test_token_validity(token):
    """Test token validity with API"""
    try:
        response = requests.get(
            "https://api.broxi.ai/v1/user",
            headers={"Authorization": f"Bearer {token}"},
            timeout=10
        )
        return response.status_code == 200
    except requests.RequestException:
        return False
Network Security
Request Security
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class SecureAPIClient:
    def __init__(self, api_token):
        self.api_token = api_token
        self.session = self.create_secure_session()

    def create_secure_session(self):
        """Create secure requests session"""
        session = requests.Session()
        # Verify TLS certificates (the requests default, made explicit here)
        session.verify = True
        # Retry transient failures with exponential backoff
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        # Set default headers for every request
        session.headers.update({
            'Authorization': f'Bearer {self.api_token}',
            'Content-Type': 'application/json',
            'User-Agent': 'SecureBroxiClient/1.0',
            'X-Requested-With': 'XMLHttpRequest'
        })
        return session
IP Whitelisting
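Before submitting a whitelist, it can be worth validating the entries locally so that a malformed range is caught early. The sketch below uses Python's standard `ipaddress` module. The function names are illustrative, and how the BroxiAI service itself evaluates the list is not something this example can confirm.

```python
import ipaddress


def parse_allowed_networks(entries):
    """Parse CIDR strings, raising ValueError on a malformed entry."""
    # strict=True rejects entries with host bits set, e.g. 192.168.1.5/24
    return [ipaddress.ip_network(entry, strict=True) for entry in entries]


def is_allowed(ip, networks):
    """True if the address falls inside any of the allowed networks."""
    address = ipaddress.ip_address(ip)
    return any(address in network for network in networks)


networks = parse_allowed_networks(["192.168.1.0/24", "10.0.0.0/8", "203.0.113.0/24"])
print(is_allowed("10.1.2.3", networks))  # True
print(is_allowed("8.8.8.8", networks))   # False
```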
def configure_ip_whitelist(api_client, allowed_ips):
    """Configure IP whitelist for API access"""
    whitelist_config = {
        "ip_whitelist": {
            "enabled": True,
            "allowed_ips": allowed_ips,
            "deny_all_others": True
        }
    }
    return api_client.update_security_settings(whitelist_config)


# Example usage
allowed_ips = [
    "192.168.1.0/24",  # Office network
    "10.0.0.0/8",      # VPN network
    "203.0.113.0/24"   # Production servers
]
configure_ip_whitelist(client, allowed_ips)
Rate Limiting
Understanding Rate Limits
Rate Limit Headers
HTTP/1.1 200 OK
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 999
X-RateLimit-Reset: 1642694400
X-RateLimit-Window: 3600
Retry-After: 60
Rate Limiting Implementation
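The headers above can also drive the client's pacing directly. This sketch assumes `X-RateLimit-Reset` is a Unix timestamp and `Retry-After` is a number of seconds, which matches the sample values but is not stated explicitly. The function name `seconds_to_wait` is illustrative.

```python
import time


def seconds_to_wait(headers, now=None):
    """Return how long to pause before the next request, in seconds."""
    now = time.time() if now is None else now
    remaining = int(headers.get("X-RateLimit-Remaining", "1"))
    if remaining > 0:
        return 0.0  # budget left in the current window
    if "Retry-After" in headers:
        return float(headers["Retry-After"])  # assumed to be seconds
    reset_at = float(headers.get("X-RateLimit-Reset", now))  # assumed Unix time
    return max(0.0, reset_at - now)
```

A caller would typically pass `response.headers` after each request and sleep for the returned duration before sending the next one.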
import time
from collections import deque

import requests


class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = deque()

    def can_make_request(self):
        """Check if request can be made within rate limit"""
        now = time.time()
        # Remove old requests outside window
        while self.requests and self.requests[0] <= now - self.time_window:
            self.requests.popleft()
        return len(self.requests) < self.max_requests

    def record_request(self):
        """Record a new request"""
        self.requests.append(time.time())

    def wait_time(self):
        """Calculate wait time until next request allowed"""
        if not self.requests:
            return 0
        oldest_request = self.requests[0]
        wait_time = oldest_request + self.time_window - time.time()
        return max(0, wait_time)


class RateLimitedClient:
    def __init__(self, api_token, max_requests=100, time_window=3600):
        self.api_token = api_token
        self.rate_limiter = RateLimiter(max_requests, time_window)

    def make_request(self, method, url, **kwargs):
        """Make rate-limited API request"""
        if not self.rate_limiter.can_make_request():
            wait_time = self.rate_limiter.wait_time()
            if wait_time > 0:
                time.sleep(wait_time)
        self.rate_limiter.record_request()
        # Make the actual request
        headers = kwargs.get('headers', {})
        headers['Authorization'] = f'Bearer {self.api_token}'
        kwargs['headers'] = headers
        return requests.request(method, url, **kwargs)
Authentication Testing
Unit Tests
Token Authentication Tests
import unittest
from unittest.mock import patch, Mock

# These tests assume a client variant that accepts a raw token, calls
# requests.get, and exposes get_user_info(); the BroxiAPIClient shown under
# Error Handling takes a token manager instead.


class TestBroxiAuthentication(unittest.TestCase):
    def setUp(self):
        self.api_token = "sk-test-token-123"
        self.client = BroxiAPIClient(self.api_token)

    @patch('requests.get')
    def test_valid_token_authentication(self, mock_get):
        """Test successful authentication with valid token"""
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"user": "test_user"}
        mock_get.return_value = mock_response
        result = self.client.get_user_info()
        self.assertEqual(result['user'], 'test_user')
        mock_get.assert_called_once()
        # Verify Authorization header
        call_kwargs = mock_get.call_args[1]
        expected_header = f"Bearer {self.api_token}"
        self.assertEqual(
            call_kwargs['headers']['Authorization'],
            expected_header
        )

    @patch('requests.get')
    def test_invalid_token_authentication(self, mock_get):
        """Test authentication failure with invalid token"""
        mock_response = Mock()
        mock_response.status_code = 401
        mock_response.json.return_value = {
            "error": {
                "code": "UNAUTHORIZED",
                "message": "Invalid API token"
            }
        }
        mock_get.return_value = mock_response
        with self.assertRaises(AuthenticationError):
            self.client.get_user_info()
Integration Tests
End-to-End Authentication Test
def test_complete_authentication_flow():
    """Test complete authentication flow"""
    # Test token generation (JWTManager is defined under JWT Generation Example)
    token_manager = JWTManager("test-secret")
    token = token_manager.generate_token(
        user_id="test_user",
        scopes=["workflows:read"]
    )
    # Test token validation
    assert token_manager.verify_token(token)
    # Test API request with token (BroxiClient is defined under Python Example)
    client = BroxiClient(token)
    workflows = client.get_workflows()
    assert isinstance(workflows, list)


def test_token_refresh_flow():
    """Test OAuth token refresh flow"""
    oauth_client = BroxiOAuthClient(
        client_id="test_client",
        client_secret="test_secret",
        redirect_uri="http://localhost:8000/callback"
    )
    # Simulate expired token
    expired_token = {
        "access_token": "expired_token",
        "refresh_token": "valid_refresh_token",
        "expires_in": -1  # Already expired
    }
    # Test token refresh
    new_token = oauth_client.refresh_access_token(
        expired_token['refresh_token']
    )
    assert 'access_token' in new_token
    assert 'refresh_token' in new_token
    assert new_token['expires_in'] > 0
Troubleshooting Authentication
Common Issues
Token Not Working
def diagnose_token_issues(token):
    """Diagnose common token issues"""
    issues = []
    # Check token format
    if not token.startswith('sk-'):
        issues.append("Invalid token format - should start with 'sk-'")
    # Check token length
    if len(token) < 32:
        issues.append("Token appears to be truncated")
    # Test token with API
    if not test_token_validity(token):
        issues.append("Token rejected by API - may be expired or revoked")
    # Check permissions
    try:
        response = requests.get(
            "https://api.broxi.ai/v1/user",
            headers={"Authorization": f"Bearer {token}"},
            timeout=10
        )
        if response.status_code == 403:
            issues.append("Token has insufficient permissions")
    except requests.RequestException:
        # Catch only network-level errors, not every exception
        issues.append("Network connectivity issues")
    return issues
OAuth Troubleshooting
def troubleshoot_oauth(client_id, client_secret, redirect_uri):
    """Troubleshoot OAuth configuration"""
    problems = []
    # Check client credentials
    try:
        # Attempt to get authorization URL
        oauth_client = BroxiOAuthClient(
            client_id, client_secret, redirect_uri
        )
        auth_url, state = oauth_client.get_authorization_url()
        if not auth_url:
            problems.append("Failed to generate authorization URL")
    except Exception as e:
        problems.append(f"OAuth configuration error: {str(e)}")
    # Validate redirect URI
    if not redirect_uri.startswith(('http://', 'https://')):
        problems.append("Invalid redirect URI format")
    return problems
Next Steps
After implementing authentication:
Test Thoroughly: Validate all authentication flows
Monitor Usage: Track authentication metrics
Security Review: Regular security audits
Documentation: Keep authentication docs updated
Team Training: Ensure team understands security practices
Related Guides
API Overview: General API usage
Webhooks: Webhook authentication
Security: Overall security practices
Never expose API tokens in client-side code, logs, or version control. Always use secure storage and transmission methods for authentication credentials.
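One way to act on the logging part of this warning is to redact bearer tokens before a log record is emitted. The sketch below uses the standard `logging` module. The names `mask_token`, `redact`, and `RedactTokensFilter` are illustrative, and the regular expression only covers tokens that follow the word `Bearer`.

```python
import logging
import re

# Matches "Bearer <token>" where the token uses typical URL-safe characters
_BEARER = re.compile(r"(Bearer\s+)([A-Za-z0-9._~+/=-]+)")


def mask_token(token, visible=4):
    """Hide all but the last few characters of a token."""
    if len(token) <= visible:
        return "*" * len(token)
    return "*" * 8 + token[-visible:]


def redact(text):
    """Replace any bearer token in text with its masked form."""
    return _BEARER.sub(lambda m: m.group(1) + mask_token(m.group(2)), text)


class RedactTokensFilter(logging.Filter):
    """Logging filter that masks bearer tokens in formatted messages."""

    def filter(self, record):
        record.msg = redact(record.getMessage())
        record.args = ()  # message is already formatted
        return True


print(redact("Authorization: Bearer sk-abcdef123456"))
# Authorization: Bearer ********3456
```

Attaching the filter with `handler.addFilter(RedactTokensFilter())` applies the redaction to every record that handler emits.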
Proper authentication is the foundation of API security. Choose the method that best fits your use case and always follow security best practices.