Source code for scitex_benchmark.monitor
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Time-stamp: "2025-07-25 05:40:00"
# File: monitor.py
"""
Real-time performance monitoring for SciTeX.
"""
import json
import threading
import time
import warnings
from collections import defaultdict, deque
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
@dataclass
class PerformanceMetric:
"""Single performance measurement."""
timestamp: float
function: str
duration: float
memory_delta: Optional[float] = None
args_size: Optional[int] = None
result_size: Optional[int] = None
exception: Optional[str] = None
[docs]
class PerformanceMonitor:
"""
Monitor performance metrics for SciTeX functions.
Example
-------
>>> monitor = PerformanceMonitor()
>>> monitor.start()
>>> # Your code here
>>> stats = monitor.get_stats()
"""
def __init__(self, max_history: int = 1000):
self.max_history = max_history
self.metrics = deque(maxlen=max_history)
self.function_stats = defaultdict(
lambda: {
"count": 0,
"total_time": 0.0,
"min_time": float("inf"),
"max_time": 0.0,
"errors": 0,
}
)
self.is_monitoring = False
self._lock = threading.Lock()
# Alerts configuration
self.alerts = {
"slow_function": 1.0, # Alert if function takes > 1s
"memory_spike": 100, # Alert if memory increases > 100MB
"error_rate": 0.1, # Alert if error rate > 10%
}
self.alert_callbacks = []
[docs]
def record_metric(self, metric: PerformanceMetric):
"""Record a performance metric."""
if not self.is_monitoring:
return
with self._lock:
self.metrics.append(metric)
# Update function statistics
stats = self.function_stats[metric.function]
stats["count"] += 1
stats["total_time"] += metric.duration
stats["min_time"] = min(stats["min_time"], metric.duration)
stats["max_time"] = max(stats["max_time"], metric.duration)
if metric.exception:
stats["errors"] += 1
# Check alerts
self._check_alerts(metric)
def _check_alerts(self, metric: PerformanceMetric):
"""Check if metric triggers any alerts."""
alerts_triggered = []
# Slow function alert
if metric.duration > self.alerts["slow_function"]:
alerts_triggered.append(
{
"type": "slow_function",
"function": metric.function,
"duration": metric.duration,
"threshold": self.alerts["slow_function"],
}
)
# Memory spike alert
if metric.memory_delta and metric.memory_delta > self.alerts["memory_spike"]:
alerts_triggered.append(
{
"type": "memory_spike",
"function": metric.function,
"delta": metric.memory_delta,
"threshold": self.alerts["memory_spike"],
}
)
# Error rate alert
stats = self.function_stats[metric.function]
if stats["count"] > 10: # Only check after sufficient calls
error_rate = stats["errors"] / stats["count"]
if error_rate > self.alerts["error_rate"]:
alerts_triggered.append(
{
"type": "high_error_rate",
"function": metric.function,
"rate": error_rate,
"threshold": self.alerts["error_rate"],
}
)
# Trigger callbacks
for alert in alerts_triggered:
for callback in self.alert_callbacks:
callback(alert)
[docs]
def add_alert_callback(self, callback: Callable[[Dict[str, Any]], None]):
"""Add a callback for performance alerts."""
self.alert_callbacks.append(callback)
[docs]
def get_stats(self, function: Optional[str] = None) -> Dict[str, Any]:
"""
Get performance statistics.
Parameters
----------
function : str, optional
Specific function to get stats for
Returns
-------
dict
Performance statistics
"""
with self._lock:
if function:
stats = self.function_stats.get(function, {})
if stats and stats["count"] > 0:
return {
"function": function,
"count": stats["count"],
"total_time": stats["total_time"],
"avg_time": stats["total_time"] / stats["count"],
"min_time": stats["min_time"],
"max_time": stats["max_time"],
"error_rate": stats["errors"] / stats["count"],
}
return {}
else:
# Return all stats
all_stats = {}
for func, stats in self.function_stats.items():
if stats["count"] > 0:
all_stats[func] = {
"count": stats["count"],
"avg_time": stats["total_time"] / stats["count"],
"min_time": stats["min_time"],
"max_time": stats["max_time"],
"error_rate": stats["errors"] / stats["count"],
}
return all_stats
[docs]
def get_recent_metrics(self, n: int = 100) -> List[PerformanceMetric]:
"""Get n most recent metrics."""
with self._lock:
return list(self.metrics)[-n:]
[docs]
def save_metrics(self, path: str):
"""Save metrics to file."""
with self._lock:
data = {
"metrics": [
{
"timestamp": m.timestamp,
"function": m.function,
"duration": m.duration,
"memory_delta": m.memory_delta,
"args_size": m.args_size,
"result_size": m.result_size,
"exception": m.exception,
}
for m in self.metrics
],
"stats": dict(self.function_stats),
}
Path(path).write_text(json.dumps(data, indent=2))
[docs]
def load_metrics(self, path: str):
"""Load metrics from file."""
data = json.loads(Path(path).read_text())
with self._lock:
self.metrics.clear()
for m in data["metrics"]:
self.metrics.append(PerformanceMetric(**m))
self.function_stats.clear()
self.function_stats.update(data["stats"])
[docs]
def clear(self):
"""Clear all metrics."""
with self._lock:
self.metrics.clear()
self.function_stats.clear()
# Global monitor instance
_global_monitor = PerformanceMonitor()
[docs]
def track_performance(func: Callable) -> Callable:
"""
Decorator to track function performance.
Example
-------
>>> @track_performance
... def my_function(x):
... return x ** 2
"""
import sys
from functools import wraps
@wraps(func)
def wrapper(*args, **kwargs):
if not _global_monitor.is_monitoring:
return func(*args, **kwargs)
# Get memory before (if available)
try:
import psutil
process = psutil.Process()
mem_before = process.memory_info().rss / 1024 / 1024
except Exception:
mem_before = None
# Time the function
start_time = time.time()
exception = None
result = None
try:
result = func(*args, **kwargs)
except Exception as e:
exception = str(e)
raise
finally:
duration = time.time() - start_time
# Get memory after
mem_delta = None
if mem_before is not None:
try:
mem_after = process.memory_info().rss / 1024 / 1024
mem_delta = mem_after - mem_before
except Exception:
pass
# Estimate sizes
args_size = None
result_size = None
try:
args_size = sys.getsizeof(args) + sys.getsizeof(kwargs)
if result is not None:
result_size = sys.getsizeof(result)
except Exception:
pass
# Record metric
metric = PerformanceMetric(
timestamp=start_time,
function=func.__name__,
duration=duration,
memory_delta=mem_delta,
args_size=args_size,
result_size=result_size,
exception=exception,
)
_global_monitor.record_metric(metric)
return result
return wrapper
def start_monitoring():
"""Start global performance monitoring."""
_global_monitor.start()
def stop_monitoring():
"""Stop global performance monitoring."""
_global_monitor.stop()
[docs]
def get_performance_stats(function: Optional[str] = None) -> Dict[str, Any]:
"""Get performance statistics from global monitor."""
return _global_monitor.get_stats(function)
def set_performance_alerts(**thresholds):
"""
Set performance alert thresholds.
Parameters
----------
slow_function : float
Alert if function takes longer than this (seconds)
memory_spike : float
Alert if memory increases by more than this (MB)
error_rate : float
Alert if error rate exceeds this (0-1)
"""
_global_monitor.alerts.update(thresholds)
def add_performance_alert_handler(handler: Callable[[Dict[str, Any]], None]):
"""
Add a handler for performance alerts.
Example
-------
>>> def alert_handler(alert):
... print(f"ALERT: {alert['type']} in {alert['function']}")
>>> add_performance_alert_handler(alert_handler)
"""
_global_monitor.add_alert_callback(handler)
# Default alert handler
def _default_alert_handler(alert: Dict[str, Any]):
"""Default handler that prints warnings."""
if alert["type"] == "slow_function":
warnings.warn(
f"Slow function: {alert['function']} took {alert['duration']:.2f}s "
f"(threshold: {alert['threshold']}s)"
)
elif alert["type"] == "memory_spike":
warnings.warn(
f"Memory spike: {alert['function']} increased memory by {alert['delta']:.1f}MB "
f"(threshold: {alert['threshold']}MB)"
)
elif alert["type"] == "high_error_rate":
warnings.warn(
f"High error rate: {alert['function']} has {alert['rate']:.1%} error rate "
f"(threshold: {alert['threshold']:.1%})"
)
# Register default handler
add_performance_alert_handler(_default_alert_handler)