Performance Optimization Guide
This guide covers specific techniques for maximizing the performance and efficiency of your Chutes applications.
Overview
Performance optimization covers:
- Inference Optimization: Speed up model inference
- Memory Management: Efficient use of GPU and system memory
- Throughput Maximization: Handle more requests per second
- Latency Reduction: Minimize response times
- Resource Utilization: Get the most from your hardware
Model Inference Optimization
Dynamic Batching
Implement dynamic batching for better GPU utilization:
```python
import asyncio
import time
from typing import List, Dict, Any
from dataclasses import dataclass

import torch


@dataclass
class BatchRequest:
    data: Dict[str, Any]
    future: asyncio.Future
    timestamp: float


class DynamicBatcher:
    def __init__(self,
                 max_batch_size: int = 32,
                 max_wait_time: float = 0.01,  # 10ms
                 model_inference_fn=None):
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.model_inference_fn = model_inference_fn
        self.pending_requests: List[BatchRequest] = []
        self.processing = False
        self.lock = asyncio.Lock()

    async def add_request(self, data: Dict[str, Any]) -> Any:
        """Add a request to the batch queue and wait for its result."""
        future = asyncio.get_running_loop().create_future()
        request = BatchRequest(
            data=data,
            future=future,
            timestamp=time.time()
        )

        async with self.lock:
            self.pending_requests.append(request)
            # Start processing if not already running
            if not self.processing:
                asyncio.create_task(self._process_batch())

        return await future

    async def _process_batch(self):
        """Process accumulated requests in batches."""
        async with self.lock:
            if self.processing or not self.pending_requests:
                return
            self.processing = True

        while True:
            # Wait for the batch to accumulate or for the timeout to expire
            start_time = time.time()
            while (len(self.pending_requests) < self.max_batch_size and
                   time.time() - start_time < self.max_wait_time):
                await asyncio.sleep(0.001)  # 1ms

            async with self.lock:
                if not self.pending_requests:
                    break
                # Extract the next batch
                batch_size = min(len(self.pending_requests), self.max_batch_size)
                batch = self.pending_requests[:batch_size]
                self.pending_requests = self.pending_requests[batch_size:]

            # Process the batch
            try:
                batch_data = [req.data for req in batch]
                results = await self._run_inference_batch(batch_data)
                # Resolve the waiting futures
                for req, result in zip(batch, results):
                    if not req.future.done():
                        req.future.set_result(result)
            except Exception as e:
                # Propagate the batch failure to every request in it
                for req in batch:
                    if not req.future.done():
                        req.future.set_exception(e)

        async with self.lock:
            self.processing = False

    async def _run_inference_batch(self, batch_data: List[Dict[str, Any]]) -> List[Any]:
        """Run inference on a batch."""
        if self.model_inference_fn:
            return await self.model_inference_fn(batch_data)
        # Default implementation
        return [{"result": f"processed_{i}"} for i in range(len(batch_data))]


# Usage
async def model_inference_batch(batch_data: List[Dict[str, Any]]) -> List[Any]:
    """Your model inference function (assumes `tokenizer` and `model` are already loaded)."""
    # Extract texts
    texts = [item["text"] for item in batch_data]

    # Tokenize the batch
    inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt")

    # Inference
    with torch.no_grad():
        outputs = model(**inputs)
        predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)

    # Return per-item results
    results = []
    for i, pred in enumerate(predictions):
        results.append({
            "text": texts[i],
            "prediction": pred.cpu().numpy().tolist(),
            "confidence": float(torch.max(pred))
        })
    return results


# Global batcher
batcher = DynamicBatcher(
    max_batch_size=16,
    max_wait_time=0.01,
    model_inference_fn=model_inference_batch
)


async def run_optimized(inputs: Dict[str, Any]) -> Any:
    """Optimized endpoint using dynamic batching."""
    return await batcher.add_request(inputs)
```
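To see the batcher in action, you can fire many requests at it concurrently and let it group them into GPU-sized batches. The sketch below is illustrative only: it assumes the `batcher`, `run_optimized`, and the model/tokenizer from the block above are already in place, and the payloads are placeholders.

```python
import asyncio


async def demo_concurrent_callers():
    # Submit many requests at once; the batcher coalesces them into batches
    payloads = [{"text": f"example input {i}"} for i in range(64)]
    results = await asyncio.gather(*(run_optimized(p) for p in payloads))
    print(f"Received {len(results)} results")


# asyncio.run(demo_concurrent_callers())
```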
Model Quantization and Optimization
Optimize models for faster inference:
```python
import torch
import torch.ao.quantization as quantization
from typing import List, Dict, Any
from transformers import AutoModel, AutoTokenizer


class OptimizedModel:
    def __init__(self, model_name: str, optimization_level: str = "basic"):
        self.model_name = model_name
        self.optimization_level = optimization_level

        # Load base model
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)

        # Apply optimizations
        self._optimize_model()

    def _optimize_model(self):
        """Apply the selected optimizations."""
        self.model.eval()

        if self.optimization_level == "basic":
            self._basic_optimization()
        elif self.optimization_level == "quantized":
            self._quantization_optimization()
        elif self.optimization_level == "compiled":
            self._compilation_optimization()
        elif self.optimization_level == "all":
            self._all_optimizations()

    def _basic_optimization(self):
        """Basic optimizations."""
        # Let cuDNN autotune kernels for the observed input shapes
        torch.backends.cudnn.benchmark = True
        torch.backends.cudnn.deterministic = False

        # Move to GPU if available
        if torch.cuda.is_available():
            self.model = self.model.cuda()

    def _quantization_optimization(self):
        """Apply dynamic post-training quantization (targets CPU inference)."""
        self.model = quantization.quantize_dynamic(
            self.model,
            {torch.nn.Linear},
            dtype=torch.qint8
        )

    def _compilation_optimization(self):
        """Apply TorchScript compilation via tracing."""
        # Example input for tracing
        example_input = self.tokenizer(
            "Hello world",
            return_tensors="pt",
            padding=True,
            truncation=True
        )

        # Trace the model with positional tensor inputs; note that a traced
        # module must then be called positionally, and Hugging Face models may
        # need to be loaded with torchscript=True for clean tracing
        with torch.no_grad():
            self.model = torch.jit.trace(
                self.model,
                (example_input["input_ids"], example_input["attention_mask"]),
                strict=False
            )

    def _all_optimizations(self):
        """Apply all compatible optimizations."""
        self._basic_optimization()
        # Note: dynamic quantization targets CPU inference and may conflict
        # with GPU placement and TorchScript compilation
        self._quantization_optimization()

    @torch.no_grad()
    def predict(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Optimized prediction."""
        # Tokenize
        inputs = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            return_tensors="pt",
            max_length=512
        )

        # Move inputs to the GPU when the model lives there; keep them on the
        # CPU for dynamically quantized models
        if torch.cuda.is_available():
            inputs = {k: v.cuda() for k, v in inputs.items()}

        # Inference
        outputs = self.model(**inputs)

        # Mean-pool the token embeddings into one vector per text
        embeddings = outputs.last_hidden_state.mean(dim=1)

        results = []
        for i, embedding in enumerate(embeddings):
            results.append({
                "text": texts[i],
                "embedding": embedding.cpu().numpy().tolist()
            })
        return results


# Global optimized model
optimized_model = OptimizedModel("bert-base-uncased", optimization_level="all")
```
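A quick way to sanity-check the effect of an optimization level is to time `predict` on a small batch. The snippet below is a rough timing sketch under the assumption that `optimized_model` above loaded successfully; it is not a rigorous benchmark.

```python
import time


def benchmark_predict(texts, warmup: int = 2, runs: int = 10) -> float:
    """Return the average latency (seconds) of OptimizedModel.predict."""
    for _ in range(warmup):
        optimized_model.predict(texts)  # warm up kernels / cuDNN autotuner
    start = time.time()
    for _ in range(runs):
        optimized_model.predict(texts)
    return (time.time() - start) / runs


# avg = benchmark_predict(["hello world"] * 8)
# print(f"avg latency: {avg * 1000:.1f} ms")
```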
Memory Optimization
GPU Memory Management
Efficient GPU memory usage:
```python
import asyncio
import gc
from contextlib import contextmanager
from typing import Any, Iterator, List

import torch


class GPUMemoryManager:
    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.memory_threshold = 0.85  # 85% memory usage threshold

    @contextmanager
    def memory_efficient_context(self) -> Iterator[None]:
        """Context manager for memory-efficient operations."""
        initial_memory = self.get_memory_usage()
        try:
            yield
        finally:
            # Clean up after the operation
            self.cleanup_memory()
            final_memory = self.get_memory_usage()
            memory_saved = initial_memory - final_memory
            if memory_saved > 0:
                print(f"Memory cleaned up: {memory_saved:.2f}%")

    def get_memory_usage(self) -> float:
        """Get current GPU memory usage as a percentage."""
        if not torch.cuda.is_available():
            return 0.0
        memory_used = torch.cuda.memory_allocated()
        memory_total = torch.cuda.get_device_properties(0).total_memory
        return (memory_used / memory_total) * 100

    def cleanup_memory(self):
        """Aggressive memory cleanup."""
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.synchronize()

    def check_memory_pressure(self) -> bool:
        """Check whether we are under memory pressure."""
        return self.get_memory_usage() > self.memory_threshold * 100

    def adaptive_batch_size(self, base_batch_size: int, max_batch_size: int) -> int:
        """Adapt batch size based on memory pressure."""
        memory_usage = self.get_memory_usage()

        if memory_usage > 80:
            return max(1, base_batch_size // 4)
        elif memory_usage > 60:
            return max(1, base_batch_size // 2)
        elif memory_usage < 40:
            return min(max_batch_size, base_batch_size * 2)
        else:
            return base_batch_size


# Global memory manager
memory_manager = GPUMemoryManager()


class MemoryOptimizedProcessor:
    def __init__(self, model):
        self.model = model
        self.base_batch_size = 16
        self.max_batch_size = 64

    async def process_with_memory_management(self, data_list: List[Any]) -> List[Any]:
        """Process data with dynamic memory management."""
        results = []
        i = 0

        while i < len(data_list):
            # Adapt the batch size to current memory pressure
            current_batch_size = memory_manager.adaptive_batch_size(
                self.base_batch_size,
                self.max_batch_size
            )

            batch = data_list[i:i + current_batch_size]

            with memory_manager.memory_efficient_context():
                batch_results = await self._process_batch(batch)
                results.extend(batch_results)

            i += current_batch_size

            # Yield control to allow other operations to run
            await asyncio.sleep(0)

        return results

    async def _process_batch(self, batch: List[Any]) -> List[Any]:
        """Process a single batch."""
        with torch.no_grad():
            # Your processing logic here
            return [{"processed": item} for item in batch]
```
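Wiring the processor into an endpoint could look like the following sketch. The endpoint name, the `items` payload field, and passing `model=None` are assumptions for illustration; substitute your own loaded model and request schema.

```python
from typing import Any, Dict

processor = MemoryOptimizedProcessor(model=None)  # pass your loaded model here


async def run_memory_optimized(inputs: Dict[str, Any]) -> Dict[str, Any]:
    items = inputs.get("items", [])
    results = await processor.process_with_memory_management(items)
    return {
        "results": results,
        "gpu_memory_percent": memory_manager.get_memory_usage(),
    }
```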
Caching and Memoization
Advanced Caching Strategies
Implement intelligent caching:
```python
import asyncio
import hashlib
import pickle
import time
from functools import wraps
from typing import Any, Callable, Dict, Optional

import redis


class AdvancedCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        # Synchronous client; for high-traffic services consider redis.asyncio
        self.redis_client = redis.from_url(redis_url)
        self.local_cache: Dict[str, tuple] = {}  # key -> (value, timestamp, access_count)
        self.local_cache_size = 1000
        self.default_ttl = 3600

        # Cache statistics
        self.hits = 0
        self.misses = 0
        self.local_hits = 0
        self.redis_hits = 0

    def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Generate a cache key."""
        key_data = f"{func_name}:{args}:{sorted(kwargs.items())}"
        return f"cache:{hashlib.md5(key_data.encode()).hexdigest()}"

    async def get(self, key: str) -> Optional[Any]:
        """Get a value from the cache (local first, then Redis)."""
        # Try the local cache first
        if key in self.local_cache:
            value, timestamp, access_count = self.local_cache[key]
            if time.time() - timestamp < self.default_ttl:
                # Update the access count
                self.local_cache[key] = (value, timestamp, access_count + 1)
                self.hits += 1
                self.local_hits += 1
                return value
            else:
                # Expired
                del self.local_cache[key]

        # Try the Redis cache
        try:
            cached_data = self.redis_client.get(key)
            if cached_data:
                value = pickle.loads(cached_data)
                # Promote the value into the local cache
                self._store_local(key, value)
                self.hits += 1
                self.redis_hits += 1
                return value
        except Exception:
            pass

        self.misses += 1
        return None

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set a value in the cache."""
        ttl = ttl or self.default_ttl

        # Store in the local cache
        self._store_local(key, value)

        # Store in Redis
        try:
            serialized_value = pickle.dumps(value)
            self.redis_client.setex(key, ttl, serialized_value)
        except Exception:
            pass

    def _store_local(self, key: str, value: Any) -> None:
        """Store a value in the local cache, evicting the oldest entry at capacity."""
        current_time = time.time()

        # Evict if at capacity
        if len(self.local_cache) >= self.local_cache_size:
            # Remove the entry with the oldest timestamp
            oldest_key = min(
                self.local_cache.keys(),
                key=lambda k: self.local_cache[k][1]  # timestamp
            )
            del self.local_cache[oldest_key]

        self.local_cache[key] = (value, current_time, 1)

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        total_requests = self.hits + self.misses
        hit_rate = self.hits / total_requests if total_requests > 0 else 0

        return {
            "hit_rate": hit_rate,
            "total_requests": total_requests,
            "local_hits": self.local_hits,
            "redis_hits": self.redis_hits,
            "misses": self.misses,
            "local_cache_size": len(self.local_cache)
        }

    def cached(self, ttl: Optional[int] = None):
        """Decorator for caching async function results."""
        def decorator(func: Callable) -> Callable:
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Generate the cache key
                cache_key = self._generate_key(func.__name__, args, kwargs)

                # Try to get a cached result
                cached_result = await self.get(cache_key)
                if cached_result is not None:
                    return cached_result

                # Execute the function and cache the result
                result = await func(*args, **kwargs)
                await self.set(cache_key, result, ttl)
                return result
            return wrapper
        return decorator


# Global cache instance
advanced_cache = AdvancedCache()


@advanced_cache.cached(ttl=1800)  # 30 minutes
async def expensive_model_inference(text: str, model_params: dict) -> dict:
    """Expensive computation that benefits from caching."""
    # Simulate an expensive operation
    await asyncio.sleep(1)
    return {"result": f"processed_{text}", "params": model_params}
```
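As a quick check that the two-tier cache behaves as expected, call the decorated function twice and compare timings and hit statistics. This sketch assumes a reachable Redis instance (the Redis layer fails open if it is not available).

```python
import asyncio
import time


async def demo_cache():
    params = {"model": "demo"}
    t0 = time.time()
    await expensive_model_inference("hello", params)   # miss: ~1s
    t1 = time.time()
    await expensive_model_inference("hello", params)   # hit: near-instant
    t2 = time.time()
    print(f"first call: {t1 - t0:.2f}s, second call: {t2 - t1:.2f}s")
    print(advanced_cache.get_stats())


# asyncio.run(demo_cache())
```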
Concurrent Processing
Async Processing Patterns
Optimize concurrent request handling:
```python
import asyncio
import time
from asyncio import Queue, Semaphore
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class ProcessingTask:
    id: str
    data: Dict[str, Any]
    priority: int = 0


class ConcurrentProcessor:
    def __init__(self,
                 max_concurrent: int = 10,
                 max_queue_size: int = 1000):
        self.max_concurrent = max_concurrent
        self.semaphore = Semaphore(max_concurrent)
        self.task_queue: Queue = Queue(maxsize=max_queue_size)
        self.workers_running = False

        # Performance metrics
        self.processed_count = 0
        self.failed_count = 0
        self.total_processing_time = 0.0

    async def start_workers(self, num_workers: int = 5):
        """Start background workers."""
        self.workers_running = True
        workers = [
            asyncio.create_task(self._worker(f"worker-{i}"))
            for i in range(num_workers)
        ]
        return workers

    async def stop_workers(self):
        """Signal background workers to stop."""
        self.workers_running = False

    async def submit_task(self, task: ProcessingTask) -> Any:
        """Submit a task for processing."""
        # Add the task to the queue
        await self.task_queue.put(task)

        # In a real implementation, attach a future to the task and await it;
        # this simplified version only acknowledges that the task was queued
        return {"task_id": task.id, "status": "queued"}

    async def _worker(self, worker_id: str):
        """Background worker loop."""
        while self.workers_running:
            try:
                # Get a task from the queue (the timeout lets the loop notice shutdown)
                task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            try:
                # Process the task with concurrency control
                async with self.semaphore:
                    await self._process_task(task)
            except Exception as e:
                # _process_task already counted the failure; just log it here
                print(f"Worker {worker_id} error: {e}")
            finally:
                # Mark the task as done so queue.join() can complete
                self.task_queue.task_done()

    async def _process_task(self, task: ProcessingTask):
        """Process an individual task."""
        start_time = time.time()
        try:
            # Your processing logic here
            await self._simulate_processing(task.data)

            self.processed_count += 1
            self.total_processing_time += time.time() - start_time
        except Exception:
            self.failed_count += 1
            raise

    async def _simulate_processing(self, data: Dict[str, Any]):
        """Simulate processing work."""
        # Replace with actual processing
        await asyncio.sleep(0.1)

    def get_metrics(self) -> Dict[str, Any]:
        """Get processing metrics."""
        avg_processing_time = (
            self.total_processing_time / self.processed_count
            if self.processed_count > 0 else 0
        )

        total = self.processed_count + self.failed_count
        return {
            "processed_count": self.processed_count,
            "failed_count": self.failed_count,
            "average_processing_time": avg_processing_time,
            "queue_size": self.task_queue.qsize(),
            "success_rate": self.processed_count / total if total > 0 else 0
        }


# Global processor
concurrent_processor = ConcurrentProcessor(max_concurrent=20)


async def run_concurrent_processing(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Handle requests with concurrent processing."""
    # Create a processing task
    task = ProcessingTask(
        id=inputs.get("request_id", "unknown"),
        data=inputs,
        priority=inputs.get("priority", 0)
    )

    # Submit it for processing
    result = await concurrent_processor.submit_task(task)

    # Return the result together with current metrics
    return {
        "result": result,
        "metrics": concurrent_processor.get_metrics()
    }


# Initialize workers on startup
async def initialize_concurrent_processing():
    """Initialize the concurrent processing system."""
    workers = await concurrent_processor.start_workers(num_workers=10)
    print(f"Started {len(workers)} concurrent workers")
    return workers
```
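How the worker pool gets started depends on your serving framework. The generic asyncio entry point below is a sketch with placeholder names and demo payloads; in a real service you would call `initialize_concurrent_processing` from your startup hook and `stop_workers` from your shutdown hook.

```python
async def main():
    workers = await initialize_concurrent_processing()
    try:
        # Submit a few demo tasks, then wait for the queue to drain
        for i in range(5):
            await run_concurrent_processing({"request_id": f"req-{i}", "priority": 1})
        await concurrent_processor.task_queue.join()
    finally:
        await concurrent_processor.stop_workers()
        for worker in workers:
            worker.cancel()


# asyncio.run(main())
```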
Performance Monitoring
Real-time Performance Metrics
Monitor performance in real-time:
```python
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from functools import wraps
from typing import Any, Dict

import psutil
from prometheus_client import Counter, Histogram, Gauge, start_http_server


@dataclass
class PerformanceMetric:
    timestamp: float
    request_count: int
    avg_response_time: float
    cpu_usage: float
    memory_usage: float
    gpu_utilization: float


class PerformanceMonitor:
    def __init__(self, history_size: int = 100):
        self.history_size = history_size
        self.metrics_history = deque(maxlen=history_size)

        # Prometheus metrics
        self.request_counter = Counter('requests_total', 'Total requests')
        self.response_time_histogram = Histogram('response_time_seconds', 'Response time')
        self.cpu_gauge = Gauge('cpu_usage_percent', 'CPU usage')
        self.memory_gauge = Gauge('memory_usage_percent', 'Memory usage')
        self.gpu_gauge = Gauge('gpu_utilization_percent', 'GPU utilization')

        # Performance tracking
        self.total_requests = 0
        self.request_times = deque(maxlen=1000)
        self.start_time = time.time()

        # Start the Prometheus metrics server
        start_http_server(8001)

    def record_request(self, response_time: float):
        """Record a completed request."""
        self.total_requests += 1
        self.request_counter.inc()
        self.response_time_histogram.observe(response_time)
        self.request_times.append(response_time)

    def update_system_metrics(self):
        """Update system performance metrics."""
        # CPU usage
        cpu_percent = psutil.cpu_percent(interval=None)
        self.cpu_gauge.set(cpu_percent)

        # Memory usage
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        self.memory_gauge.set(memory_percent)

        # GPU utilization (if available)
        gpu_util = self._get_gpu_utilization()
        self.gpu_gauge.set(gpu_util)

        # Create a metric snapshot
        current_metric = PerformanceMetric(
            timestamp=time.time(),
            request_count=len(self.request_times),
            avg_response_time=(
                sum(self.request_times) / len(self.request_times)
                if self.request_times else 0
            ),
            cpu_usage=cpu_percent,
            memory_usage=memory_percent,
            gpu_utilization=gpu_util
        )
        self.metrics_history.append(current_metric)

    def _get_gpu_utilization(self) -> float:
        """Get GPU utilization (may require NVML bindings such as pynvml)."""
        try:
            import torch
            if torch.cuda.is_available():
                return float(torch.cuda.utilization())
        except Exception:
            pass
        return 0.0

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get a performance summary."""
        if not self.metrics_history:
            return {"status": "no_data"}

        recent_metrics = list(self.metrics_history)[-10:]  # Last 10 snapshots
        uptime = time.time() - self.start_time

        return {
            "uptime_seconds": uptime,
            "total_requests": self.total_requests,
            "avg_requests_per_second": self.total_requests / max(uptime, 1e-6),
            "avg_response_time": sum(m.avg_response_time for m in recent_metrics) / len(recent_metrics),
            "cpu_usage": recent_metrics[-1].cpu_usage,
            "memory_usage": recent_metrics[-1].memory_usage,
            "gpu_utilization": recent_metrics[-1].gpu_utilization,
            "performance_trend": self._calculate_performance_trend()
        }

    def _calculate_performance_trend(self) -> str:
        """Calculate the performance trend."""
        if len(self.metrics_history) < 10:
            return "insufficient_data"

        recent = list(self.metrics_history)[-5:]
        older = list(self.metrics_history)[-10:-5]

        recent_avg_response = sum(m.avg_response_time for m in recent) / len(recent)
        older_avg_response = sum(m.avg_response_time for m in older) / len(older)

        if recent_avg_response > older_avg_response * 1.1:
            return "degrading"
        elif recent_avg_response < older_avg_response * 0.9:
            return "improving"
        else:
            return "stable"

    def track_request(self, func):
        """Decorator to track request performance."""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                return await func(*args, **kwargs)
            finally:
                response_time = time.time() - start_time
                self.record_request(response_time)
                self.update_system_metrics()
        return wrapper


# Global performance monitor
performance_monitor = PerformanceMonitor()


# Auto-update metrics
async def metrics_updater():
    """Background task to update metrics."""
    while True:
        performance_monitor.update_system_metrics()
        await asyncio.sleep(10)  # Update every 10 seconds


# Start the metrics updater. Note: asyncio.create_task() requires a running
# event loop; if this module is imported before the loop starts, schedule this
# call from your startup hook instead.
asyncio.create_task(metrics_updater())


@performance_monitor.track_request
async def run_with_monitoring(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Run with performance monitoring."""
    # `process_request` is your application's own handler (placeholder sketch below)
    result = await process_request(inputs)

    # Include the performance summary in the response
    return {
        "result": result,
        "performance": performance_monitor.get_performance_summary()
    }
```
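`run_with_monitoring` above awaits a `process_request` coroutine that this guide does not define. A minimal placeholder, purely for illustration, might look like this; replace it with your real handler, and expose `get_performance_report` from a stats or health endpoint if you want the rolling summary alongside the Prometheus metrics served on port 8001.

```python
import asyncio
from typing import Any, Dict


async def process_request(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Placeholder for your application's real processing logic."""
    await asyncio.sleep(0.05)  # simulate some work
    return {"echo": inputs}


async def get_performance_report() -> Dict[str, Any]:
    """Return the rolling performance summary, e.g. for a stats endpoint."""
    return performance_monitor.get_performance_summary()
```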
Next Steps
- Performance Guide - General performance optimization
- Cost Optimization - Balance performance and cost
- Best Practices - Deployment best practices
For advanced performance tuning, see the Advanced Performance Guide.