Webhooks

Set up webhooks to receive real-time notifications from BroxiAI workflows

Learn how to configure and use webhooks to receive real-time notifications when events occur in your BroxiAI workflows.

Webhook Overview

What Are Webhooks?

Webhooks are HTTP callbacks that BroxiAI sends to your application when specific events occur. Instead of polling the API for changes, webhooks push notifications to your server in real-time.

Benefits of Webhooks

  • Real-time event notifications

  • Reduced API polling overhead

  • Event-driven architecture

  • Automatic retry mechanisms

  • Scalable notification system

Supported Events

Workflow Events

Workflow Events:
  workflow.started: "Workflow execution began"
  workflow.completed: "Workflow execution finished successfully"
  workflow.failed: "Workflow execution failed"
  workflow.timeout: "Workflow execution timed out"
  workflow.cancelled: "Workflow execution was cancelled"

Component Events:
  component.started: "Component execution began"
  component.completed: "Component execution finished"
  component.failed: "Component execution failed"
  component.timeout: "Component execution timed out"

User Events:
  user.message.received: "New user message received"
  user.session.started: "New user session created"
  user.session.ended: "User session ended"

System Events:
  api.quota.warning: "API quota threshold reached"
  api.quota.exceeded: "API quota exceeded"
  system.maintenance: "System maintenance notification"

Setting Up Webhooks

Webhook Configuration

Create Webhook Endpoint

  1. Navigate to Settings → Webhooks in your BroxiAI dashboard

  2. Click "Add New Webhook"

  3. Configure webhook settings

  4. Test and activate the webhook

Webhook Configuration Options

{
  "webhook_config": {
    "url": "https://your-app.com/webhooks/broxi",
    "events": [
      "workflow.completed",
      "workflow.failed",
      "user.message.received"
    ],
    "secret": "webhook_secret_key_here",
    "retry_policy": {
      "max_retries": 3,
      "retry_delay": 5,
      "exponential_backoff": true
    },
    "headers": {
      "X-Custom-Header": "custom-value",
      "Authorization": "Bearer your-auth-token"
    },
    "timeout": 30,
    "active": true
  }
}

Webhook Endpoint Implementation

Basic Webhook Receiver (Python/Flask)

from flask import Flask, request, jsonify
import hmac
import hashlib
import json

app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret"

@app.route('/webhooks/broxi', methods=['POST'])
def handle_webhook():
    # Verify webhook signature
    if not verify_webhook_signature(request):
        return jsonify({"error": "Invalid signature"}), 401
    
    # Parse webhook payload
    payload = request.get_json()
    event_type = payload.get('event')
    
    # Handle different event types
    try:
        if event_type == 'workflow.completed':
            handle_workflow_completed(payload)
        elif event_type == 'workflow.failed':
            handle_workflow_failed(payload)
        elif event_type == 'user.message.received':
            handle_user_message(payload)
        else:
            print(f"Unhandled event type: {event_type}")
        
        return jsonify({"status": "success"}), 200
        
    except Exception as e:
        print(f"Error processing webhook: {e}")
        return jsonify({"error": "Processing failed"}), 500

def verify_webhook_signature(request):
    """Verify webhook signature for security"""
    signature = request.headers.get('X-Broxi-Signature')
    if not signature:
        return False
    
    # Calculate expected signature
    payload = request.get_data()
    expected_signature = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    
    return hmac.compare_digest(signature, expected_signature)

def handle_workflow_completed(payload):
    """Handle workflow completion event"""
    workflow_id = payload['data']['workflow_id']
    run_id = payload['data']['run_id']
    output = payload['data']['output']
    
    print(f"Workflow {workflow_id} completed: {run_id}")
    
    # Your business logic here
    # - Send notifications
    # - Update database
    # - Trigger next workflow
    # - Analytics tracking

def handle_workflow_failed(payload):
    """Handle workflow failure event"""
    workflow_id = payload['data']['workflow_id']
    run_id = payload['data']['run_id']
    error = payload['data']['error']
    
    print(f"Workflow {workflow_id} failed: {error}")
    
    # Error handling logic
    # - Log error details
    # - Send alerts
    # - Retry logic
    # - Escalation procedures

def handle_user_message(payload):
    """Handle new user message event"""
    user_id = payload['data']['user_id']
    message = payload['data']['message']
    session_id = payload['data']['session_id']
    
    print(f"New message from {user_id}: {message}")
    
    # Message processing logic
    # - Store message
    # - Analytics
    # - Auto-responses
    # - Escalation rules

if __name__ == '__main__':
    app.run(debug=True, port=5000)

Node.js/Express Implementation

const express = require('express');
const crypto = require('crypto');
const app = express();

app.use(express.json());

const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET;

app.post('/webhooks/broxi', (req, res) => {
    // Verify signature
    if (!verifyWebhookSignature(req)) {
        return res.status(401).json({ error: 'Invalid signature' });
    }
    
    const { event, data, timestamp } = req.body;
    
    try {
        switch (event) {
            case 'workflow.completed':
                handleWorkflowCompleted(data);
                break;
            case 'workflow.failed':
                handleWorkflowFailed(data);
                break;
            case 'user.message.received':
                handleUserMessage(data);
                break;
            default:
                console.log(`Unhandled event: ${event}`);
        }
        
        res.json({ status: 'success' });
    } catch (error) {
        console.error('Webhook processing error:', error);
        res.status(500).json({ error: 'Processing failed' });
    }
});

function verifyWebhookSignature(req) {
    const signature = req.headers['x-broxi-signature'];
    if (!signature) return false;
    
    const payload = JSON.stringify(req.body);
    const expectedSignature = crypto
        .createHmac('sha256', WEBHOOK_SECRET)
        .update(payload)
        .digest('hex');
    
    return crypto.timingSafeEqual(
        Buffer.from(signature),
        Buffer.from(expectedSignature)
    );
}

function handleWorkflowCompleted(data) {
    console.log(`Workflow completed: ${data.workflow_id}`);
    
    // Business logic implementation
    // - Database updates
    // - Notifications
    // - Analytics
    // - Next steps
}

function handleWorkflowFailed(data) {
    console.log(`Workflow failed: ${data.workflow_id}`, data.error);
    
    // Error handling implementation
    // - Error logging
    // - Alert systems
    // - Retry mechanisms
    // - Escalation
}

function handleUserMessage(data) {
    console.log(`New message: ${data.user_id}`, data.message);
    
    // Message handling implementation
    // - Message storage
    // - Real-time updates
    // - Analytics
    // - Auto-responses
}

app.listen(3000, () => {
    console.log('Webhook server running on port 3000');
});

Webhook Payload Structure

Standard Payload Format

Base Webhook Structure

{
  "event": "workflow.completed",
  "timestamp": "2024-01-15T10:30:00.123Z",
  "webhook_id": "wh_abc123",
  "delivery_id": "del_xyz789",
  "data": {
    "workflow_id": "flow_123",
    "run_id": "run_456",
    "user_id": "user_789",
    "session_id": "session_012"
  }
}

Event-Specific Payloads

Workflow Completed Event

{
  "event": "workflow.completed",
  "timestamp": "2024-01-15T10:30:05.123Z",
  "webhook_id": "wh_abc123",
  "delivery_id": "del_xyz789",
  "data": {
    "workflow_id": "customer_support_bot",
    "run_id": "run_12345",
    "user_id": "user_67890",
    "session_id": "session_abcdef",
    "input": "Hello, I need help with my order",
    "output": "I'd be happy to help you with your order. Could you please provide your order number?",
    "execution_time": 2.3,
    "token_usage": {
      "prompt_tokens": 150,
      "completion_tokens": 75,
      "total_tokens": 225
    },
    "cost": 0.0045,
    "components_executed": [
      {
        "name": "Chat Input",
        "execution_time": 0.1
      },
      {
        "name": "OpenAI Model",
        "execution_time": 2.0
      },
      {
        "name": "Chat Output",
        "execution_time": 0.2
      }
    ]
  }
}

Workflow Failed Event

{
  "event": "workflow.failed",
  "timestamp": "2024-01-15T10:30:10.123Z",
  "webhook_id": "wh_abc123",
  "delivery_id": "del_xyz789",
  "data": {
    "workflow_id": "document_processor",
    "run_id": "run_12346",
    "user_id": "user_67891",
    "session_id": "session_abcdef",
    "input": "Process this document",
    "error": {
      "code": "COMPONENT_TIMEOUT",
      "message": "Document processing component timed out after 30 seconds",
      "component": "Document Loader",
      "details": {
        "timeout_duration": 30,
        "file_size": "15MB",
        "file_type": "PDF"
      }
    },
    "execution_time": 30.5,
    "failed_component": "Document Loader"
  }
}

User Message Event

{
  "event": "user.message.received",
  "timestamp": "2024-01-15T10:25:00.123Z",
  "webhook_id": "wh_abc123",
  "delivery_id": "del_xyz789",
  "data": {
    "user_id": "user_67890",
    "session_id": "session_abcdef",
    "message": {
      "id": "msg_123",
      "content": "What's the status of my order #12345?",
      "type": "text",
      "metadata": {
        "source": "web_chat",
        "user_agent": "Mozilla/5.0...",
        "ip_address": "192.168.1.100"
      }
    },
    "conversation_context": {
      "message_count": 5,
      "session_duration": 120,
      "previous_intents": ["greeting", "order_inquiry"]
    }
  }
}

Webhook Security

Signature Verification

Signature Generation (BroxiAI Side)

import hmac
import hashlib

def generate_webhook_signature(payload, secret):
    """Generate webhook signature"""
    return hmac.new(
        secret.encode(),
        payload.encode(),
        hashlib.sha256
    ).hexdigest()

Signature Verification (Your Side)

import hmac
import hashlib
import time

def verify_webhook_signature(payload, signature, secret, tolerance=300):
    """Verify webhook signature with timestamp tolerance"""
    
    # Extract timestamp from headers
    timestamp = request.headers.get('X-Broxi-Timestamp')
    if not timestamp:
        return False
    
    # Check timestamp tolerance (5 minutes)
    current_time = int(time.time())
    if abs(current_time - int(timestamp)) > tolerance:
        return False
    
    # Calculate expected signature
    signed_payload = f"{timestamp}.{payload}"
    expected_signature = hmac.new(
        secret.encode(),
        signed_payload.encode(),
        hashlib.sha256
    ).hexdigest()
    
    # Compare signatures
    return hmac.compare_digest(signature, expected_signature)

def secure_webhook_handler(request):
    """Secure webhook handler with all security checks"""
    
    # Get signature and timestamp
    signature = request.headers.get('X-Broxi-Signature')
    timestamp = request.headers.get('X-Broxi-Timestamp')
    
    if not signature or not timestamp:
        return {"error": "Missing security headers"}, 401
    
    # Get payload
    payload = request.get_data(as_text=True)
    
    # Verify signature
    if not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
        return {"error": "Invalid signature"}, 401
    
    # Process webhook
    return process_webhook(request.get_json()), 200

IP Whitelisting

Configure IP Whitelist

ALLOWED_IPS = [
    '52.89.214.238',
    '34.212.75.30',
    '54.218.53.128',
    # BroxiAI webhook IP ranges
]

def is_ip_allowed(request):
    """Check if request IP is whitelisted"""
    client_ip = request.environ.get('HTTP_X_FORWARDED_FOR') or request.remote_addr
    return client_ip in ALLOWED_IPS

@app.before_request
def check_ip_whitelist():
    """Middleware to check IP whitelist"""
    if request.endpoint == 'handle_webhook':
        if not is_ip_allowed(request):
            abort(403)

Webhook Reliability

Retry Mechanism

Retry Configuration

Retry Policy:
  max_retries: 3
  initial_delay: 5s
  exponential_backoff: true
  max_delay: 60s
  
  retry_conditions:
    - status_codes: [500, 502, 503, 504, 408]
    - network_errors: true
    - timeouts: true
  
  non_retry_conditions:
    - status_codes: [400, 401, 403, 404]
    - malformed_response: false

Handling Retry Logic

import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class WebhookSender:
    def __init__(self, max_retries=3):
        self.session = requests.Session()
        
        # Configure retry strategy
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=2,
            status_forcelist=[500, 502, 503, 504, 408],
            allowed_methods=["POST"]
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
    
    def send_webhook(self, url, payload, headers=None):
        """Send webhook with retry logic"""
        try:
            response = self.session.post(
                url,
                json=payload,
                headers=headers,
                timeout=30
            )
            response.raise_for_status()
            return True
            
        except requests.exceptions.RequestException as e:
            print(f"Webhook delivery failed: {e}")
            return False

# Idempotency handling
def handle_duplicate_webhooks(delivery_id):
    """Handle duplicate webhook deliveries"""
    
    # Check if we've already processed this delivery
    if redis_client.exists(f"webhook_processed:{delivery_id}"):
        return {"status": "already_processed"}, 200
    
    # Mark as processing
    redis_client.setex(f"webhook_processing:{delivery_id}", 300, "true")
    
    try:
        # Process webhook
        result = process_webhook_payload(payload)
        
        # Mark as completed
        redis_client.setex(f"webhook_processed:{delivery_id}", 86400, "true")
        
        return result, 200
        
    except Exception as e:
        # Remove processing lock on error
        redis_client.delete(f"webhook_processing:{delivery_id}")
        raise
    finally:
        redis_client.delete(f"webhook_processing:{delivery_id}")

Webhook Monitoring

Delivery Status Tracking

class WebhookMonitor:
    def __init__(self):
        self.delivery_stats = {}
    
    def track_delivery(self, webhook_id, delivery_id, status, response_time):
        """Track webhook delivery statistics"""
        
        if webhook_id not in self.delivery_stats:
            self.delivery_stats[webhook_id] = {
                "total_deliveries": 0,
                "successful_deliveries": 0,
                "failed_deliveries": 0,
                "average_response_time": 0,
                "last_delivery": None
            }
        
        stats = self.delivery_stats[webhook_id]
        stats["total_deliveries"] += 1
        stats["last_delivery"] = time.time()
        
        if status == "success":
            stats["successful_deliveries"] += 1
        else:
            stats["failed_deliveries"] += 1
        
        # Update average response time
        current_avg = stats["average_response_time"]
        total = stats["total_deliveries"]
        stats["average_response_time"] = (
            (current_avg * (total - 1) + response_time) / total
        )
    
    def get_webhook_health(self, webhook_id):
        """Get webhook health metrics"""
        stats = self.delivery_stats.get(webhook_id, {})
        
        if not stats:
            return {"status": "no_data"}
        
        success_rate = (
            stats["successful_deliveries"] / stats["total_deliveries"] * 100
        )
        
        return {
            "success_rate": success_rate,
            "total_deliveries": stats["total_deliveries"],
            "average_response_time": stats["average_response_time"],
            "status": "healthy" if success_rate > 95 else "degraded"
        }

Advanced Webhook Patterns

Event Filtering

Conditional Webhooks

{
  "webhook_config": {
    "url": "https://your-app.com/webhooks/broxi",
    "events": ["workflow.completed"],
    "filters": {
      "workflow_id": ["customer_support_bot", "sales_assistant"],
      "user_id": {"starts_with": "premium_"},
      "output_contains": ["error", "failed", "unable"]
    },
    "conditions": {
      "execution_time": {"greater_than": 5.0},
      "token_usage": {"greater_than": 1000},
      "cost": {"greater_than": 0.10}
    }
  }
}

Dynamic Webhook Routing

def route_webhook_by_content(payload):
    """Route webhooks based on content"""
    
    event_type = payload.get('event')
    data = payload.get('data', {})
    
    # Route based on workflow type
    workflow_id = data.get('workflow_id')
    
    if workflow_id.startswith('support_'):
        return send_to_support_system(payload)
    elif workflow_id.startswith('sales_'):
        return send_to_crm_system(payload)
    elif workflow_id.startswith('analytics_'):
        return send_to_analytics_system(payload)
    
    # Route based on error severity
    if event_type == 'workflow.failed':
        error_code = data.get('error', {}).get('code')
        
        if error_code in ['CRITICAL_ERROR', 'SECURITY_BREACH']:
            return send_urgent_alert(payload)
        else:
            return log_error(payload)
    
    # Default routing
    return process_standard_webhook(payload)

Webhook Aggregation

Batch Processing

import asyncio
from collections import defaultdict

class WebhookAggregator:
    def __init__(self, batch_size=10, batch_timeout=30):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.batches = defaultdict(list)
        self.timers = {}
    
    async def add_webhook(self, webhook_type, payload):
        """Add webhook to batch for processing"""
        
        self.batches[webhook_type].append(payload)
        
        # Start timer for this batch type if not already started
        if webhook_type not in self.timers:
            self.timers[webhook_type] = asyncio.create_task(
                self.batch_timer(webhook_type)
            )
        
        # Process batch if it reaches size limit
        if len(self.batches[webhook_type]) >= self.batch_size:
            await self.process_batch(webhook_type)
    
    async def batch_timer(self, webhook_type):
        """Timer to process batch after timeout"""
        await asyncio.sleep(self.batch_timeout)
        await self.process_batch(webhook_type)
    
    async def process_batch(self, webhook_type):
        """Process accumulated batch"""
        
        if webhook_type not in self.batches:
            return
        
        batch = self.batches[webhook_type].copy()
        self.batches[webhook_type].clear()
        
        # Cancel timer
        if webhook_type in self.timers:
            self.timers[webhook_type].cancel()
            del self.timers[webhook_type]
        
        # Process the batch
        if batch:
            await self.handle_batch(webhook_type, batch)
    
    async def handle_batch(self, webhook_type, batch):
        """Handle processed batch"""
        
        if webhook_type == 'user.message.received':
            await self.process_message_batch(batch)
        elif webhook_type == 'workflow.completed':
            await self.process_completion_batch(batch)
        elif webhook_type == 'workflow.failed':
            await self.process_error_batch(batch)

Integration Examples

Slack Integration

Slack Webhook Handler

import slack_sdk

class SlackWebhookHandler:
    def __init__(self, slack_token):
        self.slack_client = slack_sdk.WebClient(token=slack_token)
    
    def handle_workflow_failed(self, payload):
        """Send Slack notification for workflow failures"""
        
        workflow_id = payload['data']['workflow_id']
        error = payload['data']['error']
        
        message = {
            "channel": "#alerts",
            "text": f"🚨 Workflow Failed: {workflow_id}",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Workflow Failed*\n*ID:* {workflow_id}\n*Error:* {error['message']}"
                    }
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "View Details"},
                            "url": f"https://app.broxi.ai/workflows/{workflow_id}"
                        },
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "Retry"},
                            "action_id": "retry_workflow",
                            "value": workflow_id
                        }
                    ]
                }
            ]
        }
        
        self.slack_client.chat_postMessage(**message)
    
    def handle_quota_warning(self, payload):
        """Send quota warning to Slack"""
        
        usage_percent = payload['data']['usage_percent']
        limit = payload['data']['limit']
        
        message = {
            "channel": "#billing",
            "text": f"⚠️ API Quota Warning: {usage_percent}% used",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*API Quota Warning*\n*Usage:* {usage_percent}% of {limit}\n*Time to reset:* {payload['data']['reset_time']}"
                    }
                }
            ]
        }
        
        self.slack_client.chat_postMessage(**message)

Database Integration

Webhook to Database

import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker

class DatabaseWebhookHandler:
    def __init__(self, database_url):
        self.engine = sa.create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)
    
    def store_workflow_completion(self, payload):
        """Store workflow completion in database"""
        
        session = self.Session()
        try:
            workflow_execution = WorkflowExecution(
                workflow_id=payload['data']['workflow_id'],
                run_id=payload['data']['run_id'],
                user_id=payload['data']['user_id'],
                session_id=payload['data']['session_id'],
                input_text=payload['data']['input'],
                output_text=payload['data']['output'],
                execution_time=payload['data']['execution_time'],
                token_usage=payload['data']['token_usage']['total_tokens'],
                cost=payload['data']['cost'],
                status='completed',
                timestamp=payload['timestamp']
            )
            
            session.add(workflow_execution)
            session.commit()
            
        except Exception as e:
            session.rollback()
            raise
        finally:
            session.close()
    
    def store_user_message(self, payload):
        """Store user message in database"""
        
        session = self.Session()
        try:
            message = UserMessage(
                user_id=payload['data']['user_id'],
                session_id=payload['data']['session_id'],
                message_id=payload['data']['message']['id'],
                content=payload['data']['message']['content'],
                message_type=payload['data']['message']['type'],
                metadata=payload['data']['message']['metadata'],
                timestamp=payload['timestamp']
            )
            
            session.add(message)
            session.commit()
            
        except Exception as e:
            session.rollback()
            raise
        finally:
            session.close()

Testing Webhooks

Webhook Testing Tools

Local Webhook Testing

from flask import Flask, request, jsonify
import json

app = Flask(__name__)

@app.route('/webhook-test', methods=['POST'])
def webhook_test():
    """Test webhook endpoint"""
    
    print("=== Webhook Received ===")
    print(f"Headers: {dict(request.headers)}")
    print(f"Payload: {json.dumps(request.get_json(), indent=2)}")
    print("========================")
    
    return jsonify({"status": "received"}), 200

if __name__ == '__main__':
    app.run(debug=True, port=5000)

Webhook Testing with ngrok

# Install ngrok
npm install -g ngrok

# Start your local webhook server
python webhook_test.py

# In another terminal, expose your local server
ngrok http 5000

# Use the ngrok URL in your webhook configuration
# https://abc123.ngrok.io/webhook-test

Unit Tests for Webhook Handler

import unittest
from unittest.mock import patch, Mock
import json

class TestWebhookHandler(unittest.TestCase):
    def setUp(self):
        self.webhook_handler = WebhookHandler()
    
    def test_workflow_completed_webhook(self):
        """Test workflow completed webhook handling"""
        
        payload = {
            "event": "workflow.completed",
            "timestamp": "2024-01-15T10:30:00Z",
            "data": {
                "workflow_id": "test_workflow",
                "run_id": "test_run",
                "user_id": "test_user",
                "output": "Test response"
            }
        }
        
        with patch.object(self.webhook_handler, 'process_completion') as mock_process:
            result = self.webhook_handler.handle_webhook(payload)
            
            self.assertEqual(result['status'], 'success')
            mock_process.assert_called_once_with(payload['data'])
    
    def test_invalid_signature(self):
        """Test webhook with invalid signature"""
        
        with patch('hmac.compare_digest', return_value=False):
            result = self.webhook_handler.verify_signature("payload", "invalid_sig")
            self.assertFalse(result)
    
    @patch('requests.post')
    def test_webhook_retry_logic(self, mock_post):
        """Test webhook retry mechanism"""
        
        # Simulate initial failure, then success
        mock_post.side_effect = [
            Mock(status_code=500),  # First attempt fails
            Mock(status_code=500),  # Second attempt fails
            Mock(status_code=200)   # Third attempt succeeds
        ]
        
        sender = WebhookSender(max_retries=3)
        result = sender.send_webhook("http://test.com", {"test": "data"})
        
        self.assertTrue(result)
        self.assertEqual(mock_post.call_count, 3)

Troubleshooting Webhooks

Common Issues

Webhook Not Receiving Events

def diagnose_webhook_issues(webhook_url):
    """Diagnose common webhook issues"""
    
    issues = []
    
    # Test URL accessibility
    try:
        response = requests.get(webhook_url, timeout=10)
        if response.status_code >= 400:
            issues.append(f"URL returns {response.status_code}")
    except requests.exceptions.ConnectionError:
        issues.append("URL is not accessible")
    except requests.exceptions.Timeout:
        issues.append("URL request timed out")
    
    # Test POST endpoint
    try:
        test_payload = {"test": "webhook"}
        response = requests.post(webhook_url, json=test_payload, timeout=10)
        if response.status_code >= 400:
            issues.append(f"POST endpoint returns {response.status_code}")
    except Exception as e:
        issues.append(f"POST test failed: {str(e)}")
    
    # Check SSL certificate
    if webhook_url.startswith('https://'):
        try:
            requests.get(webhook_url, verify=True, timeout=10)
        except requests.exceptions.SSLError:
            issues.append("SSL certificate issues")
    
    return issues

Debugging Webhook Payloads

import logging

# Configure detailed logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger('webhook_handler')

def debug_webhook(request):
    """Debug webhook request details"""
    
    logger.debug("=== Webhook Debug Info ===")
    logger.debug(f"Method: {request.method}")
    logger.debug(f"URL: {request.url}")
    logger.debug(f"Headers: {dict(request.headers)}")
    logger.debug(f"Body: {request.get_data(as_text=True)}")
    logger.debug("========================")
    
    # Validate required headers
    required_headers = ['X-Broxi-Signature', 'X-Broxi-Timestamp']
    for header in required_headers:
        if header not in request.headers:
            logger.warning(f"Missing required header: {header}")
    
    # Validate JSON payload
    try:
        payload = request.get_json()
        required_fields = ['event', 'timestamp', 'data']
        for field in required_fields:
            if field not in payload:
                logger.warning(f"Missing required field: {field}")
    except Exception as e:
        logger.error(f"Invalid JSON payload: {e}")

Best Practices

Webhook Security

  • Always verify webhook signatures

  • Use HTTPS endpoints only

  • Implement IP whitelisting when possible

  • Set reasonable timeout values

  • Log webhook activities for audit

Performance Optimization

  • Process webhooks asynchronously

  • Implement proper error handling

  • Use batch processing for high volume

  • Cache frequently accessed data

  • Monitor webhook performance

Reliability

  • Implement idempotency handling

  • Use retry mechanisms with exponential backoff

  • Monitor webhook delivery success rates

  • Have fallback mechanisms for critical events

  • Test webhook endpoints regularly

Next Steps

After implementing webhooks:

  1. Monitor Performance: Track delivery success rates

  2. Optimize Processing: Implement async processing

  3. Scale Infrastructure: Handle increasing webhook volume

  4. Enhance Security: Regular security audits

  5. Documentation: Keep webhook docs updated


Last updated