import logging

import pandas as pd
from google.auth import default
from google.cloud import bigquery
class BigQueryIntegration:
    """Export and analyze BroxiAI workflow metrics in Google BigQuery.

    Wraps a ``bigquery.Client`` authenticated through Application Default
    Credentials and manages the ``broxi_analytics`` dataset: table creation,
    row ingestion, and reporting queries.
    """

    def __init__(self, project_id):
        """Create a BigQuery client for *project_id* via default credentials.

        Args:
            project_id: GCP project that owns the ``broxi_analytics`` dataset.
        """
        credentials, _ = default()
        self.client = bigquery.Client(project=project_id, credentials=credentials)
        self.project_id = project_id

    def create_workflow_analytics_dataset(self):
        """Create the analytics dataset and its tables (idempotent).

        Returns:
            Human-readable status string.
        """
        dataset_id = f"{self.project_id}.broxi_analytics"
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "US"
        dataset.description = "BroxiAI workflow analytics and metrics"

        # exists_ok=True makes repeated calls safe (no Conflict raised).
        dataset = self.client.create_dataset(dataset, exists_ok=True)

        # Create tables
        self.create_workflow_executions_table()
        self.create_user_interactions_table()
        self.create_performance_metrics_table()
        return f"Dataset {dataset_id} created successfully"

    def create_workflow_executions_table(self):
        """Create the day-partitioned table for workflow execution records."""
        table_id = f"{self.project_id}.broxi_analytics.workflow_executions"
        schema = [
            bigquery.SchemaField("execution_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("workflow_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("user_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("input_text", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("output_text", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("execution_time", "FLOAT", mode="REQUIRED"),
            bigquery.SchemaField("token_usage", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("cost", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("status", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("error_message", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("metadata", "JSON", mode="NULLABLE"),
        ]
        table = bigquery.Table(table_id, schema=schema)
        # Partition by event day to bound scan cost of the time-windowed
        # queries below.
        table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="timestamp",
        )
        table = self.client.create_table(table, exists_ok=True)
        return f"Table {table_id} created successfully"

    def create_user_interactions_table(self):
        """Create the table for per-user interaction events.

        NOTE(review): this method was called but missing from the original
        class (AttributeError at runtime). Minimal placeholder schema —
        confirm against actual producers before relying on it.
        """
        table_id = f"{self.project_id}.broxi_analytics.user_interactions"
        schema = [
            bigquery.SchemaField("interaction_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("user_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("workflow_id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("event_type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("metadata", "JSON", mode="NULLABLE"),
        ]
        table = bigquery.Table(table_id, schema=schema)
        table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="timestamp",
        )
        table = self.client.create_table(table, exists_ok=True)
        return f"Table {table_id} created successfully"

    def create_performance_metrics_table(self):
        """Create the table for aggregated performance metrics.

        NOTE(review): this method was called but missing from the original
        class (AttributeError at runtime). Minimal placeholder schema —
        confirm against actual producers before relying on it.
        """
        table_id = f"{self.project_id}.broxi_analytics.performance_metrics"
        schema = [
            bigquery.SchemaField("metric_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("metric_value", "FLOAT", mode="REQUIRED"),
            bigquery.SchemaField("workflow_id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("metadata", "JSON", mode="NULLABLE"),
        ]
        table = bigquery.Table(table_id, schema=schema)
        table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="timestamp",
        )
        table = self.client.create_table(table, exists_ok=True)
        return f"Table {table_id} created successfully"

    def insert_workflow_execution(self, execution_data):
        """Stream one execution record into the executions table.

        Args:
            execution_data: dict matching the workflow_executions schema.

        Returns:
            True on success, False if the streaming insert reported errors.
        """
        table_id = f"{self.project_id}.broxi_analytics.workflow_executions"
        table = self.client.get_table(table_id)
        rows_to_insert = [execution_data]
        errors = self.client.insert_rows_json(table, rows_to_insert)
        if errors:
            # Log (not print) so failures surface in production logging.
            logging.getLogger(__name__).error("Errors inserting data: %s", errors)
            return False
        return True

    def analyze_workflow_performance(self, start_date, end_date):
        """Aggregate per-workflow performance within a time window.

        Dates are bound as query parameters rather than interpolated into
        the SQL string, which prevents SQL injection. Percentiles use
        APPROX_QUANTILES because PERCENTILE_CONT is analytic-only in
        BigQuery and cannot be combined with GROUP BY aggregation.

        Args:
            start_date: Window start (TIMESTAMP-coercible value).
            end_date: Window end (inclusive).

        Returns:
            List of dict rows, one per workflow, ordered by execution count.
        """
        query = f"""
            SELECT
                workflow_id,
                COUNT(*) AS total_executions,
                AVG(execution_time) AS avg_execution_time,
                APPROX_QUANTILES(execution_time, 100)[OFFSET(50)] AS median_execution_time,
                APPROX_QUANTILES(execution_time, 100)[OFFSET(95)] AS p95_execution_time,
                SUM(token_usage) AS total_tokens,
                SUM(cost) AS total_cost,
                COUNTIF(status = 'completed') AS successful_executions,
                COUNTIF(status = 'failed') AS failed_executions
            FROM `{self.project_id}.broxi_analytics.workflow_executions`
            WHERE timestamp BETWEEN @start_date AND @end_date
            GROUP BY workflow_id
            ORDER BY total_executions DESC
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("start_date", "TIMESTAMP", start_date),
                bigquery.ScalarQueryParameter("end_date", "TIMESTAMP", end_date),
            ]
        )
        query_job = self.client.query(query, job_config=job_config)
        results = query_job.result()
        return [dict(row) for row in results]

    def create_usage_dashboard(self):
        """Return named SQL strings for the usage analytics dashboard.

        Only ``self.project_id`` is interpolated (trusted, set at
        construction); no user input reaches these strings.

        Returns:
            Dict mapping panel name -> SQL query text.
        """
        queries = {
            "daily_usage": f"""
                SELECT
                    DATE(timestamp) as date,
                    COUNT(*) as executions,
                    SUM(token_usage) as tokens,
                    SUM(cost) as cost,
                    AVG(execution_time) as avg_response_time
                FROM `{self.project_id}.broxi_analytics.workflow_executions`
                WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
                GROUP BY DATE(timestamp)
                ORDER BY date DESC
            """,
            "top_users": f"""
                SELECT
                    user_id,
                    COUNT(*) as total_requests,
                    SUM(cost) as total_cost,
                    AVG(execution_time) as avg_response_time
                FROM `{self.project_id}.broxi_analytics.workflow_executions`
                WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
                GROUP BY user_id
                ORDER BY total_requests DESC
                LIMIT 10
            """,
            "error_analysis": f"""
                SELECT
                    workflow_id,
                    error_message,
                    COUNT(*) as error_count,
                    MAX(timestamp) as last_occurrence
                FROM `{self.project_id}.broxi_analytics.workflow_executions`
                WHERE status = 'failed'
                AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
                GROUP BY workflow_id, error_message
                ORDER BY error_count DESC
            """
        }
        return queries