Data Protection

Comprehensive data protection strategies for BroxiAI applications including encryption, access controls, and privacy safeguards

Implement the measures below to safeguard sensitive information in your BroxiAI applications, meet privacy-compliance requirements, and maintain customer trust.

Data Protection Overview

Core Principles

Data Protection Fundamentals

Data Protection Principles:
  Data Minimization: "Collect only necessary data"
  Purpose Limitation: "Use data only for stated purposes"
  Data Quality: "Maintain accurate and up-to-date data"
  Storage Limitation: "Retain data only as long as necessary"
  Security: "Implement appropriate technical and organizational measures"
  Accountability: "Demonstrate compliance with data protection principles"
  Transparency: "Inform users about data processing"
  User Rights: "Respect data subject rights and preferences"

Data Classification Framework

Data Classification Levels:
  Public:
    description: "Information that can be freely shared"
    examples: ["Marketing materials", "Public documentation", "General product information"]
    protection_level: "Basic"
    
  Internal:
    description: "Information for internal use only"
    examples: ["Business processes", "Internal communications", "Operational data"]
    protection_level: "Standard"
    
  Confidential:
    description: "Sensitive business information"
    examples: ["Customer data", "Financial information", "Business strategies"]
    protection_level: "Enhanced"
    
  Restricted:
    description: "Highly sensitive information"
    examples: ["Personal data", "Payment information", "Health records", "Legal documents"]
    protection_level: "Maximum"
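As an illustration, the classification table above can be encoded as a simple lookup so that handling code can branch on protection level. The level names follow the table; the handling rules themselves are hypothetical examples, not BroxiAI policy.

```python
# Hypothetical sketch: map the classification levels above to handling rules
DATA_CLASSIFICATION = {
    "public":       {"protection_level": "Basic",    "encrypt_at_rest": False},
    "internal":     {"protection_level": "Standard", "encrypt_at_rest": False},
    "confidential": {"protection_level": "Enhanced", "encrypt_at_rest": True},
    "restricted":   {"protection_level": "Maximum",  "encrypt_at_rest": True},
}

def handling_rules(classification: str) -> dict:
    """Look up handling rules; unknown labels default to the strictest tier."""
    return DATA_CLASSIFICATION.get(classification.lower(),
                                   DATA_CLASSIFICATION["restricted"])
```

Defaulting unknown labels to the strictest tier keeps misclassified data over-protected rather than under-protected.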

Encryption Implementation

Data at Rest Encryption

Database Encryption

import base64
import os

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC


class DecryptionError(Exception):
    """Raised when field decryption fails."""

class DatabaseEncryption:
    def __init__(self, master_key=None):
        self.master_key = master_key or self.generate_master_key()
        self.fernet = Fernet(self.master_key)
    
    def generate_master_key(self):
        """Generate encryption master key"""
        return Fernet.generate_key()
    
    def derive_key_from_password(self, password, salt=None):
        """Derive encryption key from password"""
        if salt is None:
            salt = os.urandom(16)
        
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key, salt
    
    def encrypt_field(self, data):
        """Encrypt sensitive field data"""
        if data is None:
            return None
        
        if isinstance(data, str):
            data = data.encode('utf-8')
        
        encrypted_data = self.fernet.encrypt(data)
        return base64.urlsafe_b64encode(encrypted_data).decode('utf-8')
    
    def decrypt_field(self, encrypted_data):
        """Decrypt sensitive field data"""
        if encrypted_data is None:
            return None
        
        try:
            encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode('utf-8'))
            decrypted_data = self.fernet.decrypt(encrypted_bytes)
            return decrypted_data.decode('utf-8')
        except Exception as e:
            raise DecryptionError(f"Failed to decrypt data: {str(e)}")

# SQLAlchemy encryption model
from sqlalchemy import Column, DateTime, Integer, String, Text
from sqlalchemy.orm import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property

Base = declarative_base()
encryption_handler = DatabaseEncryption()

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(String(100), nullable=False)
    _email = Column('email', Text)  # Encrypted field
    _phone = Column('phone', Text)  # Encrypted field
    _ssn = Column('ssn', Text)     # Encrypted field
    created_at = Column(DateTime)
    
    @hybrid_property
    def email(self):
        """Decrypt email when accessed"""
        return encryption_handler.decrypt_field(self._email)
    
    @email.setter
    def email(self, value):
        """Encrypt email when set"""
        self._email = encryption_handler.encrypt_field(value)
    
    @hybrid_property
    def phone(self):
        return encryption_handler.decrypt_field(self._phone)
    
    @phone.setter
    def phone(self, value):
        self._phone = encryption_handler.encrypt_field(value)
    
    @hybrid_property
    def ssn(self):
        return encryption_handler.decrypt_field(self._ssn)
    
    @ssn.setter
    def ssn(self, value):
        self._ssn = encryption_handler.encrypt_field(value)
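For reference, the field encryption above reduces to a Fernet round-trip. This minimal sketch, independent of the model classes, shows the guarantee the hybrid properties rely on: what goes in through the setter comes back unchanged through the getter, while the stored value is opaque ciphertext.

```python
from cryptography.fernet import Fernet

# Round-trip sketch: the core of encrypt_field/decrypt_field
key = Fernet.generate_key()
fernet = Fernet(key)

token = fernet.encrypt("user@example.com".encode("utf-8"))  # stored in the column
plaintext = fernet.decrypt(token).decode("utf-8")           # returned by the getter
```
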

File System Encryption

import os
import shutil
from pathlib import Path
from cryptography.fernet import Fernet

class FileEncryption:
    def __init__(self, key_file_path="encryption.key"):
        self.key_file_path = key_file_path
        self.key = self.load_or_generate_key()
        self.fernet = Fernet(self.key)
    
    def load_or_generate_key(self):
        """Load existing key or generate new one"""
        if os.path.exists(self.key_file_path):
            with open(self.key_file_path, 'rb') as key_file:
                return key_file.read()
        else:
            key = Fernet.generate_key()
            with open(self.key_file_path, 'wb') as key_file:
                key_file.write(key)
            # Secure key file permissions
            os.chmod(self.key_file_path, 0o600)
            return key
    
    def encrypt_file(self, file_path, output_path=None):
        """Encrypt a file"""
        if output_path is None:
            output_path = f"{file_path}.encrypted"
        
        with open(file_path, 'rb') as file:
            file_data = file.read()
        
        encrypted_data = self.fernet.encrypt(file_data)
        
        with open(output_path, 'wb') as encrypted_file:
            encrypted_file.write(encrypted_data)
        
        # Secure file permissions
        os.chmod(output_path, 0o600)
        
        return output_path
    
    def decrypt_file(self, encrypted_file_path, output_path=None):
        """Decrypt a file"""
        if output_path is None:
            output_path = encrypted_file_path.replace('.encrypted', '')
        
        with open(encrypted_file_path, 'rb') as encrypted_file:
            encrypted_data = encrypted_file.read()
        
        decrypted_data = self.fernet.decrypt(encrypted_data)
        
        with open(output_path, 'wb') as file:
            file.write(decrypted_data)
        
        return output_path
    
    def encrypt_directory(self, directory_path, output_directory=None):
        """Encrypt all files in a directory"""
        if output_directory is None:
            output_directory = f"{directory_path}_encrypted"
        
        Path(output_directory).mkdir(exist_ok=True)
        
        for root, dirs, files in os.walk(directory_path):
            for file in files:
                file_path = os.path.join(root, file)
                relative_path = os.path.relpath(file_path, directory_path)
                output_file_path = os.path.join(output_directory, f"{relative_path}.encrypted")
                
                # Create subdirectories if needed
                os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
                
                self.encrypt_file(file_path, output_file_path)
        
        return output_directory
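A quick way to sanity-check the encrypt/decrypt pair above is a round-trip through a temporary directory. This sketch uses Fernet directly with hypothetical file names rather than the FileEncryption class, so it does not write a key file to disk.

```python
import os
import tempfile

from cryptography.fernet import Fernet

# Hypothetical round-trip: encrypt a file, decrypt it, compare contents
fernet = Fernet(Fernet.generate_key())

with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "report.txt")
    with open(source, "wb") as f:
        f.write(b"quarterly numbers")

    # Encrypt to a sibling .encrypted file
    with open(source, "rb") as f:
        ciphertext = fernet.encrypt(f.read())
    encrypted_path = source + ".encrypted"
    with open(encrypted_path, "wb") as f:
        f.write(ciphertext)

    # Decrypt and recover the original bytes
    with open(encrypted_path, "rb") as f:
        recovered = fernet.decrypt(f.read())
```
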

Data in Transit Encryption

HTTPS/TLS Configuration

import ssl

from flask import Flask

class SecureFlaskApp:
    def __init__(self, app_name):
        self.app = Flask(app_name)
        self.configure_ssl()
    
    def configure_ssl(self):
        """Configure SSL/TLS settings"""
        
        # Create SSL context; PROTOCOL_TLS_SERVER negotiates the highest
        # version both sides support, so pin the floor to TLS 1.2
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        
        # Use a fresh (EC)DH key per session
        context.options |= ssl.OP_SINGLE_DH_USE
        context.options |= ssl.OP_SINGLE_ECDH_USE
        
        # Set cipher suites (strong ciphers only)
        context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
        
        # Load certificate and key
        context.load_cert_chain('path/to/certificate.pem', 'path/to/private.key')
        
        self.ssl_context = context
    
    def run_secure(self, host='0.0.0.0', port=5000):
        """Run Flask app with SSL"""
        self.app.run(
            host=host,
            port=port,
            ssl_context=self.ssl_context,
            debug=False
        )

# API Client with TLS verification
import requests
import urllib3
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context

class SecureAPIClient:
    def __init__(self, base_url, verify_ssl=True):
        self.base_url = base_url
        self.session = requests.Session()
        
        if verify_ssl:
            self.configure_ssl_verification()
    
    def configure_ssl_verification(self):
        """Configure strict SSL verification"""
        
        class SSLAdapter(HTTPAdapter):
            def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
                ctx = create_urllib3_context()
                ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
                ctx.check_hostname = True
                ctx.verify_mode = ssl.CERT_REQUIRED
                
                self.poolmanager = urllib3.poolmanager.PoolManager(
                    num_pools=connections,
                    maxsize=maxsize,
                    block=block,
                    ssl_context=ctx,
                    **pool_kwargs
                )
        
        self.session.mount('https://', SSLAdapter())
    
    def make_secure_request(self, method, endpoint, **kwargs):
        """Make secure API request"""
        url = f"{self.base_url}{endpoint}"
        
        # Identify the client. (Headers such as X-Frame-Options and
        # X-Content-Type-Options are response headers set by servers;
        # they have no effect when sent on a request.)
        headers = kwargs.get('headers', {})
        headers['User-Agent'] = 'BroxiAI-SecureClient/1.0'
        kwargs['headers'] = headers
        
        # Enforce SSL verification
        kwargs['verify'] = True
        kwargs['timeout'] = kwargs.get('timeout', 30)
        
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        
        return response

Access Controls and Authentication

Multi-Factor Authentication

MFA Implementation

import pyotp
import qrcode
import io
import base64
from datetime import datetime, timedelta
import secrets

class MFAManager:
    def __init__(self):
        self.backup_codes_count = 10
        self.totp_window = 1  # Accept one 30-second step of clock drift either side
    
    def generate_totp_secret(self, user_email):
        """Generate TOTP secret for user"""
        secret = pyotp.random_base32()
        
        return {
            "secret": secret,
            "provisioning_uri": pyotp.totp.TOTP(secret).provisioning_uri(
                name=user_email,
                issuer_name="BroxiAI"
            )
        }
    
    def generate_qr_code(self, provisioning_uri):
        """Generate QR code for TOTP setup"""
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(provisioning_uri)
        qr.make(fit=True)
        
        img = qr.make_image(fill_color="black", back_color="white")
        
        # Convert to base64 for web display
        buf = io.BytesIO()
        img.save(buf, format='PNG')
        img_str = base64.b64encode(buf.getvalue()).decode()
        
        return f"data:image/png;base64,{img_str}"
    
    def verify_totp_token(self, secret, token):
        """Verify TOTP token"""
        totp = pyotp.TOTP(secret)
        return totp.verify(token, valid_window=self.totp_window)
    
    def generate_backup_codes(self):
        """Generate backup codes for account recovery"""
        codes = []
        for _ in range(self.backup_codes_count):
            code = secrets.token_hex(4).upper()  # 8-character hex code
            codes.append(f"{code[:4]}-{code[4:]}")  # Format: XXXX-XXXX
        
        return codes
    
    def verify_backup_code(self, user_backup_codes, provided_code):
        """Verify backup code and mark as used"""
        formatted_code = provided_code.replace(" ", "").replace("-", "").upper()
        
        for i, stored_code in enumerate(user_backup_codes):
            stored_formatted = stored_code.replace("-", "").upper()
            if stored_formatted == formatted_code:
                # Mark code as used (remove from list)
                user_backup_codes.pop(i)
                return True, user_backup_codes
        
        return False, user_backup_codes
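The backup-code logic above needs only the standard library, so it can be shown and tested in isolation. This sketch mirrors MFAManager's generate/verify pair; the function names are illustrative, and the verify step returns a new list so callers persist the remaining codes explicitly.

```python
import secrets

def generate_backup_codes(count: int = 10) -> list:
    """Generate single-use backup codes in XXXX-XXXX format."""
    codes = []
    for _ in range(count):
        code = secrets.token_hex(4).upper()   # 8 hex characters
        codes.append(f"{code[:4]}-{code[4:]}")
    return codes

def verify_backup_code(stored: list, provided: str):
    """Return (ok, remaining): a matching code is consumed and removed."""
    wanted = provided.replace(" ", "").replace("-", "").upper()
    for i, code in enumerate(stored):
        if code.replace("-", "").upper() == wanted:
            return True, stored[:i] + stored[i + 1:]
    return False, stored
```
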

# Flask MFA integration
from flask import session, request, jsonify
from functools import wraps

class FlaskMFAIntegration:
    def __init__(self, app):
        self.app = app
        self.mfa_manager = MFAManager()
    
    def require_mfa(self, f):
        """Decorator to require MFA for sensitive operations"""
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not session.get('mfa_verified'):
                return jsonify({
                    "error": "MFA verification required",
                    "redirect": "/mfa/verify"
                }), 403
            
            # Check MFA session timeout (30 minutes)
            mfa_timestamp = session.get('mfa_timestamp')
            if mfa_timestamp:
                if datetime.now() - datetime.fromisoformat(mfa_timestamp) > timedelta(minutes=30):
                    session.pop('mfa_verified', None)
                    session.pop('mfa_timestamp', None)
                    return jsonify({
                        "error": "MFA session expired",
                        "redirect": "/mfa/verify"
                    }), 403
            
            return f(*args, **kwargs)
        return decorated_function
    
    def setup_mfa_routes(self):
        """Set up MFA routes"""
        
        @self.app.route('/mfa/setup', methods=['POST'])
        def setup_mfa():
            user_email = session.get('user_email')
            if not user_email:
                return jsonify({"error": "Not authenticated"}), 401
            
            totp_data = self.mfa_manager.generate_totp_secret(user_email)
            backup_codes = self.mfa_manager.generate_backup_codes()
            
            # Store secret in database (encrypted)
            # store_user_mfa_secret(user_email, totp_data["secret"], backup_codes)
            
            return jsonify({
                "qr_code": self.mfa_manager.generate_qr_code(totp_data["provisioning_uri"]),
                "backup_codes": backup_codes
            })
        
        @self.app.route('/mfa/verify', methods=['POST'])
        def verify_mfa():
            user_email = session.get('user_email')
            token = request.json.get('token')
            
            if not user_email or not token:
                return jsonify({"error": "Missing credentials"}), 400
            
            # Get user's MFA secret from the database (application-specific lookup)
            user_secret = get_user_mfa_secret(user_email)
            
            if self.mfa_manager.verify_totp_token(user_secret, token):
                session['mfa_verified'] = True
                session['mfa_timestamp'] = datetime.now().isoformat()
                return jsonify({"status": "MFA verified"})
            else:
                return jsonify({"error": "Invalid MFA token"}), 401
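The 30-minute expiry check inside require_mfa is easiest to reason about as a pure function of two timestamps. This hypothetical helper extracts that comparison from the decorator so it can be tested without a Flask session.

```python
from datetime import datetime, timedelta

def mfa_session_expired(verified_at: datetime, now: datetime,
                        max_age: timedelta = timedelta(minutes=30)) -> bool:
    """True when the MFA verification is older than max_age."""
    return now - verified_at > max_age
```
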

Role-Based Access Control (RBAC)

RBAC Implementation

from enum import Enum
from dataclasses import dataclass
from functools import wraps
from typing import List, Set

from flask import jsonify, session

class Permission(Enum):
    # Workflow permissions
    WORKFLOW_CREATE = "workflow.create"
    WORKFLOW_READ = "workflow.read"
    WORKFLOW_UPDATE = "workflow.update"
    WORKFLOW_DELETE = "workflow.delete"
    WORKFLOW_EXECUTE = "workflow.execute"
    
    # Data permissions
    DATA_READ = "data.read"
    DATA_WRITE = "data.write"
    DATA_DELETE = "data.delete"
    DATA_EXPORT = "data.export"
    
    # Admin permissions
    USER_MANAGE = "user.manage"
    ROLE_MANAGE = "role.manage"
    SYSTEM_CONFIG = "system.config"
    AUDIT_VIEW = "audit.view"
    
    # API permissions
    API_READ = "api.read"
    API_WRITE = "api.write"
    API_ADMIN = "api.admin"

@dataclass
class Role:
    name: str
    description: str
    permissions: Set[Permission]
    is_system_role: bool = False

class RBACManager:
    def __init__(self):
        self.roles = self.initialize_default_roles()
        self.user_roles = {}  # user_id -> set of role names
    
    def initialize_default_roles(self):
        """Initialize default system roles"""
        return {
            "admin": Role(
                name="admin",
                description="Full system administrator",
                permissions=set(Permission),
                is_system_role=True
            ),
            "developer": Role(
                name="developer", 
                description="Workflow developer",
                permissions={
                    Permission.WORKFLOW_CREATE,
                    Permission.WORKFLOW_READ,
                    Permission.WORKFLOW_UPDATE,
                    Permission.WORKFLOW_DELETE,
                    Permission.WORKFLOW_EXECUTE,
                    Permission.DATA_READ,
                    Permission.DATA_WRITE,
                    Permission.API_READ,
                    Permission.API_WRITE
                },
                is_system_role=True
            ),
            "analyst": Role(
                name="analyst",
                description="Data analyst",
                permissions={
                    Permission.WORKFLOW_READ,
                    Permission.WORKFLOW_EXECUTE,
                    Permission.DATA_READ,
                    Permission.DATA_EXPORT,
                    Permission.API_READ
                },
                is_system_role=True
            ),
            "viewer": Role(
                name="viewer",
                description="Read-only access",
                permissions={
                    Permission.WORKFLOW_READ,
                    Permission.DATA_READ,
                    Permission.API_READ
                },
                is_system_role=True
            )
        }
    
    def create_role(self, name: str, description: str, permissions: List[Permission]):
        """Create custom role"""
        if name in self.roles:
            raise ValueError(f"Role {name} already exists")
        
        self.roles[name] = Role(
            name=name,
            description=description,
            permissions=set(permissions),
            is_system_role=False
        )
        
        return self.roles[name]
    
    def assign_role_to_user(self, user_id: str, role_name: str):
        """Assign role to user"""
        if role_name not in self.roles:
            raise ValueError(f"Role {role_name} does not exist")
        
        if user_id not in self.user_roles:
            self.user_roles[user_id] = set()
        
        self.user_roles[user_id].add(role_name)
    
    def remove_role_from_user(self, user_id: str, role_name: str):
        """Remove role from user"""
        if user_id in self.user_roles:
            self.user_roles[user_id].discard(role_name)
    
    def get_user_permissions(self, user_id: str) -> Set[Permission]:
        """Get all permissions for a user"""
        if user_id not in self.user_roles:
            return set()
        
        permissions = set()
        for role_name in self.user_roles[user_id]:
            if role_name in self.roles:
                permissions.update(self.roles[role_name].permissions)
        
        return permissions
    
    def check_permission(self, user_id: str, permission: Permission) -> bool:
        """Check if user has specific permission"""
        user_permissions = self.get_user_permissions(user_id)
        return permission in user_permissions
    
    def require_permission(self, permission: Permission):
        """Decorator to require specific permission"""
        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
                user_id = session.get('user_id')
                if not user_id:
                    return jsonify({"error": "Authentication required"}), 401
                
                if not self.check_permission(user_id, permission):
                    return jsonify({
                        "error": "Insufficient permissions",
                        "required_permission": permission.value
                    }), 403
                
                return f(*args, **kwargs)
            return decorated_function
        return decorator

# Usage example (assumes a Flask `app` with user_id stored in the session)
rbac = RBACManager()

# Protect API endpoints
@app.route('/api/workflows', methods=['POST'])
@rbac.require_permission(Permission.WORKFLOW_CREATE)
def create_workflow():
    # Only users with WORKFLOW_CREATE permission can access
    pass

@app.route('/api/data/export', methods=['GET'])
@rbac.require_permission(Permission.DATA_EXPORT)
def export_data():
    # Only users with DATA_EXPORT permission can access
    pass
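Stripped of the Flask plumbing, the core of the RBAC model above is a set union: a user's effective permissions are the union of the permission sets of every role they hold. This minimal sketch with hypothetical role names shows that rule on its own.

```python
from enum import Enum

class Perm(Enum):
    READ = "read"
    WRITE = "write"
    EXPORT = "export"

# Hypothetical role table, analogous to RBACManager.roles
ROLES = {
    "viewer": {Perm.READ},
    "analyst": {Perm.READ, Perm.EXPORT},
}

def effective_permissions(role_names) -> set:
    """Union of permissions across all of a user's roles."""
    perms = set()
    for name in role_names:
        perms |= ROLES.get(name, set())
    return perms
```
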

Data Loss Prevention (DLP)

Sensitive Data Detection

PII Detection and Masking

import re
from typing import Dict

class SensitiveDataDetector:
    def __init__(self):
        self.patterns = self.initialize_patterns()
        self.risk_scores = {
            "ssn": 10,
            "credit_card": 10,
            "email": 5,
            "phone": 4,
            "ip_address": 3,
            "url": 2
        }
    
    def initialize_patterns(self):
        """Initialize regex patterns for sensitive data"""
        return {
            "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b|\b\d{9}\b'),
            "credit_card": re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b'),
            "email": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            "phone": re.compile(r'\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b'),
            "ip_address": re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'),
            "url": re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+'),
            "api_key": re.compile(r'\b[A-Za-z0-9]{32,}\b'),
            "passport": re.compile(r'\b[A-Z]{1,2}[0-9]{6,9}\b'),
            "driver_license": re.compile(r'\b[A-Z]{1,2}[0-9]{6,8}\b')
        }
    
    def scan_text(self, text: str) -> Dict:
        """Scan text for sensitive data"""
        findings = []
        risk_score = 0
        
        for data_type, pattern in self.patterns.items():
            # finditer reports correct spans even when a value appears repeatedly
            for match in pattern.finditer(text):
                findings.append({
                    "type": data_type,
                    "value": match.group(),
                    "start": match.start(),
                    "end": match.end(),
                    "risk_score": self.risk_scores.get(data_type, 1)
                })
                risk_score += self.risk_scores.get(data_type, 1)
        
        return {
            "findings": findings,
            "total_risk_score": risk_score,
            "risk_level": self.calculate_risk_level(risk_score),
            "contains_pii": risk_score > 0
        }
    
    def calculate_risk_level(self, score: int) -> str:
        """Calculate risk level based on score"""
        if score >= 20:
            return "CRITICAL"
        elif score >= 10:
            return "HIGH"
        elif score >= 5:
            return "MEDIUM"
        elif score > 0:
            return "LOW"
        else:
            return "NONE"
    
    def mask_sensitive_data(self, text: str, mask_char: str = "*") -> str:
        """Mask sensitive data in text"""
        masked_text = text
        
        for data_type, pattern in self.patterns.items():
            if data_type in ["ssn", "credit_card", "api_key"]:
                # Full masking for highly sensitive data
                masked_text = pattern.sub(
                    lambda m: mask_char * len(m.group()),
                    masked_text
                )
            elif data_type in ["email", "phone"]:
                # Partial masking
                masked_text = pattern.sub(
                    lambda m: self.partial_mask(m.group(), data_type, mask_char),
                    masked_text
                )
        
        return masked_text
    
    def partial_mask(self, value: str, data_type: str, mask_char: str) -> str:
        """Apply partial masking to preserve some readability"""
        if data_type == "email":
            if "@" in value:
                local, domain = value.split("@", 1)
                if len(local) > 2:
                    masked_local = local[0] + mask_char * (len(local) - 2) + local[-1]
                    return f"{masked_local}@{domain}"
        elif data_type == "phone":
            if len(value) >= 7:
                return value[:3] + mask_char * (len(value) - 6) + value[-3:]
        
        return value
    
    def anonymize_data(self, text: str) -> str:
        """Replace sensitive data with anonymized placeholders"""
        anonymized_text = text
        
        replacement_map = {
            "ssn": "[SSN-REDACTED]",
            "credit_card": "[CARD-REDACTED]",
            "email": "[EMAIL-REDACTED]",
            "phone": "[PHONE-REDACTED]",
            "api_key": "[API-KEY-REDACTED]"
        }
        
        for data_type, pattern in self.patterns.items():
            if data_type in replacement_map:
                anonymized_text = pattern.sub(
                    replacement_map[data_type],
                    anonymized_text
                )
        
        return anonymized_text
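The partial-masking rule for emails used above (keep the first and last character of the local part, mask the middle) can be demonstrated standalone. This sketch reuses the same email pattern but is otherwise independent of the detector class.

```python
import re

EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')

def mask_emails(text: str, mask_char: str = "*") -> str:
    """Partially mask email local parts, preserving the domain."""
    def _mask(match):
        local, domain = match.group().split("@", 1)
        if len(local) > 2:
            local = local[0] + mask_char * (len(local) - 2) + local[-1]
        return f"{local}@{domain}"
    return EMAIL_RE.sub(_mask, text)
```
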

class DataLossPreventionEngine:
    def __init__(self):
        self.detector = SensitiveDataDetector()
        self.policies = self.load_dlp_policies()
        self.audit_logger = AuditLogger()  # application-specific audit sink
    
    def load_dlp_policies(self):
        """Load DLP policies"""
        return {
            "prevent_pii_in_logs": {
                "enabled": True,
                "action": "mask",
                "risk_threshold": 5
            },
            "block_high_risk_data": {
                "enabled": True,
                "action": "block",
                "risk_threshold": 15
            },
            "audit_all_sensitive_data": {
                "enabled": True,
                "action": "audit",
                "risk_threshold": 1
            }
        }
    
    def process_data(self, data: str, context: Dict) -> Dict:
        """Process data through DLP engine"""
        scan_result = self.detector.scan_text(data)
        
        # Apply policies
        actions_taken = []
        processed_data = data
        
        for policy_name, policy in self.policies.items():
            if not policy["enabled"]:
                continue
            
            if scan_result["total_risk_score"] >= policy["risk_threshold"]:
                action = policy["action"]
                
                if action == "mask":
                    processed_data = self.detector.mask_sensitive_data(processed_data)
                    actions_taken.append("masked")
                elif action == "block":
                    actions_taken.append("blocked")
                    return {
                        "blocked": True,
                        "reason": "High-risk sensitive data detected",
                        "risk_score": scan_result["total_risk_score"],
                        "findings": scan_result["findings"]
                    }
                elif action == "audit":
                    self.audit_logger.log_sensitive_data_access(
                        data=data,
                        findings=scan_result["findings"],
                        context=context
                    )
                    actions_taken.append("audited")
        
        return {
            "blocked": False,
            "processed_data": processed_data,
            "original_risk_score": scan_result["total_risk_score"],
            "risk_level": scan_result["risk_level"],
            "actions_taken": actions_taken,
            "findings": scan_result["findings"]
        }
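The policy loop in process_data boils down to threshold comparisons: every policy whose risk threshold the score meets contributes its action. This hedged distillation uses the same thresholds as load_dlp_policies but omits the masking/blocking side effects.

```python
# Action thresholds matching the DLP policies above
POLICIES = {
    "mask": 5,
    "block": 15,
    "audit": 1,
}

def triggered_actions(risk_score: int) -> set:
    """Return every action whose threshold the risk score meets."""
    return {action for action, threshold in POLICIES.items()
            if risk_score >= threshold}
```
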

Data Retention and Deletion

Automated Data Lifecycle Management

Data Retention Policies

from datetime import datetime, timedelta
from enum import Enum

class RetentionPeriod(Enum):
    DAYS_30 = 30
    DAYS_90 = 90
    DAYS_180 = 180
    YEAR_1 = 365
    YEARS_3 = 1095
    YEARS_7 = 2555
    YEARS_10 = 3650

class DataCategory(Enum):
    USER_CONVERSATIONS = "user_conversations"
    SYSTEM_LOGS = "system_logs"
    AUDIT_LOGS = "audit_logs"
    PERSONAL_DATA = "personal_data"
    BUSINESS_DATA = "business_data"
    TEMP_FILES = "temp_files"
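Before the manager class below, it is worth seeing the date arithmetic it performs in isolation: archive and deletion dates are just day offsets from the creation date. This sketch uses plain integers rather than the RetentionPeriod enum.

```python
from datetime import datetime, timedelta

def lifecycle_dates(created: datetime, retention_days: int, archive_days=None):
    """Return (archive_date, deletion_date); archive_date may be None."""
    deletion = created + timedelta(days=retention_days)
    archive = created + timedelta(days=archive_days) if archive_days else None
    return archive, deletion
```
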

class DataRetentionManager:
    def __init__(self):
        self.retention_policies = self.initialize_retention_policies()
        self.deletion_queue = []
    
    def initialize_retention_policies(self):
        """Initialize data retention policies"""
        return {
            DataCategory.USER_CONVERSATIONS: {
                "retention_period": RetentionPeriod.YEARS_3,
                "archive_after": RetentionPeriod.YEAR_1,
                "deletion_method": "secure_delete",
                "requires_approval": False
            },
            DataCategory.SYSTEM_LOGS: {
                "retention_period": RetentionPeriod.DAYS_90,
                "archive_after": RetentionPeriod.DAYS_30,
                "deletion_method": "standard_delete",
                "requires_approval": False
            },
            DataCategory.AUDIT_LOGS: {
                "retention_period": RetentionPeriod.YEARS_7,
                "archive_after": RetentionPeriod.YEAR_1,
                "deletion_method": "secure_delete",
                "requires_approval": True
            },
            DataCategory.PERSONAL_DATA: {
                "retention_period": RetentionPeriod.YEARS_3,
                "archive_after": RetentionPeriod.DAYS_180,
                "deletion_method": "secure_delete",
                "requires_approval": True
            },
            DataCategory.TEMP_FILES: {
                "retention_period": RetentionPeriod.DAYS_30,
                "archive_after": None,
                "deletion_method": "standard_delete",
                "requires_approval": False
            }
        }
    
    def schedule_deletion(self, data_id: str, category: DataCategory, created_date: datetime):
        """Schedule data for deletion based on retention policy"""
        policy = self.retention_policies[category]
        deletion_date = created_date + timedelta(days=policy["retention_period"].value)
        
        if policy["archive_after"]:
            archive_date = created_date + timedelta(days=policy["archive_after"].value)
        else:
            archive_date = None
        
        deletion_item = {
            "data_id": data_id,
            "category": category,
            "created_date": created_date,
            "archive_date": archive_date,
            "deletion_date": deletion_date,
            "deletion_method": policy["deletion_method"],
            "requires_approval": policy["requires_approval"],
            "status": "scheduled"
        }
        
        self.deletion_queue.append(deletion_item)
        return deletion_item
    
    def process_deletion_queue(self):
        """Process items in deletion queue"""
        current_date = datetime.utcnow()
        
        for item in self.deletion_queue[:]:  # Copy list to avoid modification during iteration
            # Check for archival
            if item["archive_date"] and current_date >= item["archive_date"] and item["status"] == "scheduled":
                self.archive_data(item)
                item["status"] = "archived"
            
            # Check for deletion
            if current_date >= item["deletion_date"]:
                if item["requires_approval"] and item["status"] != "approved":
                    # Request approval only once per item, not on every queue run
                    if item["status"] != "approval_requested":
                        self.request_deletion_approval(item)
                else:
                    self.delete_data(item)
                    self.deletion_queue.remove(item)
    
    def archive_data(self, deletion_item):
        """Archive data to long-term storage"""
        data_id = deletion_item["data_id"]
        category = deletion_item["category"]
        
        # Implementation depends on your storage system
        # This could involve moving data to cheaper storage tier
        
        print(f"Archiving {category.value} data: {data_id}")
        
        # Log archival action
        self.log_data_lifecycle_event(
            data_id=data_id,
            action="archived",
            category=category,
            timestamp=datetime.utcnow()
        )
    
    def delete_data(self, deletion_item):
        """Delete data according to deletion method"""
        data_id = deletion_item["data_id"]
        category = deletion_item["category"]
        method = deletion_item["deletion_method"]
        
        if method == "secure_delete":
            self.secure_delete(data_id, category)
        else:
            self.standard_delete(data_id, category)
        
        # Log deletion action
        self.log_data_lifecycle_event(
            data_id=data_id,
            action="deleted",
            category=category,
            method=method,
            timestamp=datetime.utcnow()
        )
    
    def secure_delete(self, data_id: str, category: DataCategory):
        """Perform secure deletion with multiple overwrites"""
        # Implementation for secure deletion
        # - Overwrite data multiple times with random data
        # - Remove from all backups
        # - Clear from caches
        # - Update indexes
        
        print(f"Securely deleting {category.value} data: {data_id}")
        
        # Example: Database secure deletion
        # 1. Overwrite sensitive fields with random data
        # 2. Delete record
        # 3. Run VACUUM/optimization to ensure space is reclaimed
    
    def standard_delete(self, data_id: str, category: DataCategory):
        """Perform standard deletion"""
        print(f"Deleting {category.value} data: {data_id}")
        
        # Standard deletion implementation
        # - Remove from primary storage
        # - Clear from caches
        # - Update indexes
    
    def request_deletion_approval(self, deletion_item):
        """Request approval for data deletion"""
        # Send notification to data protection officer or admin
        approval_request = {
            "data_id": deletion_item["data_id"],
            "category": deletion_item["category"].value,
            "scheduled_deletion_date": deletion_item["deletion_date"],
            "reason": "Retention period expired",
            "requested_by": "automated_system",
            "requested_at": datetime.utcnow()
        }
        
        # Implementation would send email, create ticket, etc.
        print(f"Deletion approval requested for {deletion_item['data_id']}")
        
        deletion_item["status"] = "approval_requested"
    
    def setup_automated_cleanup(self):
        """Set up automated cleanup schedule"""
        # Schedule daily cleanup
        schedule.every().day.at("02:00").do(self.process_deletion_queue)
        
        # Schedule weekly retention policy review
        schedule.every().week.do(self.review_retention_policies)
        
        # Run scheduler
        while True:
            schedule.run_pending()
            time.sleep(3600)  # Check every hour
    
    def user_data_deletion_request(self, user_id: str, reason: str = "user_request"):
        """Handle user's request to delete their data"""
        
        # Find all data associated with user
        user_data_items = self.find_user_data(user_id)
        
        deletion_summary = {
            "user_id": user_id,
            "request_date": datetime.utcnow(),
            "reason": reason,
            "items_found": len(user_data_items),
            "items_deleted": 0,
            "items_requiring_retention": 0,
            "deletion_details": []
        }
        
        for data_item in user_data_items:
            # Check if data can be deleted (legal holds, audit requirements, etc.)
            if self.can_delete_immediately(data_item):
                self.delete_data_immediately(data_item)
                deletion_summary["items_deleted"] += 1
                deletion_summary["deletion_details"].append({
                    "data_type": data_item["type"],
                    "action": "deleted",
                    "reason": "user_request"
                })
            else:
                # Mark for deletion when retention period allows
                retention_reason = self.get_retention_reason(data_item)
                deletion_summary["items_requiring_retention"] += 1
                deletion_summary["deletion_details"].append({
                    "data_type": data_item["type"],
                    "action": "scheduled_for_deletion",
                    "reason": retention_reason,
                    "deletion_date": self.calculate_earliest_deletion_date(data_item)
                })
        
        return deletion_summary
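The date arithmetic behind schedule_deletion can be exercised in isolation. A minimal standalone sketch (the RetentionPeriod values and the compute_lifecycle_dates helper are illustrative, not part of the class above):

```python
from datetime import datetime, timedelta
from enum import Enum

class RetentionPeriod(Enum):
    """Illustrative day counts; mirror whatever the real enum defines."""
    DAYS_30 = 30
    DAYS_90 = 90
    YEARS_3 = 365 * 3

def compute_lifecycle_dates(created, retention, archive_after=None):
    """Derive archive and deletion dates the same way schedule_deletion does."""
    deletion_date = created + timedelta(days=retention.value)
    archive_date = created + timedelta(days=archive_after.value) if archive_after else None
    return archive_date, deletion_date

archive, delete = compute_lifecycle_dates(
    datetime(2024, 1, 1), RetentionPeriod.DAYS_90, RetentionPeriod.DAYS_30
)
print(archive.date(), delete.date())  # → 2024-01-31 2024-03-31
```

Keeping this calculation in a pure function makes retention policies easy to unit-test without touching storage.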

Privacy Engineering

Privacy by Design Implementation

Privacy-Preserving Data Processing

import hashlib
import hmac
import random
from typing import List, Dict, Any

class PrivacyEngineer:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key.encode('utf-8')
    
    def pseudonymize_identifier(self, identifier: str, salt: str = None) -> str:
        """Create consistent pseudonym for identifier"""
        if salt is None:
            salt = "default_salt"  # callers should supply a unique, per-dataset salt
        
        # Use HMAC for consistent pseudonymization
        h = hmac.new(self.secret_key, f"{identifier}{salt}".encode('utf-8'), hashlib.sha256)
        return h.hexdigest()[:16]  # first 16 hex chars (64 bits) for compact pseudonyms
    
    def anonymize_dataset(self, dataset: List[Dict], identifier_fields: List[str], 
                         quasi_identifier_fields: List[str] = None) -> List[Dict]:
        """Anonymize dataset by removing/modifying identifying information"""
        anonymized_data = []
        
        for record in dataset:
            anonymized_record = record.copy()
            
            # Remove direct identifiers
            for field in identifier_fields:
                if field in anonymized_record:
                    del anonymized_record[field]
            
            # Generalize quasi-identifiers
            if quasi_identifier_fields:
                for field in quasi_identifier_fields:
                    if field in anonymized_record:
                        anonymized_record[field] = self.generalize_value(
                            anonymized_record[field], field
                        )
            
            anonymized_data.append(anonymized_record)
        
        return anonymized_data
    
    def generalize_value(self, value: Any, field_type: str) -> Any:
        """Generalize quasi-identifier values"""
        if field_type == "age":
            # Age ranges instead of exact age
            if isinstance(value, int):
                age_range = (value // 10) * 10
                return f"{age_range}-{age_range + 9}"
        elif field_type == "zip_code":
            # Truncate zip code
            if isinstance(value, str) and len(value) >= 5:
                return value[:3] + "XX"
        elif field_type == "date":
            # Month/year only instead of full date
            if hasattr(value, 'strftime'):
                return value.strftime("%Y-%m")
        
        return value
    
    def apply_differential_privacy(self, dataset: List[Dict], epsilon: float = 1.0) -> List[Dict]:
        """Apply differential privacy noise to numeric fields"""
        private_dataset = []
        
        for record in dataset:
            private_record = record.copy()
            
            for key, value in private_record.items():
                if isinstance(value, (int, float)):
                    # The stdlib random module has no laplace(); the difference of
                    # two exponential draws is Laplace(0, 1/epsilon)-distributed,
                    # so this adds the correct noise without extra dependencies
                    noise = random.expovariate(epsilon) - random.expovariate(epsilon)
                    private_record[key] = value + noise
            
            private_dataset.append(private_record)
        
        return private_dataset
    
    def implement_k_anonymity(self, dataset: List[Dict], 
                            quasi_identifiers: List[str], k: int = 5) -> List[Dict]:
        """Implement k-anonymity by generalizing quasi-identifiers"""
        # Group records by quasi-identifier combinations
        groups = {}
        
        for record in dataset:
            # Create key from quasi-identifiers
            key = tuple(record.get(field, "") for field in quasi_identifiers)
            if key not in groups:
                groups[key] = []
            groups[key].append(record)
        
        # Generalize groups with fewer than k records
        k_anonymous_data = []
        
        for key, records in groups.items():
            if len(records) < k:
                # Generalize these records further
                generalized_records = self.generalize_group(records, quasi_identifiers)
                k_anonymous_data.extend(generalized_records)
            else:
                k_anonymous_data.extend(records)
        
        return k_anonymous_data
    
    def generalize_group(self, records: List[Dict], quasi_identifiers: List[str]) -> List[Dict]:
        """Generalize a group of records to achieve k-anonymity"""
        if not records:
            return []
        
        # Create generalized version of all records in group
        generalized_records = []
        
        for record in records:
            generalized_record = record.copy()
            
            for field in quasi_identifiers:
                if field in generalized_record:
                    # Apply more aggressive generalization
                    generalized_record[field] = self.aggressive_generalize(
                        generalized_record[field], field
                    )
            
            generalized_records.append(generalized_record)
        
        return generalized_records
    
    def aggressive_generalize(self, value: Any, field_type: str) -> str:
        """Apply aggressive generalization for k-anonymity"""
        if field_type == "age":
            if isinstance(value, int):
                if value < 30:
                    return "Under 30"
                elif value < 50:
                    return "30-49"
                else:
                    return "50+"
        elif field_type == "income":
            if isinstance(value, (int, float)):
                if value < 50000:
                    return "Low"
                elif value < 100000:
                    return "Medium"
                else:
                    return "High"
        
        return "*"  # Suppress value if no specific rule

Best Practices

Data Protection Strategy

Comprehensive Data Protection Checklist

Security Monitoring

Continuous Security Monitoring

  • Monitor access patterns for anomalies

  • Track data access and modifications

  • Set up alerts for suspicious activities

  • Regular penetration testing

  • Automated vulnerability scanning

  • Security metrics dashboards

  • Incident response procedures
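Several of the monitoring items above reduce to comparing today's activity against a recent baseline. A minimal sketch of a z-score check for anomalous access volume (function name and threshold are illustrative assumptions):

```python
from statistics import mean, stdev

def flag_anomalous_access(daily_counts, today_count, z_threshold=3.0):
    """Flag today's access volume if it sits far outside the recent baseline."""
    baseline_mean = mean(daily_counts)
    baseline_std = stdev(daily_counts)
    if baseline_std == 0:
        return today_count != baseline_mean
    return abs(today_count - baseline_mean) / baseline_std > z_threshold

baseline = [102, 98, 110, 95, 105, 99, 101]
print(flag_anomalous_access(baseline, 450))  # → True
print(flag_anomalous_access(baseline, 104))  # → False
```

In production this logic would typically feed an alerting pipeline rather than return a boolean directly.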

Compliance Alignment

Regulatory Compliance

  • Map data flows and processing activities

  • Maintain records of processing activities

  • Implement data subject rights procedures

  • Regular compliance assessments

  • Privacy impact assessments for new projects

  • Vendor security assessments

  • Cross-border data transfer safeguards
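Maintaining records of processing activities can start as a simple structured register. A hedged sketch of one entry (field names loosely follow GDPR Article 30; class name and all values are illustrative):

```python
from dataclasses import asdict, dataclass, field
from typing import List

@dataclass
class ProcessingActivity:
    """One entry in a record of processing activities."""
    name: str
    purpose: str
    data_categories: List[str]
    legal_basis: str
    retention: str
    recipients: List[str] = field(default_factory=list)
    cross_border_safeguard: str = "none"

activity = ProcessingActivity(
    name="Support chat transcripts",
    purpose="Customer support quality review",
    data_categories=["contact details", "chat content"],
    legal_basis="legitimate interest",
    retention="90 days",
    recipients=["internal support team"],
)
print(asdict(activity)["retention"])  # → 90 days
```

Serializing entries with asdict makes it straightforward to export the register for compliance assessments.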

Next Steps

After implementing data protection:

  1. Security Assessment: Conduct comprehensive security review

  2. Compliance Validation: Ensure regulatory compliance

  3. Staff Training: Train team on data protection procedures

  4. Monitoring Enhancement: Implement advanced threat detection

  5. Regular Updates: Keep security measures current

