Authentication

Complete guide to BroxiAI API authentication methods and security

Learn how to securely authenticate with the BroxiAI API using various authentication methods and implement proper security practices.

Authentication Overview

Supported Authentication Methods

API Token Authentication (Recommended)

  • Bearer token authentication

  • Long-lived tokens for server-to-server

  • Fine-grained permissions

  • Easy token rotation

OAuth 2.0 (Enterprise)

  • Standard OAuth 2.0 flows

  • User delegation

  • Scope-based permissions

  • Integration with identity providers

JWT Tokens (Advanced)

  • Short-lived tokens

  • Stateless authentication

  • Custom claims support

  • High-security applications

API Token Authentication

Getting Your API Token

Step 1: Generate Token

  1. Log into your BroxiAI dashboard

  2. Navigate to Settings → API Keys

  3. Click "Generate New Token"

  4. Set token name and permissions

  5. Copy and securely store the token

Token Properties

{
  "token_id": "tk_abc123...",
  "name": "Production API Token",
  "permissions": ["workflows:read", "workflows:execute"],
  "created_at": "2024-01-15T10:30:00Z",
  "expires_at": "2025-01-15T10:30:00Z",
  "last_used": "2024-01-20T14:22:00Z"
}
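The `expires_at` field makes it possible to warn before a token lapses. The following is a minimal sketch, assuming token metadata has the shape shown above; the helper name `seconds_until_expiry` is hypothetical and not part of any BroxiAI SDK.

```python
from datetime import datetime, timezone

def seconds_until_expiry(token_info, now=None):
    """Return the seconds left before the token expires (negative if expired)."""
    # The metadata uses ISO 8601 with a trailing "Z"; replace it with an
    # explicit offset so datetime.fromisoformat also accepts it before Python 3.11.
    expires_at = datetime.fromisoformat(
        token_info["expires_at"].replace("Z", "+00:00")
    )
    now = now or datetime.now(timezone.utc)
    return (expires_at - now).total_seconds()

# Example with the metadata shown above
token_info = {"name": "Production API Token", "expires_at": "2025-01-15T10:30:00Z"}
if seconds_until_expiry(token_info) < 7 * 24 * 3600:
    print(f"Rotate '{token_info['name']}' soon")
```

A scheduled job could run a check like this and alert well before the expiry date, leaving time for rotation.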

Using API Tokens

Basic Authentication

curl -X GET "https://api.broxi.ai/v1/flows" \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -H "Content-Type: application/json"

Python Example

import requests

class BroxiClient:
    def __init__(self, api_token):
        self.api_token = api_token
        self.base_url = "https://api.broxi.ai/v1"
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
            "User-Agent": "BroxiAI-Python-Client/1.0"
        })
    
    def get_workflows(self):
        response = self.session.get(f"{self.base_url}/flows")
        response.raise_for_status()
        return response.json()
    
    def execute_workflow(self, workflow_id, input_data):
        payload = {"input": input_data}
        response = self.session.post(
            f"{self.base_url}/flows/{workflow_id}/run",
            json=payload
        )
        response.raise_for_status()
        return response.json()

# Usage
client = BroxiClient("your_api_token")
result = client.execute_workflow("flow_123", "Hello, world!")

JavaScript Example

class BroxiAPI {
    constructor(apiToken) {
        this.apiToken = apiToken;
        this.baseURL = 'https://api.broxi.ai/v1';
    }
    
    async makeRequest(endpoint, options = {}) {
        const url = `${this.baseURL}${endpoint}`;
        const headers = {
            'Authorization': `Bearer ${this.apiToken}`,
            'Content-Type': 'application/json',
            ...options.headers
        };
        
        const response = await fetch(url, {
            ...options,
            headers
        });
        
        if (!response.ok) {
            throw new Error(`API request failed: ${response.statusText}`);
        }
        
        return response.json();
    }
    
    async runWorkflow(workflowId, input) {
        return this.makeRequest(`/flows/${workflowId}/run`, {
            method: 'POST',
            body: JSON.stringify({ input })
        });
    }
}

// Usage
const client = new BroxiAPI('your_api_token');
client.runWorkflow('flow_123', 'Hello, world!')
    .then(result => console.log(result))
    .catch(error => console.error(error));

Token Management

Token Permissions

Permission Scopes:
  workflows:read: "View workflow configurations"
  workflows:execute: "Execute workflows"
  workflows:write: "Create and modify workflows"
  workflows:delete: "Delete workflows"
  
  analytics:read: "Access usage analytics"
  team:read: "View team information"
  team:write: "Manage team members"
  
  admin:full: "Full administrative access"
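Before calling an endpoint, a client can check locally whether its token carries the scopes it needs. This is a sketch only: the function name `has_permission` is hypothetical, and it assumes that `admin:full` implies every other scope, which the list above suggests but does not state outright.

```python
def has_permission(granted, required):
    """Return True if every required scope is covered by the granted scopes."""
    granted = set(granted)
    # Assumption: admin:full grants all other scopes.
    if "admin:full" in granted:
        return True
    return set(required) <= granted

token_scopes = ["workflows:read", "workflows:execute"]
print(has_permission(token_scopes, ["workflows:execute"]))  # scope is granted
print(has_permission(token_scopes, ["workflows:delete"]))   # scope is missing
```

The API remains the source of truth; a local check like this only avoids requests that are certain to be rejected.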

Token Security Best Practices

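import os
from cryptography.fernet import Fernet

class SecureTokenManager:
    def __init__(self):
        # Fernet expects a url-safe base64-encoded 32-byte key,
        # such as one produced by Fernet.generate_key()
        self.encryption_key = os.getenv('ENCRYPTION_KEY')
        if not self.encryption_key:
            raise ValueError("ENCRYPTION_KEY environment variable not set")
        self.cipher = Fernet(self.encryption_key)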
    
    def store_token(self, token):
        """Encrypt and store API token"""
        encrypted_token = self.cipher.encrypt(token.encode())
        # Store encrypted_token in secure storage
        return encrypted_token
    
    def retrieve_token(self):
        """Retrieve and decrypt API token"""
        encrypted_token = self.get_from_secure_storage()
        token = self.cipher.decrypt(encrypted_token).decode()
        return token
    
    def rotate_token(self, old_token, new_token):
        """Safely rotate API tokens"""
        # Test new token
        if self.test_token_validity(new_token):
            self.store_token(new_token)
            # Revoke old token after grace period
            self.schedule_token_revocation(old_token, delay=3600)
            return True
        return False

OAuth 2.0 Authentication

OAuth 2.0 Flow

Authorization Code Flow

sequenceDiagram
    participant User
    participant App
    participant BroxiAI
    participant API
    
    User->>App: Initiate OAuth
    App->>BroxiAI: Redirect to authorization
    User->>BroxiAI: Login and authorize
    BroxiAI->>App: Authorization code
    App->>BroxiAI: Exchange code for token
    BroxiAI->>App: Access token + refresh token
    App->>API: API request with token
    API->>App: API response
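The first two steps of the diagram (redirecting the user, then receiving the authorization code) can be sketched with the standard library alone. The code below assumes the authorization endpoint used in this guide's OAuth implementation example and standard OAuth 2.0 query parameters; the function names `build_authorization_url` and `extract_code` are hypothetical.

```python
import hmac
import secrets
from urllib.parse import urlencode, urlparse, parse_qs

# Endpoint taken from the OAuth implementation example in this guide
AUTHORIZE_URL = "https://api.broxi.ai/oauth/authorize"

def build_authorization_url(client_id, redirect_uri, scopes):
    """Return (url, state); store state in the user's session."""
    state = secrets.token_urlsafe(32)  # unguessable value to prevent CSRF
    query = urlencode({
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),
        "state": state,
    })
    return f"{AUTHORIZE_URL}?{query}", state

def extract_code(callback_url, expected_state):
    """Validate the callback and return the authorization code."""
    params = parse_qs(urlparse(callback_url).query)
    returned_state = params.get("state", [""])[0]
    # Constant-time comparison of the returned and stored state values
    if not hmac.compare_digest(returned_state.encode(), expected_state.encode()):
        raise ValueError("State mismatch: discard this callback")
    if "error" in params:
        raise ValueError(f"Authorization failed: {params['error'][0]}")
    if "code" not in params:
        raise ValueError("Callback is missing the authorization code")
    return params["code"][0]
```

Checking `state` before anything else ensures the callback belongs to a flow this application started.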

OAuth Configuration

Application Registration

{
  "client_id": "broxi_app_12345",
  "client_secret": "secret_abcdef...",
  "redirect_uris": [
    "https://yourapp.com/auth/callback",
    "https://localhost:3000/auth/callback"
  ],
  "scopes": [
    "workflows:read",
    "workflows:execute",
    "user:profile"
  ],
  "grant_types": [
    "authorization_code",
    "refresh_token"
  ]
}

OAuth Implementation Example

from authlib.integrations.requests_client import OAuth2Session

class BroxiOAuthClient:
    def __init__(self, client_id, client_secret, redirect_uri):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.authorization_endpoint = "https://api.broxi.ai/oauth/authorize"
        self.token_endpoint = "https://api.broxi.ai/oauth/token"
        
    def get_authorization_url(self, scopes=None):
        """Generate authorization URL"""
        session = OAuth2Session(
            self.client_id,
            redirect_uri=self.redirect_uri,
            scope=scopes
        )
        authorization_url, state = session.authorization_url(
            self.authorization_endpoint
        )
        return authorization_url, state
    
    def exchange_code_for_token(self, authorization_code):
        """Exchange authorization code for access token"""
        # The session needs the client secret and the same redirect_uri
        # that was sent in the authorization request
        session = OAuth2Session(
            self.client_id,
            self.client_secret,
            redirect_uri=self.redirect_uri
        )
        # Pass the bare code via `code`; `authorization_response`
        # expects the full callback URL instead
        token = session.fetch_token(
            self.token_endpoint,
            code=authorization_code
        )
        return token
    
    def refresh_access_token(self, refresh_token):
        """Refresh expired access token"""
        session = OAuth2Session(self.client_id, self.client_secret)
        token = session.refresh_token(
            self.token_endpoint,
            refresh_token=refresh_token
        )
        return token

OAuth Token Management

Token Response Format

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "Bearer",
  "expires_in": 3600,
  "scope": "workflows:read workflows:execute user:profile",
  "issued_at": 1642694400
}
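The expiry time and the granted scopes can both be derived from this response. The sketch below assumes the field names shown above; `summarize_token` is a hypothetical helper. Note that `issued_at` is not one of the standard OAuth 2.0 token response fields, so the code falls back to the local clock if a response omits it.

```python
import time

def summarize_token(token, now=None):
    """Derive the expiry time and scope list from a token response."""
    now = time.time() if now is None else now
    # Fallback to the local clock is an assumption, not documented behaviour
    issued_at = token.get("issued_at", now)
    expires_at = issued_at + token["expires_in"]
    return {
        # The scope field is a single space-delimited string
        "scopes": token.get("scope", "").split(),
        "expires_at": expires_at,
        "seconds_left": expires_at - now,
    }

token = {
    "token_type": "Bearer",
    "expires_in": 3600,
    "scope": "workflows:read workflows:execute user:profile",
    "issued_at": 1642694400,
}
print(summarize_token(token)["scopes"])
```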

Automatic Token Refresh

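import time
import threading

class TokenManager:
    def __init__(self, oauth_client):
        self.oauth_client = oauth_client
        self.current_token = None
        self.refresh_timer = None
        # Re-entrant lock: refresh_token() calls set_token() while holding it
        self.lock = threading.RLock()
    
    def set_token(self, token):
        """Set current token and schedule refresh"""
        with self.lock:
            # Record the local time if the response carries no issued_at
            token.setdefault('issued_at', time.time())
            self.current_token = token
            self.schedule_refresh()
    
    def get_valid_token(self):
        """Get current valid token, refresh if needed"""
        with self.lock:
            if self.is_token_expired():
                self.refresh_token()
            if not self.current_token:
                raise RuntimeError("No token set; call set_token() first")
            return self.current_token['access_token']
    
    def is_token_expired(self):
        """Check if current token is expired"""
        if not self.current_token:
            return True
        
        expires_at = (
            self.current_token['issued_at'] + 
            self.current_token['expires_in']
        )
        return time.time() >= expires_at - 300  # 5min buffer
    
    def refresh_token(self):
        """Refresh the current token"""
        with self.lock:
            if self.current_token and 'refresh_token' in self.current_token:
                old_refresh = self.current_token['refresh_token']
                new_token = self.oauth_client.refresh_access_token(old_refresh)
                # Keep the old refresh token if the server did not rotate it
                new_token.setdefault('refresh_token', old_refresh)
                self.set_token(new_token)
    
    def schedule_refresh(self):
        """Schedule a background refresh 5 minutes before expiry"""
        if self.refresh_timer:
            self.refresh_timer.cancel()
        expires_at = (
            self.current_token['issued_at'] + 
            self.current_token['expires_in']
        )
        delay = max(1.0, expires_at - 300 - time.time())
        self.refresh_timer = threading.Timer(delay, self.refresh_token)
        self.refresh_timer.daemon = True
        self.refresh_timer.start()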

JWT Token Authentication

JWT Implementation

JWT Token Structure

{
  "header": {
    "alg": "HS256",
    "typ": "JWT"
  },
  "payload": {
    "sub": "user_123",
    "iss": "broxi.ai",
    "aud": "api.broxi.ai",
    "exp": 1642694400,
    "iat": 1642690800,
    "scopes": ["workflows:execute"],
    "team_id": "team_456"
  },
  "signature": "signature_hash"
}
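On the wire, a JWT is three base64url-encoded segments joined by dots: header, payload, and signature. When debugging, it can help to inspect the first two. The helper below is a sketch for inspection only; `peek_jwt` is a hypothetical name, and it deliberately does not verify the signature, so its output must never be trusted for authorization decisions.

```python
import base64
import json

def _b64url_decode(segment):
    # JWT segments drop base64 padding; restore it before decoding
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)

def peek_jwt(token):
    """Decode header and payload WITHOUT verifying the signature."""
    header_b64, payload_b64, _signature = token.split(".")
    header = json.loads(_b64url_decode(header_b64))
    payload = json.loads(_b64url_decode(payload_b64))
    return header, payload
```

For example, `peek_jwt(token)[1].get("exp")` shows the expiry claim of a token the API rejected, which is often enough to tell an expired token from a malformed one.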

JWT Generation Example

import jwt
from datetime import datetime, timedelta, timezone

class JWTManager:
    def __init__(self, secret_key, algorithm='HS256'):
        self.secret_key = secret_key
        self.algorithm = algorithm
    
    def generate_token(self, user_id, scopes, expires_in=3600):
        """Generate JWT token"""
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        now = datetime.now(timezone.utc)
        payload = {
            'sub': user_id,
            'iss': 'your-app',
            'aud': 'api.broxi.ai',
            'exp': now + timedelta(seconds=expires_in),
            'iat': now,
            'scopes': scopes
        }
        
        token = jwt.encode(
            payload,
            self.secret_key,
            algorithm=self.algorithm
        )
        return token
    
    def verify_token(self, token):
        """Verify and decode JWT token"""
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                audience='api.broxi.ai'
            )
            return payload
        except jwt.ExpiredSignatureError:
            raise Exception("Token has expired")
        except jwt.InvalidTokenError:
            raise Exception("Invalid token")

Error Handling

Authentication Errors

Common Error Responses

{
  "error": {
    "code": "UNAUTHORIZED",
    "message": "Invalid or expired API token",
    "details": {
      "reason": "token_expired",
      "expires_at": "2024-01-15T10:30:00Z"
    }
  },
  "request_id": "req_abc123"
}
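Parsing this body into a small structure keeps the decision logic (refresh the token or fail) in one place. The sketch below assumes the error shape shown above; `ApiError`, `parse_error`, and `should_refresh` are hypothetical names, and `token_expired` is the only `reason` value this guide documents.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ApiError:
    code: str
    message: str
    reason: Optional[str] = None
    request_id: Optional[str] = None

def parse_error(body):
    """Build an ApiError from a decoded JSON error body."""
    error = body.get("error") or {}
    details = error.get("details") or {}
    return ApiError(
        code=error.get("code", "UNKNOWN"),
        message=error.get("message", ""),
        reason=details.get("reason"),
        request_id=body.get("request_id"),
    )

def should_refresh(err):
    """Only an expired token is worth a refresh-and-retry."""
    return err.code == "UNAUTHORIZED" and err.reason == "token_expired"
```

Logging `request_id` alongside the failure gives support a handle for tracing the request.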

Error Handling Implementation

import requests

class AuthenticationError(Exception):
    def __init__(self, message, error_code=None, details=None):
        self.message = message
        self.error_code = error_code
        self.details = details
        super().__init__(self.message)

class BroxiAPIClient:
    def __init__(self, token_manager):
        self.token_manager = token_manager
    
    def make_authenticated_request(self, method, endpoint, **kwargs):
        """Make authenticated API request with error handling"""
        max_retries = 3
        
        for attempt in range(max_retries):
            token = self.token_manager.get_valid_token()
            headers = kwargs.get('headers', {})
            headers['Authorization'] = f'Bearer {token}'
            kwargs['headers'] = headers
            
            response = requests.request(method, endpoint, **kwargs)
            
            if response.status_code == 401:
                # Token expired or invalid: refresh and retry
                if attempt < max_retries - 1:
                    self.token_manager.refresh_token()
                    continue
                # Out of retries: surface the server's error details
                try:
                    error = response.json().get('error', {})
                except ValueError:
                    error = {}
                raise AuthenticationError(
                    error.get('message', "Authentication failed after retries"),
                    error.get('code', "AUTH_FAILED"),
                    error.get('details')
                )
            
            # Other HTTP errors propagate as requests.exceptions.HTTPError
            response.raise_for_status()
            return response.json()

Security Best Practices

Token Security

Secure Token Storage

import os

import keyring

class SecureTokenStorage:
    def __init__(self, service_name="broxi-api"):
        self.service_name = service_name
        
    def store_token(self, username, token):
        """Store token in system keychain"""
        keyring.set_password(self.service_name, username, token)
    
    def retrieve_token(self, username):
        """Retrieve token from system keychain"""
        return keyring.get_password(self.service_name, username)
    
    def delete_token(self, username):
        """Delete token from system keychain"""
        keyring.delete_password(self.service_name, username)

# Environment-based token management
class EnvironmentTokenManager:
    def __init__(self):
        self.token = os.getenv('BROXI_API_TOKEN')
        if not self.token:
            raise ValueError("BROXI_API_TOKEN environment variable not set")
    
    def get_token(self):
        return self.token

Token Validation

import requests

def validate_token_format(token):
    """Validate API token format"""
    if not token:
        raise ValueError("Token cannot be empty")
    
    if not token.startswith('sk-'):
        raise ValueError("Invalid token format")
    
    if len(token) < 32:
        raise ValueError("Token too short")
    
    return True

def test_token_validity(token):
    """Test token validity with API"""
    try:
        response = requests.get(
            "https://api.broxi.ai/v1/user",
            headers={"Authorization": f"Bearer {token}"},
            timeout=10
        )
        return response.status_code == 200
    except requests.RequestException:
        return False

Network Security

Request Security

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class SecureAPIClient:
    def __init__(self, api_token):
        self.api_token = api_token
        self.session = self.create_secure_session()
    
    def create_secure_session(self):
        """Create secure requests session"""
        session = requests.Session()
        
        # Configure SSL/TLS
        session.verify = True  # Verify SSL certificates
        
        # Configure retries
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        
        # Set secure headers
        session.headers.update({
            'Authorization': f'Bearer {self.api_token}',
            'Content-Type': 'application/json',
            'User-Agent': 'SecureBroxiClient/1.0',
            'X-Requested-With': 'XMLHttpRequest'
        })
        
        return session

IP Whitelisting

def configure_ip_whitelist(api_client, allowed_ips):
    """Configure IP whitelist for API access"""
    whitelist_config = {
        "ip_whitelist": {
            "enabled": True,
            "allowed_ips": allowed_ips,
            "deny_all_others": True
        }
    }
    
    return api_client.update_security_settings(whitelist_config)

# Example usage
allowed_ips = [
    "192.168.1.0/24",  # Office network
    "10.0.0.0/8",      # VPN network
    "203.0.113.0/24"   # Production servers
]
configure_ip_whitelist(client, allowed_ips)
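To confirm that an address falls inside the configured ranges before applying the list, the standard `ipaddress` module can evaluate the CIDR blocks locally. This is a sketch; `is_ip_allowed` is a hypothetical helper, not a BroxiAI API call.

```python
import ipaddress

def is_ip_allowed(ip, allowed_cidrs):
    """Return True if ip falls inside any of the allowed CIDR ranges."""
    address = ipaddress.ip_address(ip)
    return any(
        address in ipaddress.ip_network(cidr)
        for cidr in allowed_cidrs
    )

allowed_ips = ["192.168.1.0/24", "10.0.0.0/8", "203.0.113.0/24"]
print(is_ip_allowed("203.0.113.7", allowed_ips))
print(is_ip_allowed("198.51.100.1", allowed_ips))
```

Running this against the addresses of your own servers before enabling `deny_all_others` helps avoid locking yourself out.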

Rate Limiting

Understanding Rate Limits

Rate Limit Headers

HTTP/1.1 200 OK
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 999
X-RateLimit-Reset: 1642694400
X-RateLimit-Window: 3600
Retry-After: 60
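A client can use these headers to decide how long to pause. The sketch below assumes that `X-RateLimit-Reset` is a Unix timestamp in seconds (the sample value looks like one) and that `Retry-After` is given in whole seconds; `seconds_to_wait` is a hypothetical helper.

```python
import time

def seconds_to_wait(headers, now=None):
    """Return how long to pause before the next request."""
    now = time.time() if now is None else now
    # Requests are still available in this window: no need to wait
    if int(headers.get("X-RateLimit-Remaining", "1")) > 0:
        return 0.0
    # Prefer the server's explicit hint when it is a plain number of seconds
    retry_after = headers.get("Retry-After", "").strip()
    if retry_after.isdigit():
        return float(retry_after)
    # Otherwise wait until the window resets (assumed Unix timestamp)
    reset = float(headers.get("X-RateLimit-Reset", now))
    return max(0.0, reset - now)

# With the sample headers above, requests remain, so no wait is needed
sample = {"X-RateLimit-Remaining": "999", "Retry-After": "60"}
print(seconds_to_wait(sample))
```

With `requests`, `response.headers` can be passed in directly, since it supports the same `.get()` lookups.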

Rate Limiting Implementation

import time
from collections import deque

import requests

class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = deque()
    
    def can_make_request(self):
        """Check if request can be made within rate limit"""
        now = time.time()
        
        # Remove old requests outside window
        while self.requests and self.requests[0] <= now - self.time_window:
            self.requests.popleft()
        
        return len(self.requests) < self.max_requests
    
    def record_request(self):
        """Record a new request"""
        self.requests.append(time.time())
    
    def wait_time(self):
        """Calculate wait time until next request allowed"""
        if not self.requests:
            return 0
        
        oldest_request = self.requests[0]
        wait_time = oldest_request + self.time_window - time.time()
        return max(0, wait_time)

class RateLimitedClient:
    def __init__(self, api_token, max_requests=100, time_window=3600):
        self.api_token = api_token
        self.rate_limiter = RateLimiter(max_requests, time_window)
    
    def make_request(self, method, url, **kwargs):
        """Make rate-limited API request"""
        if not self.rate_limiter.can_make_request():
            wait_time = self.rate_limiter.wait_time()
            if wait_time > 0:
                time.sleep(wait_time)
        
        self.rate_limiter.record_request()
        
        # Make the actual request
        headers = kwargs.get('headers', {})
        headers['Authorization'] = f'Bearer {self.api_token}'
        kwargs['headers'] = headers
        
        return requests.request(method, url, **kwargs)

Authentication Testing

Unit Tests

Token Authentication Tests

import unittest
from unittest.mock import patch, Mock

# Assumes BroxiAPIClient and AuthenticationError from the
# "Error Handling Implementation" example are in scope

class TestBroxiAuthentication(unittest.TestCase):
    def setUp(self):
        self.api_token = "sk-test-token-123"
        self.url = "https://api.broxi.ai/v1/user"
        # BroxiAPIClient expects a token manager, so stub one out
        self.token_manager = Mock()
        self.token_manager.get_valid_token.return_value = self.api_token
        self.client = BroxiAPIClient(self.token_manager)
    
    @patch('requests.request')
    def test_valid_token_authentication(self, mock_request):
        """Test successful authentication with valid token"""
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"user": "test_user"}
        mock_request.return_value = mock_response
        
        result = self.client.make_authenticated_request('GET', self.url)
        
        self.assertEqual(result['user'], 'test_user')
        mock_request.assert_called_once()
        
        # Verify Authorization header
        call_kwargs = mock_request.call_args[1]
        expected_header = f"Bearer {self.api_token}"
        self.assertEqual(
            call_kwargs['headers']['Authorization'],
            expected_header
        )
    
    @patch('requests.request')
    def test_invalid_token_authentication(self, mock_request):
        """Test authentication failure with invalid token"""
        mock_response = Mock()
        mock_response.status_code = 401
        mock_response.json.return_value = {
            "error": {
                "code": "UNAUTHORIZED",
                "message": "Invalid API token"
            }
        }
        mock_request.return_value = mock_response
        
        # Every attempt returns 401, so the client gives up after its retries
        with self.assertRaises(AuthenticationError):
            self.client.make_authenticated_request('GET', self.url)

Integration Tests

End-to-End Authentication Test

def test_complete_authentication_flow():
    """Test complete authentication flow"""
    # Test token generation (JWTManager from the JWT example above)
    jwt_manager = JWTManager(secret_key="test-secret-key-at-least-32-bytes-long")
    token = jwt_manager.generate_token(
        user_id="test_user",
        scopes=["workflows:read"]
    )
    
    # Test token validation
    payload = jwt_manager.verify_token(token)
    assert payload['sub'] == "test_user"
    
    # Test API request with token (BroxiClient from the Python example above)
    client = BroxiClient(token)
    workflows = client.get_workflows()
    
    assert isinstance(workflows, list)

def test_token_refresh_flow():
    """Test OAuth token refresh flow"""
    oauth_client = BroxiOAuthClient(
        client_id="test_client",
        client_secret="test_secret",
        redirect_uri="http://localhost:8000/callback"
    )
    
    # Simulate expired token
    expired_token = {
        "access_token": "expired_token",
        "refresh_token": "valid_refresh_token",
        "expires_in": -1  # Already expired
    }
    
    # Test token refresh
    new_token = oauth_client.refresh_access_token(
        expired_token['refresh_token']
    )
    
    assert 'access_token' in new_token
    assert 'refresh_token' in new_token
    assert new_token['expires_in'] > 0

Troubleshooting Authentication

Common Issues

Token Not Working

def diagnose_token_issues(token):
    """Diagnose common token issues"""
    issues = []
    
    # Check token format
    if not token.startswith('sk-'):
        issues.append("Invalid token format - should start with 'sk-'")
    
    # Check token length
    if len(token) < 32:
        issues.append("Token appears to be truncated")
    
    # Test token with API
    if not test_token_validity(token):
        issues.append("Token rejected by API - may be expired or revoked")
    
    # Check permissions
    try:
        response = requests.get(
            "https://api.broxi.ai/v1/user",
            headers={"Authorization": f"Bearer {token}"},
            timeout=10
        )
        if response.status_code == 403:
            issues.append("Token has insufficient permissions")
    except requests.RequestException:
        # Catch only network-level failures, not every exception
        issues.append("Network connectivity issues")
    
    return issues

OAuth Troubleshooting

def troubleshoot_oauth(client_id, client_secret, redirect_uri):
    """Troubleshoot OAuth configuration"""
    problems = []
    
    # Check client credentials
    try:
        # Attempt to get authorization URL
        oauth_client = BroxiOAuthClient(
            client_id, client_secret, redirect_uri
        )
        auth_url, state = oauth_client.get_authorization_url()
        
        if not auth_url:
            problems.append("Failed to generate authorization URL")
            
    except Exception as e:
        problems.append(f"OAuth configuration error: {str(e)}")
    
    # Validate redirect URI
    if not redirect_uri.startswith(('http://', 'https://')):
        problems.append("Invalid redirect URI format")
    
    return problems

Next Steps

After implementing authentication:

  1. Test Thoroughly: Validate all authentication flows

  2. Monitor Usage: Track authentication metrics

  3. Security Review: Regular security audits

  4. Documentation: Keep authentication docs updated

  5. Team Training: Ensure team understands security practices

