Data Protection
Comprehensive data protection strategies for BroxiAI applications, including encryption, access controls, and privacy safeguards
Implement comprehensive data protection measures for your BroxiAI applications to safeguard sensitive information, ensure privacy compliance, and maintain customer trust.
Data Protection Overview
Core Principles
Data Protection Fundamentals
Data Protection Principles:
Data Minimization: "Collect only necessary data"
Purpose Limitation: "Use data only for stated purposes"
Data Quality: "Maintain accurate and up-to-date data"
Storage Limitation: "Retain data only as long as necessary"
Security: "Implement appropriate technical and organizational measures"
Accountability: "Demonstrate compliance with data protection principles"
Transparency: "Inform users about data processing"
User Rights: "Respect data subject rights and preferences"
Data Classification Framework
Data Classification Levels:
Public:
description: "Information that can be freely shared"
examples: ["Marketing materials", "Public documentation", "General product information"]
protection_level: "Basic"
Internal:
description: "Information for internal use only"
examples: ["Business processes", "Internal communications", "Operational data"]
protection_level: "Standard"
Confidential:
description: "Sensitive business information"
examples: ["Customer data", "Financial information", "Business strategies"]
protection_level: "Enhanced"
Restricted:
description: "Highly sensitive information"
examples: ["Personal data", "Payment information", "Health records", "Legal documents"]
protection_level: "Maximum"
Encryption Implementation
Data at Rest Encryption
Database Encryption
import cryptography
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
class DecryptionError(Exception):
    """Raised when an encrypted field cannot be decrypted (wrong key or corrupt data)."""


class DatabaseEncryption:
    """Encrypt/decrypt individual database field values with Fernet.

    Bug fix: the previous version raised ``DecryptionError`` without ever
    defining it, so a failed decrypt surfaced as a NameError; the exception
    class is now defined above so callers can actually catch it.
    """

    def __init__(self, master_key=None):
        # An ephemeral key is generated when none is supplied; persist the key
        # externally or previously written data becomes unrecoverable.
        self.master_key = master_key or self.generate_master_key()
        self.fernet = Fernet(self.master_key)

    def generate_master_key(self):
        """Generate a new random Fernet master key."""
        return Fernet.generate_key()

    def derive_key_from_password(self, password, salt=None):
        """Derive a urlsafe-base64 Fernet key from a password via PBKDF2-HMAC-SHA256.

        Returns:
            (key, salt) -- keep the salt to re-derive the same key later.
        """
        if salt is None:
            salt = os.urandom(16)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key, salt

    def encrypt_field(self, data):
        """Encrypt a field value; returns a base64 text token (None passes through)."""
        if data is None:
            return None
        if isinstance(data, str):
            data = data.encode('utf-8')
        encrypted_data = self.fernet.encrypt(data)
        return base64.urlsafe_b64encode(encrypted_data).decode('utf-8')

    def decrypt_field(self, encrypted_data):
        """Decrypt a token produced by encrypt_field (None passes through).

        Raises:
            DecryptionError: wrong key, tampered ciphertext, or malformed input.
        """
        if encrypted_data is None:
            return None
        try:
            encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode('utf-8'))
            decrypted_data = self.fernet.decrypt(encrypted_bytes)
            return decrypted_data.decode('utf-8')
        except Exception as e:
            # Chain the cause so the underlying crypto error stays visible.
            raise DecryptionError(f"Failed to decrypt data: {str(e)}") from e
# SQLAlchemy encryption model
from sqlalchemy import Column, String, Text, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property
# Declarative base shared by the ORM models below.
Base = declarative_base()
# Module-level handler used by the User model's hybrid properties.
# NOTE(review): with no argument this generates a fresh random key on every
# process start, making previously encrypted rows unreadable -- load a
# persistent master key from secure configuration instead.
encryption_handler = DatabaseEncryption()
class User(Base):
    """ORM model whose PII columns are transparently encrypted at rest.

    The underscored columns hold ciphertext; the hybrid properties expose
    plaintext to application code. NOTE(review): encryption goes through the
    module-level ``encryption_handler`` -- confirm it is built with a
    persistent key before relying on this model.
    """
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String(100), nullable=False)
    _email = Column('email', Text)  # Encrypted field
    _phone = Column('phone', Text)  # Encrypted field
    _ssn = Column('ssn', Text)  # Encrypted field
    created_at = Column(DateTime)

    @hybrid_property
    def email(self):
        """Decrypt email when accessed"""
        return encryption_handler.decrypt_field(self._email)

    @email.setter
    def email(self, value):
        """Encrypt email when set"""
        self._email = encryption_handler.encrypt_field(value)

    @hybrid_property
    def phone(self):
        """Decrypt phone when accessed."""
        return encryption_handler.decrypt_field(self._phone)

    @phone.setter
    def phone(self, value):
        """Encrypt phone when set."""
        self._phone = encryption_handler.encrypt_field(value)

    @hybrid_property
    def ssn(self):
        """Decrypt SSN when accessed."""
        return encryption_handler.decrypt_field(self._ssn)

    @ssn.setter
    def ssn(self, value):
        """Encrypt SSN when set."""
        self._ssn = encryption_handler.encrypt_field(value)
File System Encryption
import os
import shutil
from pathlib import Path
from cryptography.fernet import Fernet
class FileEncryption:
    """Encrypt/decrypt files on disk with a Fernet key stored beside the app.

    NOTE(review): anyone who can read the key file can decrypt everything --
    prefer a KMS/HSM-backed key in production.
    """

    def __init__(self, key_file_path="encryption.key"):
        self.key_file_path = key_file_path
        self.key = self.load_or_generate_key()
        self.fernet = Fernet(self.key)

    def load_or_generate_key(self):
        """Load the existing key file or create a new key with owner-only permissions."""
        if os.path.exists(self.key_file_path):
            with open(self.key_file_path, 'rb') as key_file:
                return key_file.read()
        key = Fernet.generate_key()
        # Fix: create the file with mode 0o600 atomically; the previous
        # write-then-chmod sequence left a window where the key was readable
        # under the default umask.
        fd = os.open(self.key_file_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'wb') as key_file:
            key_file.write(key)
        return key

    def encrypt_file(self, file_path, output_path=None):
        """Encrypt one file; returns the path of the encrypted copy (0o600 perms)."""
        if output_path is None:
            output_path = f"{file_path}.encrypted"
        with open(file_path, 'rb') as file:
            file_data = file.read()
        encrypted_data = self.fernet.encrypt(file_data)
        with open(output_path, 'wb') as encrypted_file:
            encrypted_file.write(encrypted_data)
        # Secure file permissions
        os.chmod(output_path, 0o600)
        return output_path

    def decrypt_file(self, encrypted_file_path, output_path=None):
        """Decrypt one file; by default strips a trailing '.encrypted' suffix.

        Fix: the old default used str.replace, which also mangled paths that
        merely *contain* '.encrypted' (e.g. a directory name); only a trailing
        suffix is removed now.
        """
        if output_path is None:
            suffix = '.encrypted'
            if encrypted_file_path.endswith(suffix):
                output_path = encrypted_file_path[:-len(suffix)]
            else:
                output_path = encrypted_file_path
        with open(encrypted_file_path, 'rb') as encrypted_file:
            encrypted_data = encrypted_file.read()
        decrypted_data = self.fernet.decrypt(encrypted_data)
        with open(output_path, 'wb') as file:
            file.write(decrypted_data)
        return output_path

    def encrypt_directory(self, directory_path, output_directory=None):
        """Encrypt every file under directory_path, mirroring the tree layout."""
        if output_directory is None:
            output_directory = f"{directory_path}_encrypted"
        Path(output_directory).mkdir(exist_ok=True)
        for root, dirs, files in os.walk(directory_path):
            for file in files:
                file_path = os.path.join(root, file)
                relative_path = os.path.relpath(file_path, directory_path)
                output_file_path = os.path.join(output_directory, f"{relative_path}.encrypted")
                # Create subdirectories if needed
                os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
                self.encrypt_file(file_path, output_file_path)
        return output_directory
Data in Transit Encryption
HTTPS/TLS Configuration
import ssl
import socket
from flask import Flask
from werkzeug.serving import make_ssl_devcert
class SecureFlaskApp:
    """Flask application wrapper that enforces a hardened TLS configuration."""

    def __init__(self, app_name, cert_file='path/to/certificate.pem',
                 key_file='path/to/private.key'):
        # cert_file/key_file default to the old hard-coded placeholder paths,
        # so existing callers keep working while new callers can supply real ones.
        self.app = Flask(app_name)
        self.configure_ssl(cert_file, key_file)

    def _build_context(self):
        """Build a server-side SSL context: TLS 1.2 minimum, strong ciphers only."""
        # Fix: ssl.PROTOCOL_TLSv1_2 and the OP_NO_SSL*/OP_NO_TLS* constants are
        # deprecated; PROTOCOL_TLS_SERVER + minimum_version is the supported
        # equivalent and additionally allows TLS 1.3.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        # Fresh ephemeral keys per connection (forward secrecy hardening).
        context.options |= ssl.OP_SINGLE_DH_USE
        context.options |= ssl.OP_SINGLE_ECDH_USE
        # Forward-secret AEAD suites only (applies to the TLS 1.2 cipher list;
        # TLS 1.3 suites are configured separately by OpenSSL).
        context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
        return context

    def configure_ssl(self, cert_file='path/to/certificate.pem',
                      key_file='path/to/private.key'):
        """Configure SSL/TLS settings and load the server certificate."""
        context = self._build_context()
        context.load_cert_chain(cert_file, key_file)
        self.ssl_context = context

    def run_secure(self, host='0.0.0.0', port=5000):
        """Run the Flask app over HTTPS with the hardened context (debug off)."""
        self.app.run(
            host=host,
            port=port,
            ssl_context=self.ssl_context,
            debug=False
        )
# API Client with TLS verification
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
class SecureAPIClient:
    """requests.Session wrapper that enforces TLS verification and strong ciphers."""

    def __init__(self, base_url, verify_ssl=True):
        self.base_url = base_url
        self.session = requests.Session()
        if verify_ssl:
            self.configure_ssl_verification()

    def configure_ssl_verification(self):
        """Mount an HTTPS adapter with strict hostname/cert checks and strong ciphers."""

        class SSLAdapter(HTTPAdapter):
            def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
                ctx = create_urllib3_context()
                ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
                ctx.check_hostname = True
                ctx.verify_mode = ssl.CERT_REQUIRED
                # Bug fix: the previous version constructed
                # urllib3.poolmanager.PoolManager directly, but `urllib3` was
                # never imported (NameError on the first HTTPS mount).
                # Delegating to the parent class avoids the direct dependency.
                pool_kwargs['ssl_context'] = ctx
                super().init_poolmanager(connections, maxsize, block=block, **pool_kwargs)

        self.session.mount('https://', SSLAdapter())

    def make_secure_request(self, method, endpoint, **kwargs):
        """Issue a request with certificate verification, a timeout, and security headers.

        Raises:
            requests.HTTPError: for 4xx/5xx responses (via raise_for_status).
        """
        url = f"{self.base_url}{endpoint}"
        # Add security headers
        headers = kwargs.get('headers', {})
        headers.update({
            'User-Agent': 'BroxiAI-SecureClient/1.0',
            'X-Content-Type-Options': 'nosniff',
            'X-Frame-Options': 'DENY',
            'X-XSS-Protection': '1; mode=block'
        })
        kwargs['headers'] = headers
        # Always verify certificates and never hang forever.
        kwargs['verify'] = True
        kwargs['timeout'] = kwargs.get('timeout', 30)
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        return response
Access Controls and Authentication
Multi-Factor Authentication
MFA Implementation
import pyotp
import qrcode
import io
import base64
from datetime import datetime, timedelta
import secrets
class MFAManager:
    """TOTP enrollment/verification plus single-use backup recovery codes."""

    def __init__(self):
        self.backup_codes_count = 10
        self.totp_window = 1  # Allow 30-second window

    def generate_totp_secret(self, user_email):
        """Create a TOTP secret and its otpauth:// provisioning URI."""
        secret = pyotp.random_base32()
        uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=user_email,
            issuer_name="BroxiAI"
        )
        return {"secret": secret, "provisioning_uri": uri}

    def generate_qr_code(self, provisioning_uri):
        """Render the provisioning URI as a base64 PNG data URI for web display."""
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(provisioning_uri)
        qr.make(fit=True)
        image = qr.make_image(fill_color="black", back_color="white")
        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        encoded = base64.b64encode(buffer.getvalue()).decode()
        return f"data:image/png;base64,{encoded}"

    def verify_totp_token(self, secret, token):
        """Check a TOTP token against the shared secret (with drift window)."""
        return pyotp.TOTP(secret).verify(token, valid_window=self.totp_window)

    def generate_backup_codes(self):
        """Return fresh single-use recovery codes formatted XXXX-XXXX."""
        raw_codes = (secrets.token_hex(4).upper() for _ in range(self.backup_codes_count))
        return [f"{code[:4]}-{code[4:]}" for code in raw_codes]

    def verify_backup_code(self, user_backup_codes, provided_code):
        """Consume a matching backup code; returns (matched, remaining_codes)."""
        target = provided_code.replace(" ", "").replace("-", "").upper()
        for index, stored in enumerate(user_backup_codes):
            if stored.replace("-", "").upper() == target:
                # One-time use: drop the matched code from the list.
                user_backup_codes.pop(index)
                return True, user_backup_codes
        return False, user_backup_codes
# Flask MFA integration
from flask import session, request, jsonify
from functools import wraps
class FlaskMFAIntegration:
    """Wire MFAManager into a Flask app: an enforcement decorator plus /mfa/* routes."""

    def __init__(self, app):
        self.app = app
        self.mfa_manager = MFAManager()

    def get_user_mfa_secret(self, user_email):
        """Return the stored (decrypted) TOTP secret for the user.

        Bug fix: verify_mfa previously referenced an undefined local
        ``user_secret`` (NameError on every verification attempt). Secret
        storage is deployment-specific, so this hook must be overridden or
        patched with the real lookup.
        """
        raise NotImplementedError("Override get_user_mfa_secret with your secret store")

    def require_mfa(self, f):
        """Decorator to require MFA for sensitive operations."""
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not session.get('mfa_verified'):
                return jsonify({
                    "error": "MFA verification required",
                    "redirect": "/mfa/verify"
                }), 403
            # Check MFA session timeout (30 minutes)
            mfa_timestamp = session.get('mfa_timestamp')
            if mfa_timestamp:
                if datetime.now() - datetime.fromisoformat(mfa_timestamp) > timedelta(minutes=30):
                    session.pop('mfa_verified', None)
                    session.pop('mfa_timestamp', None)
                    return jsonify({
                        "error": "MFA session expired",
                        "redirect": "/mfa/verify"
                    }), 403
            return f(*args, **kwargs)
        return decorated_function

    def setup_mfa_routes(self):
        """Register /mfa/setup and /mfa/verify on the wrapped app."""
        @self.app.route('/mfa/setup', methods=['POST'])
        def setup_mfa():
            user_email = session.get('user_email')
            if not user_email:
                return jsonify({"error": "Not authenticated"}), 401
            totp_data = self.mfa_manager.generate_totp_secret(user_email)
            backup_codes = self.mfa_manager.generate_backup_codes()
            # Store secret in database (encrypted)
            # store_user_mfa_secret(user_email, totp_data["secret"], backup_codes)
            return jsonify({
                "qr_code": self.mfa_manager.generate_qr_code(totp_data["provisioning_uri"]),
                "backup_codes": backup_codes
            })

        @self.app.route('/mfa/verify', methods=['POST'])
        def verify_mfa():
            user_email = session.get('user_email')
            token = request.json.get('token')
            if not user_email or not token:
                return jsonify({"error": "Missing credentials"}), 400
            # Fetch the secret through the overridable hook (was: undefined local).
            user_secret = self.get_user_mfa_secret(user_email)
            if self.mfa_manager.verify_totp_token(user_secret, token):
                session['mfa_verified'] = True
                session['mfa_timestamp'] = datetime.now().isoformat()
                return jsonify({"status": "MFA verified"})
            else:
                return jsonify({"error": "Invalid MFA token"}), 401
Role-Based Access Control (RBAC)
RBAC Implementation
from enum import Enum
from dataclasses import dataclass
from typing import List, Set
import json
class Permission(Enum):
    """Granular permission identifiers; values use dotted "<resource>.<action>" strings."""
    # Workflow permissions
    WORKFLOW_CREATE = "workflow.create"
    WORKFLOW_READ = "workflow.read"
    WORKFLOW_UPDATE = "workflow.update"
    WORKFLOW_DELETE = "workflow.delete"
    WORKFLOW_EXECUTE = "workflow.execute"
    # Data permissions
    DATA_READ = "data.read"
    DATA_WRITE = "data.write"
    DATA_DELETE = "data.delete"
    DATA_EXPORT = "data.export"
    # Admin permissions
    USER_MANAGE = "user.manage"
    ROLE_MANAGE = "role.manage"
    SYSTEM_CONFIG = "system.config"
    AUDIT_VIEW = "audit.view"
    # API permissions
    API_READ = "api.read"
    API_WRITE = "api.write"
    API_ADMIN = "api.admin"
@dataclass
class Role:
    """A named bundle of permissions that can be assigned to users."""
    name: str
    description: str
    permissions: Set[Permission]
    # System roles ship with the product; custom roles set this to False.
    is_system_role: bool = False
class RBACManager:
    """In-memory role-based access control: role definitions plus user-role assignments."""

    def __init__(self):
        self.roles = self.initialize_default_roles()
        self.user_roles = {}  # maps user_id -> set of role names

    def initialize_default_roles(self):
        """Build the built-in system roles from a compact spec table."""
        developer_perms = {
            Permission.WORKFLOW_CREATE, Permission.WORKFLOW_READ,
            Permission.WORKFLOW_UPDATE, Permission.WORKFLOW_DELETE,
            Permission.WORKFLOW_EXECUTE, Permission.DATA_READ,
            Permission.DATA_WRITE, Permission.API_READ, Permission.API_WRITE,
        }
        analyst_perms = {
            Permission.WORKFLOW_READ, Permission.WORKFLOW_EXECUTE,
            Permission.DATA_READ, Permission.DATA_EXPORT, Permission.API_READ,
        }
        viewer_perms = {Permission.WORKFLOW_READ, Permission.DATA_READ, Permission.API_READ}
        spec = [
            ("admin", "Full system administrator", set(Permission)),
            ("developer", "Workflow developer", developer_perms),
            ("analyst", "Data analyst", analyst_perms),
            ("viewer", "Read-only access", viewer_perms),
        ]
        return {
            name: Role(name=name, description=desc, permissions=perms, is_system_role=True)
            for name, desc, perms in spec
        }

    def create_role(self, name: str, description: str, permissions: List[Permission]):
        """Register a custom (non-system) role; raises ValueError on duplicates."""
        if name in self.roles:
            raise ValueError(f"Role {name} already exists")
        role = Role(
            name=name,
            description=description,
            permissions=set(permissions),
            is_system_role=False
        )
        self.roles[name] = role
        return role

    def assign_role_to_user(self, user_id: str, role_name: str):
        """Grant a role to a user; raises ValueError for unknown role names."""
        if role_name not in self.roles:
            raise ValueError(f"Role {role_name} does not exist")
        self.user_roles.setdefault(user_id, set()).add(role_name)

    def remove_role_from_user(self, user_id: str, role_name: str):
        """Revoke a role from a user (no-op if the user or role is absent)."""
        assigned = self.user_roles.get(user_id)
        if assigned is not None:
            assigned.discard(role_name)

    def get_user_permissions(self, user_id: str) -> Set[Permission]:
        """Return the union of permissions across all of the user's roles."""
        merged = set()
        for role_name in self.user_roles.get(user_id, set()):
            role = self.roles.get(role_name)
            if role is not None:
                merged |= role.permissions
        return merged

    def check_permission(self, user_id: str, permission: Permission) -> bool:
        """True when at least one of the user's roles grants the permission."""
        return permission in self.get_user_permissions(user_id)

    def require_permission(self, permission: Permission):
        """Decorator factory: 401 when unauthenticated, 403 when lacking the permission."""
        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
                user_id = session.get('user_id')
                if not user_id:
                    return jsonify({"error": "Authentication required"}), 401
                if not self.check_permission(user_id, permission):
                    return jsonify({
                        "error": "Insufficient permissions",
                        "required_permission": permission.value
                    }), 403
                return f(*args, **kwargs)
            return decorated_function
        return decorator
# Usage example
# NOTE(review): assumes a Flask ``app`` object exists in this module (not shown
# in this chunk) and that authenticated user_ids have been assigned roles.
rbac = RBACManager()
# Protect API endpoints
@app.route('/api/workflows', methods=['POST'])
@rbac.require_permission(Permission.WORKFLOW_CREATE)
def create_workflow():
    """Create a workflow; reachable only with WORKFLOW_CREATE."""
    # Only users with WORKFLOW_CREATE permission can access
    pass

@app.route('/api/data/export', methods=['GET'])
@rbac.require_permission(Permission.DATA_EXPORT)
def export_data():
    """Export data; reachable only with DATA_EXPORT."""
    # Only users with DATA_EXPORT permission can access
    pass
Data Loss Prevention (DLP)
Sensitive Data Detection
PII Detection and Masking
import re
import hashlib
from typing import Dict, List, Tuple
class SensitiveDataDetector:
    """Regex-based detector/masker for common PII and secret patterns.

    NOTE(review): the patterns are heuristic -- e.g. ``api_key`` matches any
    run of 32+ alphanumerics -- so expect false positives and tune per use case.
    """

    def __init__(self):
        self.patterns = self.initialize_patterns()
        # Per-match risk weight by data type; unlisted types default to 1.
        self.risk_scores = {
            "ssn": 10,
            "credit_card": 10,
            "email": 5,
            "phone": 4,
            "ip_address": 3,
            "url": 2
        }

    def initialize_patterns(self):
        """Initialize regex patterns for sensitive data"""
        return {
            "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b|\b\d{9}\b'),
            "credit_card": re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b'),
            "email": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
            "phone": re.compile(r'\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b'),
            "ip_address": re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'),
            "url": re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+'),
            "api_key": re.compile(r'\b[A-Za-z0-9]{32,}\b'),
            "passport": re.compile(r'\b[A-Z]{1,2}[0-9]{6,9}\b'),
            "driver_license": re.compile(r'\b[A-Z]{1,2}[0-9]{6,8}\b')
        }

    def scan_text(self, text: str) -> Dict:
        """Scan text and return findings, a total risk score, and a risk level.

        Bug fix: spans now come from re.finditer match objects; the previous
        text.find(match) approach reported the FIRST occurrence's offsets for
        every repeated match of the same value.
        """
        findings = []
        risk_score = 0
        for data_type, pattern in self.patterns.items():
            for match in pattern.finditer(text):
                score = self.risk_scores.get(data_type, 1)
                findings.append({
                    "type": data_type,
                    "value": match.group(),
                    "start": match.start(),
                    "end": match.end(),
                    "risk_score": score
                })
                risk_score += score
        return {
            "findings": findings,
            "total_risk_score": risk_score,
            "risk_level": self.calculate_risk_level(risk_score),
            "contains_pii": risk_score > 0
        }

    def calculate_risk_level(self, score: int) -> str:
        """Map a cumulative risk score onto NONE/LOW/MEDIUM/HIGH/CRITICAL."""
        if score >= 20:
            return "CRITICAL"
        elif score >= 10:
            return "HIGH"
        elif score >= 5:
            return "MEDIUM"
        elif score > 0:
            return "LOW"
        else:
            return "NONE"

    def mask_sensitive_data(self, text: str, mask_char: str = "*") -> str:
        """Mask sensitive data in text: full masking for secrets, partial for contact info."""
        masked_text = text
        for data_type, pattern in self.patterns.items():
            if data_type in ["ssn", "credit_card", "api_key"]:
                # Full masking for highly sensitive data
                masked_text = pattern.sub(
                    lambda m: mask_char * len(m.group()),
                    masked_text
                )
            elif data_type in ["email", "phone"]:
                # Partial masking keeps some readability for support workflows.
                masked_text = pattern.sub(
                    lambda m: self.partial_mask(m.group(), data_type, mask_char),
                    masked_text
                )
        return masked_text

    def partial_mask(self, value: str, data_type: str, mask_char: str) -> str:
        """Apply partial masking to preserve some readability.

        Values too short to mask meaningfully are returned unchanged.
        """
        if data_type == "email":
            if "@" in value:
                local, domain = value.split("@", 1)
                if len(local) > 2:
                    masked_local = local[0] + mask_char * (len(local) - 2) + local[-1]
                    return f"{masked_local}@{domain}"
        elif data_type == "phone":
            if len(value) >= 7:
                return value[:3] + mask_char * (len(value) - 6) + value[-3:]
        return value

    def anonymize_data(self, text: str) -> str:
        """Replace sensitive data with anonymized placeholders"""
        anonymized_text = text
        replacement_map = {
            "ssn": "[SSN-REDACTED]",
            "credit_card": "[CARD-REDACTED]",
            "email": "[EMAIL-REDACTED]",
            "phone": "[PHONE-REDACTED]",
            "api_key": "[API-KEY-REDACTED]"
        }
        for data_type, pattern in self.patterns.items():
            if data_type in replacement_map:
                anonymized_text = pattern.sub(
                    replacement_map[data_type],
                    anonymized_text
                )
        return anonymized_text
class DataLossPreventionEngine:
    """Run text through PII detection and apply mask/block/audit policies.

    NOTE(review): ``AuditLogger`` is not defined in this chunk of the file --
    presumably implemented elsewhere in the module; confirm before use.
    """

    def __init__(self):
        self.detector = SensitiveDataDetector()
        self.policies = self.load_dlp_policies()
        self.audit_logger = AuditLogger()

    def load_dlp_policies(self):
        """Load DLP policies"""
        # Policies are evaluated in dict insertion order: masking runs before
        # the block check, auditing last. Thresholds compare against the
        # ORIGINAL scan's total risk score, not a re-scan of the masked text.
        return {
            "prevent_pii_in_logs": {
                "enabled": True,
                "action": "mask",
                "risk_threshold": 5
            },
            "block_high_risk_data": {
                "enabled": True,
                "action": "block",
                "risk_threshold": 15
            },
            "audit_all_sensitive_data": {
                "enabled": True,
                "action": "audit",
                "risk_threshold": 1
            }
        }

    def process_data(self, data: str, context: Dict) -> Dict:
        """Process data through DLP engine

        Returns either {"blocked": True, ...} when the block policy fires, or
        {"blocked": False, "processed_data": ...} with the (possibly masked)
        text, the original risk score/level, actions taken, and findings.
        """
        scan_result = self.detector.scan_text(data)
        # Apply policies
        actions_taken = []
        processed_data = data
        for policy_name, policy in self.policies.items():
            if not policy["enabled"]:
                continue
            if scan_result["total_risk_score"] >= policy["risk_threshold"]:
                action = policy["action"]
                if action == "mask":
                    processed_data = self.detector.mask_sensitive_data(processed_data)
                    actions_taken.append("masked")
                elif action == "block":
                    actions_taken.append("blocked")
                    # Short-circuit: blocked payloads are never returned to the caller.
                    return {
                        "blocked": True,
                        "reason": "High-risk sensitive data detected",
                        "risk_score": scan_result["total_risk_score"],
                        "findings": scan_result["findings"]
                    }
                elif action == "audit":
                    # NOTE(review): the audit log receives the raw (unmasked) data.
                    self.audit_logger.log_sensitive_data_access(
                        data=data,
                        findings=scan_result["findings"],
                        context=context
                    )
                    actions_taken.append("audited")
        return {
            "blocked": False,
            "processed_data": processed_data,
            "original_risk_score": scan_result["total_risk_score"],
            "risk_level": scan_result["risk_level"],
            "actions_taken": actions_taken,
            "findings": scan_result["findings"]
        }
Data Retention and Deletion
Automated Data Lifecycle Management
Data Retention Policies
from datetime import datetime, timedelta
from enum import Enum
import schedule
import time
class RetentionPeriod(Enum):
    """Retention durations in days (365-day years; no leap-year handling)."""
    DAYS_30 = 30
    DAYS_90 = 90
    DAYS_180 = 180
    YEAR_1 = 365
    YEARS_3 = 1095
    YEARS_7 = 2555
    YEARS_10 = 3650
class DataCategory(Enum):
    """Categories of stored data; each should map to one retention policy."""
    USER_CONVERSATIONS = "user_conversations"
    SYSTEM_LOGS = "system_logs"
    AUDIT_LOGS = "audit_logs"
    PERSONAL_DATA = "personal_data"
    # NOTE(review): BUSINESS_DATA has no entry in DataRetentionManager's policy
    # table in this file -- scheduling it would raise KeyError; add a policy.
    BUSINESS_DATA = "business_data"
    TEMP_FILES = "temp_files"
class DataRetentionManager:
    """Schedule archival and deletion of data per category-specific retention policies.

    NOTE(review): several hooks called below are not defined in this chunk and
    must exist elsewhere (or be added) before this class runs:
    log_data_lifecycle_event, review_retention_policies, find_user_data,
    can_delete_immediately, delete_data_immediately, get_retention_reason,
    calculate_earliest_deletion_date -- verify against the rest of the module.
    Timestamps use naive datetime.utcnow(); keep inputs consistently naive UTC.
    """

    def __init__(self):
        self.retention_policies = self.initialize_retention_policies()
        # In-memory queue of pending archival/deletion items (not persisted).
        self.deletion_queue = []

    def initialize_retention_policies(self):
        """Initialize data retention policies"""
        # NOTE(review): DataCategory.BUSINESS_DATA is missing here, so
        # schedule_deletion raises KeyError for that category.
        return {
            DataCategory.USER_CONVERSATIONS: {
                "retention_period": RetentionPeriod.YEARS_3,
                "archive_after": RetentionPeriod.YEAR_1,
                "deletion_method": "secure_delete",
                "requires_approval": False
            },
            DataCategory.SYSTEM_LOGS: {
                "retention_period": RetentionPeriod.DAYS_90,
                "archive_after": RetentionPeriod.DAYS_30,
                "deletion_method": "standard_delete",
                "requires_approval": False
            },
            DataCategory.AUDIT_LOGS: {
                "retention_period": RetentionPeriod.YEARS_7,
                "archive_after": RetentionPeriod.YEAR_1,
                "deletion_method": "secure_delete",
                "requires_approval": True
            },
            DataCategory.PERSONAL_DATA: {
                "retention_period": RetentionPeriod.YEARS_3,
                "archive_after": RetentionPeriod.DAYS_180,
                "deletion_method": "secure_delete",
                "requires_approval": True
            },
            DataCategory.TEMP_FILES: {
                "retention_period": RetentionPeriod.DAYS_30,
                "archive_after": None,
                "deletion_method": "standard_delete",
                "requires_approval": False
            }
        }

    def schedule_deletion(self, data_id: str, category: DataCategory, created_date: datetime):
        """Schedule data for deletion based on retention policy

        Computes archive/deletion dates from the category's policy and appends
        a tracking item to the in-memory queue. Returns the queued item.
        """
        policy = self.retention_policies[category]
        deletion_date = created_date + timedelta(days=policy["retention_period"].value)
        if policy["archive_after"]:
            archive_date = created_date + timedelta(days=policy["archive_after"].value)
        else:
            archive_date = None
        deletion_item = {
            "data_id": data_id,
            "category": category,
            "created_date": created_date,
            "archive_date": archive_date,
            "deletion_date": deletion_date,
            "deletion_method": policy["deletion_method"],
            "requires_approval": policy["requires_approval"],
            "status": "scheduled"
        }
        self.deletion_queue.append(deletion_item)
        return deletion_item

    def process_deletion_queue(self):
        """Process items in deletion queue

        Archives items past their archive date, then deletes (or requests
        approval for) items past their deletion date. NOTE(review): items
        awaiting approval are re-requested on every pass until an external
        actor sets their status to "approved".
        """
        current_date = datetime.utcnow()
        for item in self.deletion_queue[:]:  # Copy list to avoid modification during iteration
            # Check for archival
            if item["archive_date"] and current_date >= item["archive_date"] and item["status"] == "scheduled":
                self.archive_data(item)
                item["status"] = "archived"
            # Check for deletion
            if current_date >= item["deletion_date"]:
                if item["requires_approval"] and item["status"] != "approved":
                    self.request_deletion_approval(item)
                else:
                    self.delete_data(item)
                    self.deletion_queue.remove(item)

    def archive_data(self, deletion_item):
        """Archive data to long-term storage"""
        data_id = deletion_item["data_id"]
        category = deletion_item["category"]
        # Implementation depends on your storage system
        # This could involve moving data to cheaper storage tier
        print(f"Archiving {category.value} data: {data_id}")
        # Log archival action
        # NOTE(review): log_data_lifecycle_event is not defined in this chunk.
        self.log_data_lifecycle_event(
            data_id=data_id,
            action="archived",
            category=category,
            timestamp=datetime.utcnow()
        )

    def delete_data(self, deletion_item):
        """Delete data according to deletion method"""
        data_id = deletion_item["data_id"]
        category = deletion_item["category"]
        method = deletion_item["deletion_method"]
        if method == "secure_delete":
            self.secure_delete(data_id, category)
        else:
            self.standard_delete(data_id, category)
        # Log deletion action
        # NOTE(review): log_data_lifecycle_event is not defined in this chunk.
        self.log_data_lifecycle_event(
            data_id=data_id,
            action="deleted",
            category=category,
            method=method,
            timestamp=datetime.utcnow()
        )

    def secure_delete(self, data_id: str, category: DataCategory):
        """Perform secure deletion with multiple overwrites

        Placeholder: currently only prints; the steps below describe the
        intended implementation.
        """
        # Implementation for secure deletion
        # - Overwrite data multiple times with random data
        # - Remove from all backups
        # - Clear from caches
        # - Update indexes
        print(f"Securely deleting {category.value} data: {data_id}")
        # Example: Database secure deletion
        # 1. Overwrite sensitive fields with random data
        # 2. Delete record
        # 3. Run VACUUM/optimization to ensure space is reclaimed

    def standard_delete(self, data_id: str, category: DataCategory):
        """Perform standard deletion (placeholder: currently only prints)."""
        print(f"Deleting {category.value} data: {data_id}")
        # Standard deletion implementation
        # - Remove from primary storage
        # - Clear from caches
        # - Update indexes

    def request_deletion_approval(self, deletion_item):
        """Request approval for data deletion"""
        # Send notification to data protection officer or admin
        approval_request = {
            "data_id": deletion_item["data_id"],
            "category": deletion_item["category"].value,
            "scheduled_deletion_date": deletion_item["deletion_date"],
            "reason": "Retention period expired",
            "requested_by": "automated_system",
            "requested_at": datetime.utcnow()
        }
        # Implementation would send email, create ticket, etc.
        print(f"Deletion approval requested for {deletion_item['data_id']}")
        deletion_item["status"] = "approval_requested"

    def setup_automated_cleanup(self):
        """Set up automated cleanup schedule

        WARNING: blocks forever -- intended to run in a dedicated thread or
        process. NOTE(review): review_retention_policies is not defined in
        this chunk.
        """
        # Schedule daily cleanup
        schedule.every().day.at("02:00").do(self.process_deletion_queue)
        # Schedule weekly retention policy review
        schedule.every().week.do(self.review_retention_policies)
        # Run scheduler
        while True:
            schedule.run_pending()
            time.sleep(3600)  # Check every hour

    def user_data_deletion_request(self, user_id: str, reason: str = "user_request"):
        """Handle user's request to delete their data

        Deletes what can be deleted immediately and schedules the rest,
        returning a summary dict. NOTE(review): find_user_data,
        can_delete_immediately, delete_data_immediately, get_retention_reason
        and calculate_earliest_deletion_date are not defined in this chunk.
        """
        # Find all data associated with user
        user_data_items = self.find_user_data(user_id)
        deletion_summary = {
            "user_id": user_id,
            "request_date": datetime.utcnow(),
            "reason": reason,
            "items_found": len(user_data_items),
            "items_deleted": 0,
            "items_requiring_retention": 0,
            "deletion_details": []
        }
        for data_item in user_data_items:
            # Check if data can be deleted (legal holds, audit requirements, etc.)
            if self.can_delete_immediately(data_item):
                self.delete_data_immediately(data_item)
                deletion_summary["items_deleted"] += 1
                deletion_summary["deletion_details"].append({
                    "data_type": data_item["type"],
                    "action": "deleted",
                    "reason": "user_request"
                })
            else:
                # Mark for deletion when retention period allows
                retention_reason = self.get_retention_reason(data_item)
                deletion_summary["items_requiring_retention"] += 1
                deletion_summary["deletion_details"].append({
                    "data_type": data_item["type"],
                    "action": "scheduled_for_deletion",
                    "reason": retention_reason,
                    "deletion_date": self.calculate_earliest_deletion_date(data_item)
                })
        return deletion_summary
Privacy Engineering
Privacy by Design Implementation
Privacy-Preserving Data Processing
import hashlib
import hmac
import random
from typing import List, Dict, Any
class PrivacyEngineer:
    """Privacy-preserving transforms: pseudonymization, anonymization,
    differential-privacy noise, and k-anonymity generalization."""

    def __init__(self, secret_key: str):
        self.secret_key = secret_key.encode('utf-8')

    def pseudonymize_identifier(self, identifier: str, salt: str = None) -> str:
        """Create a consistent 16-hex-char pseudonym via HMAC-SHA256.

        The same (identifier, salt) always maps to the same pseudonym.
        """
        if salt is None:
            salt = "default_salt"
        # Use HMAC for consistent pseudonymization
        h = hmac.new(self.secret_key, f"{identifier}{salt}".encode('utf-8'), hashlib.sha256)
        return h.hexdigest()[:16]  # Return first 16 characters

    def anonymize_dataset(self, dataset: List[Dict], identifier_fields: List[str],
                          quasi_identifier_fields: List[str] = None) -> List[Dict]:
        """Drop direct identifiers and generalize quasi-identifiers.

        Input records are not mutated; shallow copies are returned.
        """
        anonymized_data = []
        for record in dataset:
            anonymized_record = record.copy()
            # Remove direct identifiers
            for field in identifier_fields:
                if field in anonymized_record:
                    del anonymized_record[field]
            # Generalize quasi-identifiers
            if quasi_identifier_fields:
                for field in quasi_identifier_fields:
                    if field in anonymized_record:
                        anonymized_record[field] = self.generalize_value(
                            anonymized_record[field], field
                        )
            anonymized_data.append(anonymized_record)
        return anonymized_data

    def generalize_value(self, value: Any, field_type: str) -> Any:
        """Generalize a quasi-identifier (age -> decade band, zip -> prefix,
        date -> year-month); unrecognized types pass through unchanged."""
        if field_type == "age":
            # Age ranges instead of exact age
            if isinstance(value, int):
                age_range = (value // 10) * 10
                return f"{age_range}-{age_range + 9}"
        elif field_type == "zip_code":
            # Truncate zip code
            if isinstance(value, str) and len(value) >= 5:
                return value[:3] + "XX"
        elif field_type == "date":
            # Month/year only instead of full date
            if hasattr(value, 'strftime'):
                return value.strftime("%Y-%m")
        return value

    def apply_differential_privacy(self, dataset: List[Dict], epsilon: float = 1.0) -> List[Dict]:
        """Add Laplace(0, 1/epsilon) noise to every numeric field.

        Bug fix: the standard library's ``random`` module has no ``laplace``
        function (that is numpy.random.laplace), so this raised AttributeError.
        Laplace noise is now sampled as the difference of two exponential
        variates with mean 1/epsilon, which is distributed Laplace(0, 1/epsilon).
        """
        private_dataset = []
        for record in dataset:
            private_record = record.copy()
            for key, value in private_record.items():
                # NOTE: bool is a subclass of int, so boolean fields also get
                # noise -- matching the original isinstance check.
                if isinstance(value, (int, float)):
                    noise = random.expovariate(epsilon) - random.expovariate(epsilon)
                    private_record[key] = value + noise
            private_dataset.append(private_record)
        return private_dataset

    def implement_k_anonymity(self, dataset: List[Dict],
                              quasi_identifiers: List[str], k: int = 5) -> List[Dict]:
        """Generalize quasi-identifiers of groups smaller than k.

        NOTE(review): one generalization pass may still leave groups smaller
        than k; a strict guarantee would require iterating or suppressing.
        """
        # Group records by quasi-identifier combinations
        groups = {}
        for record in dataset:
            # Create key from quasi-identifiers
            key = tuple(record.get(field, "") for field in quasi_identifiers)
            if key not in groups:
                groups[key] = []
            groups[key].append(record)
        # Generalize groups with fewer than k records
        k_anonymous_data = []
        for key, records in groups.items():
            if len(records) < k:
                # Generalize these records further
                generalized_records = self.generalize_group(records, quasi_identifiers)
                k_anonymous_data.extend(generalized_records)
            else:
                k_anonymous_data.extend(records)
        return k_anonymous_data

    def generalize_group(self, records: List[Dict], quasi_identifiers: List[str]) -> List[Dict]:
        """Apply aggressive generalization to every record in an under-sized group."""
        if not records:
            return []
        generalized_records = []
        for record in records:
            generalized_record = record.copy()
            for field in quasi_identifiers:
                if field in generalized_record:
                    # Apply more aggressive generalization
                    generalized_record[field] = self.aggressive_generalize(
                        generalized_record[field], field
                    )
            generalized_records.append(generalized_record)
        return generalized_records

    def aggressive_generalize(self, value: Any, field_type: str) -> str:
        """Coarse buckets for k-anonymity; unknown field types are suppressed to '*'."""
        if field_type == "age":
            if isinstance(value, int):
                if value < 30:
                    return "Under 30"
                elif value < 50:
                    return "30-49"
                else:
                    return "50+"
        elif field_type == "income":
            if isinstance(value, (int, float)):
                if value < 50000:
                    return "Low"
                elif value < 100000:
                    return "Medium"
                else:
                    return "High"
        return "*"  # Suppress value if no specific rule
Best Practices
Data Protection Strategy
Comprehensive Data Protection Checklist
Security Monitoring
Continuous Security Monitoring
Monitor access patterns for anomalies
Track data access and modifications
Set up alerts for suspicious activities
Regular penetration testing
Automated vulnerability scanning
Security metrics dashboards
Incident response procedures
Compliance Alignment
Regulatory Compliance
Map data flows and processing activities
Maintain records of processing activities
Implement data subject rights procedures
Regular compliance assessments
Privacy impact assessments for new projects
Vendor security assessments
Cross-border data transfer safeguards
Next Steps
After implementing data protection:
Security Assessment: Conduct comprehensive security review
Compliance Validation: Ensure regulatory compliance
Staff Training: Train team on data protection procedures
Monitoring Enhancement: Implement advanced threat detection
Regular Updates: Keep security measures current
Related Guides
Security Overview: Overall security framework
Compliance: Regulatory compliance
Incident Response: Security incident procedures
Comprehensive data protection is essential for maintaining customer trust and regulatory compliance. Implement these measures systematically and maintain them through regular audits and updates.
Last updated