Google Cloud Platform
Integrate BroxiAI with Google Cloud Platform services for scalable AI applications
Learn how to integrate BroxiAI workflows with Google Cloud Platform (GCP) services for enhanced AI capabilities, global scale, and enterprise-grade infrastructure.
Overview
BroxiAI integrates seamlessly with GCP services to provide:
Advanced AI and machine learning capabilities
Global infrastructure and edge computing
Enterprise security and compliance
Cost-effective scaling and resource management
Data analytics and intelligence platforms
Core GCP Services Integration
Vertex AI
Connect BroxiAI to Google's unified AI platform for enhanced machine learning capabilities.
Configuration
{
"provider": "vertex_ai",
"project_id": "your-gcp-project",
"location": "us-central1",
"credentials_path": "${GOOGLE_APPLICATION_CREDENTIALS}",
"models": {
"text": "text-bison@001",
"chat": "chat-bison@001",
"code": "code-bison@001",
"embeddings": "textembedding-gecko@001"
}
}
Supported Models
PaLM 2 for Text: text-bison, text-bison-32k
PaLM 2 for Chat: chat-bison, chat-bison-32k
Codey for Code: code-bison, codechat-bison
Embedding Models: textembedding-gecko
Imagen: imagen-editor, imagen-captioner
Vertex AI Integration Example
from google.cloud import aiplatform
from google.auth import default
class VertexAIIntegration:
    """Helper around Vertex AI text generation, embeddings and custom training."""

    def __init__(self, project_id, location="us-central1"):
        """Initialise the Vertex AI SDK with Application Default Credentials.

        Args:
            project_id: GCP project that owns the Vertex AI resources.
            location: Vertex AI region to use.
        """
        self.project_id = project_id
        self.location = location

        # Initialize Vertex AI
        credentials, _ = default()
        aiplatform.init(
            project=project_id,
            location=location,
            credentials=credentials,
        )

    def generate_text(self, prompt, model="text-bison@001", **parameters):
        """Generate text using Vertex AI.

        Args:
            prompt: Prompt text sent to the model.
            model: Name of the pretrained text model to load.
            **parameters: Optional overrides: ``temperature``, ``max_tokens``
                (forwarded as ``max_output_tokens``), ``top_k``, ``top_p``.

        Returns:
            dict with ``text``, ``safety_attributes`` and ``token_count``.
        """
        from vertexai.language_models import TextGenerationModel

        # Keep the model *name* and the loaded model object in separate
        # variables instead of rebinding the ``model`` parameter.
        text_model = TextGenerationModel.from_pretrained(model)
        response = text_model.predict(
            prompt,
            temperature=parameters.get('temperature', 0.2),
            max_output_tokens=parameters.get('max_tokens', 1024),
            top_k=parameters.get('top_k', 40),
            top_p=parameters.get('top_p', 0.8),
        )

        return {
            "text": response.text,
            "safety_attributes": response.safety_attributes,
            # Whitespace-separated word count; only an approximation of the
            # number of model tokens.
            "token_count": len(response.text.split()),
        }

    def generate_embeddings(self, texts, model="textembedding-gecko@001", batch_size=5):
        """Generate embeddings using Vertex AI.

        Args:
            texts: Iterable of strings to embed.
            model: Name of the pretrained embedding model to load.
            batch_size: Maximum number of texts sent per request. The service
                caps the number of inputs per call; 5 is a conservative
                default (check the quota for your model and region).

        Returns:
            One dict per input text with ``text``, ``embedding`` and
            ``dimension``; an empty list when ``texts`` is empty.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        # Materialise once so generators can be both sent and zipped.
        texts = list(texts)
        if not texts:
            return []

        from vertexai.language_models import TextEmbeddingModel

        embedding_model = TextEmbeddingModel.from_pretrained(model)

        results = []
        for start in range(0, len(texts), batch_size):
            batch = texts[start:start + batch_size]
            for text, embedding in zip(batch, embedding_model.get_embeddings(batch)):
                results.append({
                    "text": text,
                    "embedding": embedding.values,
                    "dimension": len(embedding.values),
                })
        return results

    def create_custom_model(self, training_data_uri, model_display_name):
        """Create custom model with Vertex AI.

        Args:
            training_data_uri: Location of the training data. It is passed to
                ``training_script.py`` as ``--training-data-uri=<uri>``; the
                script is responsible for reading it.
            model_display_name: Suffix used for the training job display name.

        Returns:
            Whatever ``CustomTrainingJob.run`` returns (the trained model).
        """
        job = aiplatform.CustomTrainingJob(
            display_name=f"broxi-custom-{model_display_name}",
            script_path="training_script.py",
            container_uri="gcr.io/cloud-aiplatform/training/tf-enterprise-2.8-gpu:latest",
            requirements=["transformers", "datasets"],
            model_serving_container_image_uri="gcr.io/cloud-aiplatform/prediction/tf2-gpu.2-8:latest",
        )

        # ``run(dataset=...)`` expects a managed Vertex AI Dataset object, not
        # a URI string, so the URI is handed to the script as an argument.
        # NOTE(review): confirm the accelerator type is still offered in the
        # target region.
        trained_model = job.run(
            args=[f"--training-data-uri={training_data_uri}"],
            replica_count=1,
            machine_type="n1-standard-4",
            accelerator_type="NVIDIA_TESLA_K80",
            accelerator_count=1,
        )

        return trained_model
Cloud Storage
Store and process documents, datasets, and model artifacts.
Use Cases
Document storage for RAG applications
Training data management
Model artifact storage
Backup and archival
Data lake implementation
Integration Example
from google.cloud import storage
import io
class GCSIntegration:
    """Upload, batch-process and lifecycle-manage documents in Cloud Storage."""

    # Extensions picked up by process_documents_batch.
    SUPPORTED_EXTENSIONS = ('.pdf', '.docx', '.txt')
    # Extensions that can safely be decoded as text; the rest are binary.
    TEXT_EXTENSIONS = ('.txt',)
    # Prefix under which processed copies are written.
    PROCESSED_PREFIX = "processed/"

    def __init__(self, project_id):
        """Create a Cloud Storage client bound to ``project_id``."""
        self.client = storage.Client(project=project_id)

    def upload_document(self, bucket_name, file_path, destination_blob_name):
        """Upload document to Google Cloud Storage.

        Args:
            bucket_name: Target bucket.
            file_path: Local path of the file to upload.
            destination_blob_name: Object name inside the bucket.

        Returns:
            dict describing the uploaded object (bucket, name, size, URL and
            the custom metadata that was attached).
        """
        # Imported locally: the snippet's import block does not provide it.
        from datetime import datetime, timezone

        bucket = self.client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)

        # Upload file
        blob.upload_from_filename(file_path)

        # Set metadata (custom metadata values must be strings)
        blob.metadata = {
            "uploaded_by": "broxi_ai",
            "upload_timestamp": datetime.now(timezone.utc).isoformat(),
            "content_type": blob.content_type or "application/octet-stream",
        }
        blob.patch()

        return {
            "bucket": bucket_name,
            "blob_name": destination_blob_name,
            "size": blob.size,
            "public_url": blob.public_url,
            "metadata": blob.metadata,
        }

    def process_with_broxi(self, content):
        """Hook that turns one document into its processed text.

        ``content`` is a ``str`` for text files and ``bytes`` for binary
        formats (.pdf, .docx). Override this in a subclass, or assign a
        callable on the instance, to call your BroxiAI workflow.

        Raises:
            NotImplementedError: Always, until overridden.
        """
        raise NotImplementedError(
            "process_with_broxi must be implemented to call your BroxiAI workflow"
        )

    def process_documents_batch(self, bucket_name, prefix="documents/"):
        """Batch process documents from GCS.

        Every supported document under ``prefix`` is downloaded, passed to
        ``process_with_broxi`` and the result is stored as
        ``processed/<original name>`` in the same bucket.

        Returns:
            list of dicts with ``original``, ``processed`` and ``size``.
        """
        bucket = self.client.bucket(bucket_name)

        processed_docs = []
        for blob in bucket.list_blobs(prefix=prefix):
            # Never re-process our own output (matters when prefix is "").
            if blob.name.startswith(self.PROCESSED_PREFIX):
                continue
            if not blob.name.endswith(self.SUPPORTED_EXTENSIONS):
                continue

            # PDF/DOCX are binary; decoding them as text would raise
            # UnicodeDecodeError, so only text files are decoded.
            if blob.name.endswith(self.TEXT_EXTENSIONS):
                content = blob.download_as_text()
            else:
                content = blob.download_as_bytes()

            # Process with BroxiAI
            processed_content = self.process_with_broxi(content)

            # Store processed version
            processed_blob_name = f"{self.PROCESSED_PREFIX}{blob.name}"
            bucket.blob(processed_blob_name).upload_from_string(
                processed_content,
                content_type="text/plain",
            )

            processed_docs.append({
                "original": blob.name,
                "processed": processed_blob_name,
                "size": len(processed_content),
            })

        return processed_docs

    def setup_lifecycle_management(self, bucket_name):
        """Set up lifecycle management for cost optimization.

        Replaces the bucket's lifecycle rules with three rules: move to
        COLDLINE after 30 days, to ARCHIVE after 365 days and delete after
        2555 days (about 7 years).
        """
        bucket = self.client.bucket(bucket_name)

        # Define lifecycle rules
        lifecycle_rule = {
            "action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
            "condition": {"age": 30},  # Move to coldline after 30 days
        }
        archive_rule = {
            "action": {"type": "SetStorageClass", "storageClass": "ARCHIVE"},
            "condition": {"age": 365},  # Move to archive after 1 year
        }
        delete_rule = {
            "action": {"type": "Delete"},
            "condition": {"age": 2555},  # Delete after 7 years
        }

        bucket.lifecycle_rules = [lifecycle_rule, archive_rule, delete_rule]
        bucket.patch()

        return "Lifecycle management configured"
Cloud Firestore
NoSQL database for real-time applications and session management.
Configuration
from google.cloud import firestore
from google.auth import default
class FirestoreIntegration:
    """Store conversations and user preferences in Cloud Firestore."""

    def __init__(self, project_id):
        """Create a Firestore client using Application Default Credentials."""
        credentials, _ = default()
        self.db = firestore.Client(project=project_id, credentials=credentials)

    def store_conversation(self, user_id, session_id, messages):
        """Store conversation history in Firestore.

        The document id is ``"<user_id>_<session_id>"``. ``updated_at`` is
        refreshed on every call; ``created_at`` is written only when the
        document does not exist yet.

        Returns:
            The id of the conversation document.
        """
        doc_ref = self.db.collection('conversations').document(f"{user_id}_{session_id}")

        conversation_data = {
            "user_id": user_id,
            "session_id": session_id,
            "messages": messages,
            "updated_at": firestore.SERVER_TIMESTAMP,
            "message_count": len(messages),
        }

        # A merge write would otherwise overwrite created_at on every update.
        # This is a read-then-write, not a transaction: two first writes that
        # race may both stamp created_at.
        if not doc_ref.get().exists:
            conversation_data["created_at"] = firestore.SERVER_TIMESTAMP

        doc_ref.set(conversation_data, merge=True)
        return doc_ref.id

    def get_conversation_history(self, user_id, limit=10):
        """Retrieve conversation history for user.

        Returns:
            Up to ``limit`` conversation dicts, newest ``updated_at`` first,
            each with the document id added under ``id``.
        """
        conversations_ref = self.db.collection('conversations')
        # NOTE(review): filtering on one field and ordering by another
        # normally needs a composite index in Firestore - confirm it exists.
        query = (
            conversations_ref.where('user_id', '==', user_id)
            .order_by('updated_at', direction=firestore.Query.DESCENDING)
            .limit(limit)
        )

        conversations = []
        for doc in query.stream():
            data = doc.to_dict()
            data['id'] = doc.id
            conversations.append(data)

        return conversations

    def store_user_preferences(self, user_id, preferences):
        """Store user preferences and settings (merged into existing data)."""
        doc_ref = self.db.collection('user_preferences').document(user_id)

        preference_data = {
            "user_id": user_id,
            "preferences": preferences,
            "updated_at": firestore.SERVER_TIMESTAMP,
        }

        doc_ref.set(preference_data, merge=True)
        return "Preferences stored successfully"

    def real_time_listener(self, collection_name, callback):
        """Set up real-time listener for collection changes.

        Args:
            collection_name: Collection to watch.
            callback: Called as ``callback(event, document_dict)`` where
                ``event`` is ``'added'``, ``'modified'`` or ``'removed'``.

        Returns:
            The watch object returned by ``on_snapshot``.
        """
        event_names = {
            'ADDED': 'added',
            'MODIFIED': 'modified',
            'REMOVED': 'removed',
        }

        def on_snapshot(collection_snapshot, changes, read_time):
            for change in changes:
                event = event_names.get(change.type.name)
                if event is not None:
                    callback(event, change.document.to_dict())

        return self.db.collection(collection_name).on_snapshot(on_snapshot)
Cloud Functions
Serverless computing for event-driven BroxiAI integrations.
Function Configuration
# main.py for Cloud Function
import functions_framework
import requests
import json
import os
from google.cloud import firestore
@functions_framework.http
def broxi_webhook_handler(request):
    """Cloud Function to handle BroxiAI webhooks.

    Returns:
        ``(body, status)``: 401 for a bad signature, 400 for a body that is
        not a JSON object, 500 when a handler raises, otherwise 200.
    """
    # Verify webhook signature
    if not verify_webhook_signature(request):
        return {"error": "Invalid signature"}, 401

    # Parse webhook data. silent=True yields None for a missing or malformed
    # body instead of raising before the try block below.
    webhook_data = request.get_json(silent=True)
    if not isinstance(webhook_data, dict):
        return {"error": "Invalid JSON payload"}, 400

    event_type = webhook_data.get('event')

    try:
        if event_type == 'workflow.completed':
            handle_workflow_completion(webhook_data)
        elif event_type == 'user.message.received':
            handle_user_message(webhook_data)
        else:
            print(f"Unhandled event type: {event_type}")

        return {"status": "success"}, 200

    except Exception as e:
        # Top-level boundary: log and report a generic failure to the caller.
        print(f"Error processing webhook: {e}")
        return {"error": "Processing failed"}, 500
def handle_workflow_completion(data):
    """Persist a completed workflow run and start follow-up work.

    Writes the run's id, output and execution time to a new document in the
    ``workflow_results`` collection, then triggers report generation when
    the run belongs to the ``document_analysis`` workflow.
    """
    # Store results in Firestore
    client = firestore.Client()
    result_doc = client.collection('workflow_results').document()

    payload = data['data']
    record = {
        "workflow_id": payload['workflow_id'],
        "run_id": payload['run_id'],
        "output": payload['output'],
        "execution_time": payload['execution_time'],
        "timestamp": firestore.SERVER_TIMESTAMP,
    }
    result_doc.set(record)

    # Trigger follow-up actions
    if payload['workflow_id'] != 'document_analysis':
        return
    trigger_report_generation(payload['output'])
@functions_framework.cloud_event
def process_gcs_upload(cloud_event):
    """Process file uploads to Cloud Storage.

    For uploaded .pdf, .docx and .txt objects, asks the BroxiAI
    ``document-processor`` flow to process the file. Other objects are
    ignored.
    """
    # Get file information
    bucket_name = cloud_event.data['bucket']
    file_name = cloud_event.data['name']

    if not file_name.endswith(('.pdf', '.docx', '.txt')):
        return

    # Trigger BroxiAI document processing
    response = requests.post(
        "https://api.broxi.ai/v1/flows/document-processor/run",
        headers={
            "Authorization": f"Bearer {os.environ['BROXI_API_TOKEN']}",
            "Content-Type": "application/json"
        },
        json={
            "input": f"Process document from gs://{bucket_name}/{file_name}",
            "variables": {
                "bucket": bucket_name,
                "file_path": file_name
            }
        },
        # Without a timeout a stalled connection blocks the function until
        # the platform kills it.
        timeout=30,
    )

    if response.status_code == 200:
        print(f"Document processing initiated for {file_name}")
    else:
        print(f"Failed to process document: {response.text}")
# requirements.txt
"""
functions-framework>=3.0.0
google-cloud-firestore>=2.0.0
google-cloud-storage>=2.0.0
requests>=2.25.0
"""
# Deploy command
# --entry-point is required: the deployed function name contains hyphens and
# therefore cannot be the name of the Python function in main.py.
"""
gcloud functions deploy broxi-webhook-handler \
    --runtime python311 \
    --entry-point broxi_webhook_handler \
    --trigger-http \
    --allow-unauthenticated \
    --set-env-vars BROXI_API_TOKEN=your_token
"""
BigQuery
Data warehouse and analytics platform for workflow insights.
BigQuery Integration
from google.cloud import bigquery
from google.auth import default
import pandas as pd
class BigQueryIntegration:
    """Create and query the ``broxi_analytics`` dataset in BigQuery."""

    def __init__(self, project_id):
        """Create a BigQuery client using Application Default Credentials."""
        credentials, _ = default()
        self.client = bigquery.Client(project=project_id, credentials=credentials)
        self.project_id = project_id

    def _table_id(self, table_name):
        """Return the fully qualified id of a table in the analytics dataset."""
        return f"{self.project_id}.broxi_analytics.{table_name}"

    def _create_day_partitioned_table(self, table_name, schema):
        """Create (if missing) a table partitioned by day on ``timestamp``."""
        table_id = self._table_id(table_name)
        table = bigquery.Table(table_id, schema=schema)
        table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="timestamp",
        )
        self.client.create_table(table, exists_ok=True)
        return f"Table {table_id} created successfully"

    def create_workflow_analytics_dataset(self):
        """Create dataset for workflow analytics, including its tables."""
        dataset_id = f"{self.project_id}.broxi_analytics"

        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "US"
        dataset.description = "BroxiAI workflow analytics and metrics"

        # Create dataset
        self.client.create_dataset(dataset, exists_ok=True)

        # Create tables
        self.create_workflow_executions_table()
        self.create_user_interactions_table()
        self.create_performance_metrics_table()

        return f"Dataset {dataset_id} created successfully"

    def create_workflow_executions_table(self):
        """Create table for workflow execution data."""
        schema = [
            bigquery.SchemaField("execution_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("workflow_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("user_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("input_text", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("output_text", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("execution_time", "FLOAT", mode="REQUIRED"),
            bigquery.SchemaField("token_usage", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("cost", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("status", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("error_message", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("metadata", "JSON", mode="NULLABLE"),
        ]
        return self._create_day_partitioned_table("workflow_executions", schema)

    def create_user_interactions_table(self):
        """Create table for user interaction events.

        The schema is a minimal example; adapt the fields to the events you
        actually record.
        """
        schema = [
            bigquery.SchemaField("interaction_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("user_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("workflow_id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("interaction_type", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("metadata", "JSON", mode="NULLABLE"),
        ]
        return self._create_day_partitioned_table("user_interactions", schema)

    def create_performance_metrics_table(self):
        """Create table for performance metrics.

        The schema is a minimal example; adapt the fields to the metrics you
        actually record.
        """
        schema = [
            bigquery.SchemaField("metric_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("metric_value", "FLOAT", mode="REQUIRED"),
            bigquery.SchemaField("workflow_id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
        ]
        return self._create_day_partitioned_table("performance_metrics", schema)

    def insert_workflow_execution(self, execution_data):
        """Insert workflow execution data.

        Returns:
            True when the row was accepted, False when BigQuery reported
            row-level errors (which are printed).
        """
        # insert_rows_json accepts the table id directly, so no extra
        # get_table round trip is needed per insert.
        errors = self.client.insert_rows_json(
            self._table_id("workflow_executions"), [execution_data]
        )

        if errors:
            print(f"Errors inserting data: {errors}")
            return False

        return True

    def _performance_query(self):
        """Return the parameterised SQL used by analyze_workflow_performance."""
        # APPROX_QUANTILES is an aggregate, so it is valid next to GROUP BY;
        # an analytic PERCENTILE_CONT(...) OVER() on an ungrouped column is not.
        return f"""
        SELECT
            workflow_id,
            COUNT(*) as total_executions,
            AVG(execution_time) as avg_execution_time,
            APPROX_QUANTILES(execution_time, 100)[OFFSET(50)] as median_execution_time,
            APPROX_QUANTILES(execution_time, 100)[OFFSET(95)] as p95_execution_time,
            SUM(token_usage) as total_tokens,
            SUM(cost) as total_cost,
            COUNT(CASE WHEN status = 'completed' THEN 1 END) as successful_executions,
            COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed_executions
        FROM `{self._table_id("workflow_executions")}`
        WHERE timestamp BETWEEN TIMESTAMP(@start_date) AND TIMESTAMP(@end_date)
        GROUP BY workflow_id
        ORDER BY total_executions DESC
        """

    def analyze_workflow_performance(self, start_date, end_date):
        """Analyze workflow performance over time.

        Args:
            start_date: Inclusive lower bound; anything whose ``str()`` is a
                valid BigQuery timestamp literal (e.g. ``"2024-01-01"``).
            end_date: Inclusive upper bound, same format.

        Returns:
            One dict per workflow with execution counts, timing percentiles
            (approximate), token and cost totals.
        """
        # Bind the dates as query parameters instead of interpolating them
        # into the SQL text.
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("start_date", "STRING", str(start_date)),
                bigquery.ScalarQueryParameter("end_date", "STRING", str(end_date)),
            ]
        )
        rows = self.client.query(self._performance_query(), job_config=job_config).result()

        return [dict(row) for row in rows]

    def create_usage_dashboard(self):
        """Create usage analytics dashboard queries.

        Returns:
            dict mapping ``daily_usage``, ``top_users`` and ``error_analysis``
            to SQL strings; nothing is executed here.
        """
        table = f"`{self._table_id('workflow_executions')}`"

        queries = {
            "daily_usage": f"""
            SELECT
                DATE(timestamp) as date,
                COUNT(*) as executions,
                SUM(token_usage) as tokens,
                SUM(cost) as cost,
                AVG(execution_time) as avg_response_time
            FROM {table}
            WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
            GROUP BY DATE(timestamp)
            ORDER BY date DESC
            """,
            "top_users": f"""
            SELECT
                user_id,
                COUNT(*) as total_requests,
                SUM(cost) as total_cost,
                AVG(execution_time) as avg_response_time
            FROM {table}
            WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
            GROUP BY user_id
            ORDER BY total_requests DESC
            LIMIT 10
            """,
            "error_analysis": f"""
            SELECT
                workflow_id,
                error_message,
                COUNT(*) as error_count,
                MAX(timestamp) as last_occurrence
            FROM {table}
            WHERE status = 'failed'
            AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
            GROUP BY workflow_id, error_message
            ORDER BY error_count DESC
            """,
        }

        return queries
Pub/Sub
Message queuing and event streaming for scalable architectures.
Pub/Sub Integration
from google.cloud import pubsub_v1
from concurrent.futures import ThreadPoolExecutor
import json
class PubSubIntegration:
    """Publish BroxiAI workflow requests/results and process them via Pub/Sub."""

    # Seconds to wait for the BroxiAI API before giving up on a request.
    REQUEST_TIMEOUT = 60

    def __init__(self, project_id):
        """Create publisher and subscriber clients for ``project_id``."""
        self.project_id = project_id
        self.publisher = pubsub_v1.PublisherClient()
        self.subscriber = pubsub_v1.SubscriberClient()

    @staticmethod
    def _utc_timestamp():
        """Return the current time as a timezone-aware ISO-8601 string."""
        # Imported locally: the snippet's import block does not provide it.
        from datetime import datetime, timezone

        return datetime.now(timezone.utc).isoformat()

    def _publish_json(self, topic_name, payload):
        """Publish ``payload`` as UTF-8 JSON and return the message id."""
        topic_path = self.publisher.topic_path(self.project_id, topic_name)
        data = json.dumps(payload).encode("utf-8")
        future = self.publisher.publish(topic_path, data)
        # Block until the publish is acknowledged.
        return future.result()

    def create_broxi_topics(self):
        """Create Pub/Sub topics for BroxiAI events.

        Returns:
            Names/paths of the topics that were created or already existed.
            Topics that failed for another reason are reported and omitted.
        """
        topics = [
            "broxi-workflow-requests",
            "broxi-workflow-results",
            "broxi-user-events",
            "broxi-system-alerts",
        ]

        created_topics = []
        for topic_name in topics:
            topic_path = self.publisher.topic_path(self.project_id, topic_name)

            try:
                topic = self.publisher.create_topic(request={"name": topic_path})
                created_topics.append(topic.name)
            except Exception as e:
                if "already exists" in str(e).lower():
                    created_topics.append(topic_path)
                else:
                    print(f"Error creating topic {topic_name}: {e}")

        return created_topics

    def publish_workflow_request(self, workflow_id, input_data, user_id):
        """Publish workflow request to Pub/Sub.

        Returns:
            The id of the published message.
        """
        import uuid

        message_data = {
            "workflow_id": workflow_id,
            "input": input_data,
            "user_id": user_id,
            "timestamp": self._utc_timestamp(),
            "request_id": str(uuid.uuid4()),
        }

        return self._publish_json("broxi-workflow-requests", message_data)

    def setup_workflow_processor(self):
        """Set up subscriber to process workflow requests.

        Each message is run through ``process_workflow_request`` and its
        result is published; failures are nacked so Pub/Sub redelivers them.
        The ``broxi-workflow-processor`` subscription is not created here
        and must already exist.

        Returns:
            The streaming pull future; keep a reference to it (and call
            ``result()`` or ``cancel()``) to control the subscriber.
        """
        subscription_path = self.subscriber.subscription_path(
            self.project_id, "broxi-workflow-processor"
        )

        def callback(message):
            try:
                # Parse message
                message_data = json.loads(message.data.decode("utf-8"))

                # Process workflow request
                result = self.process_workflow_request(message_data)

                # Publish result
                self.publish_workflow_result(message_data["request_id"], result)

                # Acknowledge message
                message.ack()

            except Exception as e:
                print(f"Error processing message: {e}")
                message.nack()

        # Start subscriber
        streaming_pull_future = self.subscriber.subscribe(
            subscription_path,
            callback=callback,
            flow_control=pubsub_v1.types.FlowControl(max_messages=100),
        )

        return streaming_pull_future

    def process_workflow_request(self, request_data):
        """Process workflow request via BroxiAI API.

        Raises:
            Exception: If the API does not answer with HTTP 200.
        """
        # Imported locally: the snippet's import block does not provide them.
        import os

        import requests

        response = requests.post(
            f"https://api.broxi.ai/v1/flows/{request_data['workflow_id']}/run",
            headers={
                "Authorization": f"Bearer {os.environ['BROXI_API_TOKEN']}",
                "Content-Type": "application/json"
            },
            json={
                "input": request_data["input"],
                "variables": {
                    "user_id": request_data["user_id"],
                    "request_id": request_data["request_id"]
                }
            },
            # A stalled call would otherwise block a subscriber thread forever.
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code == 200:
            return response.json()
        raise Exception(f"Workflow execution failed: {response.text}")

    def publish_workflow_result(self, request_id, result):
        """Publish workflow result and return the id of the message."""
        result_data = {
            "request_id": request_id,
            "result": result,
            "timestamp": self._utc_timestamp(),
        }

        return self._publish_json("broxi-workflow-results", result_data)
Advanced GCP Integrations
Cloud Run
Deploy BroxiAI integration services as containerized applications.
Cloud Run Service
# app.py
from flask import Flask, request, jsonify
import os
from google.cloud import firestore, storage
import requests
app = Flask(__name__)

# Initialize GCP clients
db = firestore.Client()
storage_client = storage.Client()

# Seconds to wait for the BroxiAI API before failing the request.
BROXI_REQUEST_TIMEOUT = 60


def parse_gcs_url(url):
    """Split ``gs://bucket/path/to/object`` into ``(bucket, object_name)``.

    Raises:
        ValueError: If ``url`` is not a ``gs://`` URL with both a bucket and
            an object name.
    """
    scheme = "gs://"
    if not isinstance(url, str) or not url.startswith(scheme):
        raise ValueError("document_url must be a gs://bucket/object URL")

    bucket_name, _, blob_name = url[len(scheme):].partition("/")
    if not bucket_name or not blob_name:
        raise ValueError("document_url must be a gs://bucket/object URL")

    return bucket_name, blob_name


@app.route('/process-document', methods=['POST'])
def process_document():
    """Process document with BroxiAI and store results.

    Expects a JSON body with ``document_url`` (a ``gs://`` URL) and
    ``user_id``. Responds 400 for an invalid ``document_url``, 500 when the
    download, the BroxiAI call or the Firestore write fails, and otherwise
    returns the analysis id and result.
    """
    try:
        # Get request data; a missing or non-object body becomes {} so the
        # URL validation below answers with a 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        document_url = data.get('document_url')
        user_id = data.get('user_id')

        try:
            bucket_name, blob_name = parse_gcs_url(document_url)
        except ValueError as exc:
            return jsonify({
                "status": "error",
                "message": str(exc)
            }), 400

        # Download document from GCS
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        document_content = blob.download_as_text()

        # Process with BroxiAI
        broxi_response = requests.post(
            "https://api.broxi.ai/v1/flows/document-analyzer/run",
            headers={
                "Authorization": f"Bearer {os.environ['BROXI_API_TOKEN']}",
                "Content-Type": "application/json"
            },
            json={
                "input": document_content,
                "variables": {
                    "document_url": document_url,
                    "user_id": user_id
                }
            },
            timeout=BROXI_REQUEST_TIMEOUT,
        )

        if broxi_response.status_code != 200:
            return jsonify({
                "status": "error",
                "message": "BroxiAI processing failed"
            }), 500

        result = broxi_response.json()

        # Store result in Firestore
        doc_ref = db.collection('document_analysis').document()
        doc_ref.set({
            "user_id": user_id,
            "document_url": document_url,
            "analysis_result": result["output"],
            "processing_time": result["execution_time"],
            "timestamp": firestore.SERVER_TIMESTAMP
        })

        return jsonify({
            "status": "success",
            "analysis_id": doc_ref.id,
            "result": result["output"]
        })

    except Exception as e:
        # Top-level boundary for the request: report the failure as a 500.
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
Dockerfile
# Container image for the app.py service shown above.
FROM python:3.11-slim
WORKDIR /app
# Install dependencies before copying the rest of the sources.
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
# 8080 matches the default PORT that app.py falls back to.
EXPOSE 8080
# Starts the app via its __main__ block (app.run).
CMD ["python", "app.py"]
Deploy to Cloud Run
# Build and deploy
# NOTE(review): the token is passed as a plain environment variable here; the
# Security section of this guide recommends Secret Manager for secrets.
gcloud run deploy broxi-document-processor \
--source . \
--platform managed \
--region us-central1 \
--allow-unauthenticated \
--set-env-vars BROXI_API_TOKEN=your_token
AI Platform Notebooks
Interactive development environment for BroxiAI integration.
Notebook Setup
# Install required packages
!pip install google-cloud-aiplatform
!pip install google-cloud-storage
!pip install requests
import requests
import json
from google.cloud import aiplatform, storage
from google.auth import default
import pandas as pd
import matplotlib.pyplot as plt
# Configure authentication
# default() returns the credentials together with the project they belong to.
credentials, project_id = default()
print(f"Project ID: {project_id}")
# Initialize clients
aiplatform.init(project=project_id, location="us-central1")
storage_client = storage.Client(project=project_id)
# BroxiAI API configuration
# Placeholder value: replace with your own token before running the cells below.
BROXI_API_TOKEN = "your_api_token"
BROXI_BASE_URL = "https://api.broxi.ai/v1"
def call_broxi_workflow(workflow_id, input_text, variables=None, timeout=60):
    """Call BroxiAI workflow from notebook.

    Args:
        workflow_id: Id of the flow to run.
        input_text: Text passed as the flow input.
        variables: Optional dict of flow variables.
        timeout: Seconds to wait for the API before giving up.

    Returns:
        The decoded JSON response.

    Raises:
        requests.HTTPError: If the API answers with an error status, so a
            failed call is not mistaken for an (empty) result.
    """
    response = requests.post(
        f"{BROXI_BASE_URL}/flows/{workflow_id}/run",
        headers={
            "Authorization": f"Bearer {BROXI_API_TOKEN}",
            "Content-Type": "application/json"
        },
        json={
            "input": input_text,
            "variables": variables or {}
        },
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
# Example: Batch processing documents
def batch_process_documents(bucket_name, prefix="documents/", max_documents=10):
    """Batch process documents with BroxiAI.

    Summarises the first ``max_documents`` ``.txt``/``.md`` objects found
    under ``prefix``.

    Returns:
        DataFrame with ``filename``, ``original_length``, ``summary`` and
        ``processing_time`` columns (present even when nothing matched).
    """
    from itertools import islice

    bucket = storage_client.bucket(bucket_name)

    # Filter first and cap afterwards, lazily: the cap then counts real
    # documents and the whole prefix is not listed just to take a few items.
    documents = (
        blob for blob in bucket.list_blobs(prefix=prefix)
        if blob.name.endswith(('.txt', '.md'))
    )

    results = []
    for blob in islice(documents, max_documents):
        # Download content
        content = blob.download_as_text()

        # Process with BroxiAI
        result = call_broxi_workflow(
            "document-summarizer",
            content,
            {"filename": blob.name}
        )

        results.append({
            "filename": blob.name,
            "original_length": len(content),
            "summary": result.get("output", ""),
            "processing_time": result.get("execution_time", 0)
        })

    # Explicit columns keep the frame usable when no document matched.
    return pd.DataFrame(
        results,
        columns=["filename", "original_length", "summary", "processing_time"],
    )
# Run batch processing
df_results = batch_process_documents("your-documents-bucket")
print(df_results.head())

# Visualize results: scatter on the left, histogram on the right
fig, (length_ax, time_ax) = plt.subplots(1, 2, figsize=(12, 6))

length_ax.scatter(df_results['original_length'], df_results['processing_time'])
length_ax.set_xlabel('Document Length (characters)')
length_ax.set_ylabel('Processing Time (seconds)')
length_ax.set_title('Processing Time vs Document Length')

time_ax.hist(df_results['processing_time'], bins=10)
time_ax.set_xlabel('Processing Time (seconds)')
time_ax.set_ylabel('Frequency')
time_ax.set_title('Processing Time Distribution')

fig.tight_layout()
plt.show()
Cloud Monitoring
Comprehensive monitoring and alerting for BroxiAI integrations.
Custom Metrics
from google.cloud import monitoring_v3
import time
class GCPMonitoringIntegration:
    """Define and write custom Cloud Monitoring metrics for BroxiAI workflows."""

    def __init__(self, project_id):
        """Create a metric client for ``project_id``."""
        self.project_id = project_id
        self.client = monitoring_v3.MetricServiceClient()
        self.project_name = f"projects/{project_id}"

    def create_custom_metrics(self):
        """Create custom metrics for BroxiAI monitoring.

        Failures are printed per metric and do not stop the remaining
        descriptors from being created.
        """
        metrics = [
            {
                "type": "custom.googleapis.com/broxi/workflow_executions",
                "metric_kind": monitoring_v3.MetricDescriptor.MetricKind.GAUGE,
                "value_type": monitoring_v3.MetricDescriptor.ValueType.INT64,
                "description": "Number of BroxiAI workflow executions"
            },
            {
                "type": "custom.googleapis.com/broxi/response_time",
                "metric_kind": monitoring_v3.MetricDescriptor.MetricKind.GAUGE,
                "value_type": monitoring_v3.MetricDescriptor.ValueType.DOUBLE,
                "description": "BroxiAI workflow response time in seconds"
            },
            {
                "type": "custom.googleapis.com/broxi/token_usage",
                # GAUGE, not CUMULATIVE: write_time_series records one
                # per-execution value at a single instant, whereas cumulative
                # points need a running total and a start time.
                "metric_kind": monitoring_v3.MetricDescriptor.MetricKind.GAUGE,
                "value_type": monitoring_v3.MetricDescriptor.ValueType.INT64,
                "description": "Tokens used by a BroxiAI workflow execution"
            }
        ]

        for metric_config in metrics:
            descriptor = monitoring_v3.MetricDescriptor(
                type=metric_config["type"],
                metric_kind=metric_config["metric_kind"],
                value_type=metric_config["value_type"],
                description=metric_config["description"]
            )

            try:
                self.client.create_metric_descriptor(
                    name=self.project_name,
                    metric_descriptor=descriptor
                )
                print(f"Created metric: {metric_config['type']}")
            except Exception as e:
                print(f"Error creating metric {metric_config['type']}: {e}")

    def record_workflow_execution(self, workflow_id, response_time, token_usage):
        """Record workflow execution metrics.

        Args:
            workflow_id: Value of the ``workflow_id`` metric label.
            response_time: Duration in seconds (written as a double).
            token_usage: Tokens consumed by the run (written as an integer).
        """
        labels = {"workflow_id": workflow_id}

        # Record execution count
        self.write_time_series(
            "custom.googleapis.com/broxi/workflow_executions",
            1,
            labels
        )

        # Record response time. Coerce to float so a whole-number duration is
        # still written as double_value, matching the DOUBLE descriptor.
        self.write_time_series(
            "custom.googleapis.com/broxi/response_time",
            float(response_time),
            labels
        )

        # Record token usage. Coerce to int to match the INT64 descriptor.
        self.write_time_series(
            "custom.googleapis.com/broxi/token_usage",
            int(token_usage),
            labels
        )

    def write_time_series(self, metric_type, value, labels=None):
        """Write time series data to Cloud Monitoring.

        Args:
            metric_type: Full metric type name.
            value: ``float`` values are sent as ``double_value``, everything
                else as ``int64_value``.
            labels: Optional metric labels.
        """
        series = monitoring_v3.TimeSeries()
        series.metric.type = metric_type

        if labels:
            for key, val in labels.items():
                series.metric.labels[key] = val

        series.resource.type = "global"

        # Single-instant interval ending now.
        now = time.time()
        seconds = int(now)
        nanos = int((now - seconds) * 10**9)
        interval = monitoring_v3.TimeInterval(
            {"end_time": {"seconds": seconds, "nanos": nanos}}
        )

        point = monitoring_v3.Point({
            "interval": interval,
            "value": {"double_value": value} if isinstance(value, float) else {"int64_value": value}
        })

        series.points = [point]

        self.client.create_time_series(
            name=self.project_name,
            time_series=[series]
        )
Security and Identity
Identity and Access Management (IAM)
Service Account Setup
# Service account for BroxiAI integration
# Replace PROJECT_ID below with your own project id.
gcloud iam service-accounts create broxi-integration \
--display-name="BroxiAI Integration Service Account"
# Grant necessary permissions
# One binding per role: Cloud Storage objects, Datastore/Firestore, Pub/Sub.
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:broxi-integration@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/storage.objectAdmin"
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:broxi-integration@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/datastore.user"
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:broxi-integration@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/pubsub.editor"
# Create and download service account key
# NOTE(review): broxi-key.json is written to the current directory; treat it
# as a credential and keep it out of version control.
gcloud iam service-accounts keys create broxi-key.json \
--iam-account=broxi-integration@PROJECT_ID.iam.gserviceaccount.com
IAM Policy Configuration
from google.cloud import resource_manager
from google.cloud.iam_v1 import Policy, Binding
class IAMManager:
    """Grant a service account the project roles the BroxiAI integration uses."""

    # NOTE(review): this relies on resource_manager.Client().project(...)
    # exposing get_iam_policy/set_iam_policy and on google.cloud.iam_v1
    # providing Binding - confirm against the installed library versions.

    def __init__(self, project_id):
        self.project_id = project_id
        self.client = resource_manager.Client()

    def setup_broxi_permissions(self, service_account_email):
        """Add the service account to every required role on the project.

        Reads the project's IAM policy, adds the account to each role's
        binding (creating the binding when the role has none), and writes the
        policy back.

        Returns:
            A confirmation message.
        """
        member = f"serviceAccount:{service_account_email}"

        project = self.client.project(self.project_id)
        policy = project.get_iam_policy()

        # Roles the integration needs
        required_roles = (
            "roles/storage.objectAdmin",
            "roles/datastore.user",
            "roles/pubsub.editor",
            "roles/cloudfunctions.invoker",
            "roles/run.invoker",
            "roles/aiplatform.user",
        )

        for role in required_roles:
            # First binding for this role, if the policy has one
            current = next(
                (candidate for candidate in policy.bindings if candidate.role == role),
                None,
            )

            if not current:
                policy.bindings.append(Binding(role=role, members=[member]))
                continue

            if member not in current.members:
                current.members.append(member)

        # Write the modified policy back
        project.set_iam_policy(policy)

        return "IAM permissions configured successfully"
Cost Optimization
Resource Management
Cost Optimization Strategies
class GCPCostOptimizer:
    """Cost-related helpers: storage lifecycle rules and example schedules."""

    def __init__(self, project_id):
        """Create the storage and compute clients for ``project_id``."""
        # Imported locally: this snippet has no import block of its own.
        from google.cloud import compute_v1, storage

        self.project_id = project_id
        # Bind the client to the requested project, as GCSIntegration does.
        self.storage_client = storage.Client(project=project_id)
        self.compute_client = compute_v1.InstancesClient()

    def optimize_storage_costs(self, bucket_name):
        """Optimize Cloud Storage costs.

        Replaces the bucket's lifecycle rules with a tiering chain
        (STANDARD -> NEARLINE at 30 days -> COLDLINE at 90 days -> ARCHIVE at
        365 days) and deletion after 2555 days.
        """
        bucket = self.storage_client.bucket(bucket_name)

        # Set up lifecycle management
        lifecycle_rules = [
            {
                "action": {"type": "SetStorageClass", "storageClass": "NEARLINE"},
                "condition": {"age": 30, "matchesStorageClass": ["STANDARD"]}
            },
            {
                "action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
                "condition": {"age": 90, "matchesStorageClass": ["NEARLINE"]}
            },
            {
                "action": {"type": "SetStorageClass", "storageClass": "ARCHIVE"},
                "condition": {"age": 365, "matchesStorageClass": ["COLDLINE"]}
            },
            {
                "action": {"type": "Delete"},
                "condition": {"age": 2555}  # 7 years
            }
        ]

        bucket.lifecycle_rules = lifecycle_rules
        bucket.patch()

        return "Storage lifecycle rules configured"

    def schedule_compute_instances(self):
        """Schedule compute instances for cost optimization.

        Returns only the cron schedules; nothing is created or changed here.
        """
        # This would integrate with Cloud Scheduler
        # to start/stop instances based on usage patterns

        scheduler_config = {
            "development_instances": {
                "start_schedule": "0 9 * * 1-5",  # 9 AM weekdays
                "stop_schedule": "0 18 * * 1-5"   # 6 PM weekdays
            },
            "staging_instances": {
                "start_schedule": "0 8 * * 1-5",  # 8 AM weekdays
                "stop_schedule": "0 20 * * 1-5"   # 8 PM weekdays
            }
        }

        return scheduler_config

    def monitor_costs(self):
        """Monitor and alert on costs.

        Returns only the desired budget-alert and cost-tracking
        configuration; no billing API is called here.
        """
        # Integration with Cloud Billing API
        billing_config = {
            "budget_alerts": [
                {
                    "threshold": 80,
                    "action": "email_notification"
                },
                {
                    "threshold": 95,
                    "action": "disable_billing"
                }
            ],
            "cost_tracking": {
                "by_service": True,
                "by_project": True,
                "by_label": True
            }
        }

        return billing_config
Best Practices
Performance Optimization
GCP Best Practices
Use regional resources for better latency
Implement caching with Memorystore
Use Cloud CDN for global content delivery
Optimize data transfer costs
Use preemptible instances for batch processing
Security
Security Best Practices
Use IAM service accounts with minimal permissions
Enable audit logging for all services
Use VPC for network isolation
Implement secret management with Secret Manager
Regular security scanning and updates
Monitoring
Comprehensive Monitoring
Set up Cloud Monitoring dashboards
Configure alerting policies
Use Cloud Logging for centralized logs
Implement distributed tracing
Monitor costs and usage patterns
Next Steps
After GCP integration:
Monitor Performance: Set up comprehensive monitoring
Optimize Costs: Implement cost optimization strategies
Scale Infrastructure: Plan for growth and scaling
Security Review: Conduct security assessments
Disaster Recovery: Implement backup and recovery procedures
Related Guides
AWS Integration: Multi-cloud strategies
Azure Integration: Alternative cloud platform
Security: Security best practices
GCP integration provides powerful AI capabilities, global infrastructure, and cost-effective scaling for BroxiAI applications. Leverage Google's AI expertise and infrastructure for optimal results.
Last updated