Cost Optimization Guide
This guide provides strategies to optimize costs while maintaining performance and reliability for your Chutes applications.
Overview
Cost optimization in Chutes involves five complementary levers (a rough cost model combining them follows this list):
- Resource Right-sizing: Choose appropriate hardware configurations
- Auto-scaling: Scale resources based on demand
- Spot Instances: Use cost-effective computing options
- Efficient Scheduling: Optimize when workloads run
- Model Optimization: Reduce computational requirements
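These levers compound: halving average instance count and moving to spot pricing cuts the bill far more than either change alone. As a back-of-envelope sketch (all rates here are illustrative, not actual Chutes pricing):

# Back-of-envelope monthly cost model; the $0.50/hr rate is illustrative only.
def monthly_cost(gpu_hourly_rate: float, gpu_count: int, avg_instances: float,
                 hours_per_month: float = 730.0, spot_discount: float = 0.0) -> float:
    """Estimate monthly spend for one deployment."""
    return gpu_hourly_rate * (1.0 - spot_discount) * gpu_count * avg_instances * hours_per_month

print(monthly_cost(0.50, 1, 1.5))                     # on-demand: ~$547.50
print(monthly_cost(0.50, 1, 1.5, spot_discount=0.6))  # with a 60% spot discount: ~$219.00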
Resource Right-sizing
Choose Appropriate Hardware
Select the right GPU and memory configuration:
from chutes.chute import Chute, NodeSelector

# Cost-optimized for inference
inference_chute = Chute(
    username="myuser",
    name="cost-optimized-inference",
    image=your_image,
    entry_file="app.py",
    entry_point="run",
    node_selector=NodeSelector(
        gpu_count=1,
        min_vram_gb_per_gpu=8,  # Right-size for your model
        preferred_provider="vast"  # Often more cost-effective
    ),
    timeout_seconds=300,
    concurrency=8
)
# For batch processing with higher throughput needs
batch_chute = Chute(
    username="myuser",
    name="batch-processing",
    image=your_image,
    entry_file="batch_app.py",
    entry_point="run",
    node_selector=NodeSelector(
        gpu_count=2,
        min_vram_gb_per_gpu=24
    ),
    timeout_seconds=1800,
    concurrency=4
)
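Rather than guessing min_vram_gb_per_gpu, measure it. A minimal PyTorch sketch (you supply run_workload, a function that pushes a representative batch through your model):

import torch

def measure_peak_vram_gb(run_workload) -> float:
    """Run a representative workload once and report peak VRAM in GB."""
    torch.cuda.reset_peak_memory_stats()
    run_workload()
    torch.cuda.synchronize()
    return torch.cuda.max_memory_allocated() / 1024**3

# Size min_vram_gb_per_gpu with ~20-30% headroom above the measured peak.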
Spot Instance Strategy
Using Spot Instances
Leverage spot instances for significant cost savings:
from chutes.chute import Chute, NodeSelector

# Spot instance configuration
spot_chute = Chute(
    username="myuser",
    name="spot-training",
    image=training_image,
    entry_file="training.py",
    entry_point="run",
    node_selector=NodeSelector(
        gpu_count=4,
        min_vram_gb_per_gpu=16,
        max_spot_price=0.50  # Maximum price you're willing to pay
    ),
    timeout_seconds=7200,  # Longer timeout for training
    concurrency=1,
    auto_scale=False
)
import asyncio

# Fault-tolerant batch processing with spot instances
class SpotInstanceManager:
    def __init__(self, chute_config):
        self.chute_config = chute_config
        self.retry_count = 3

    async def run_with_retry(self, inputs):
        """Run a job, retrying automatically on spot interruption."""
        for attempt in range(self.retry_count):
            try:
                # Create a fresh chute on a spot instance for each attempt
                chute = Chute(**self.chute_config)
                return chute.run(inputs)
            except Exception:
                if attempt == self.retry_count - 1:
                    raise  # All retries exhausted
                await asyncio.sleep(30)  # Back off before retrying
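Retries only pay off if completed work survives the interruption. For training jobs, checkpoint periodically so a retried attempt resumes instead of restarting; a minimal PyTorch sketch (the path is illustrative; in practice write checkpoints to durable storage):

import os
import torch

CHECKPOINT_PATH = "/tmp/checkpoint.pt"  # illustrative; use durable storage in practice

def save_checkpoint(model, optimizer, step: int):
    """Persist training state so a spot interruption loses at most one interval."""
    torch.save({"model": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "step": step}, CHECKPOINT_PATH)

def load_checkpoint(model, optimizer) -> int:
    """Restore the last checkpoint; return the step to resume from (0 if none)."""
    if not os.path.exists(CHECKPOINT_PATH):
        return 0
    state = torch.load(CHECKPOINT_PATH)
    model.load_state_dict(state["model"])
    optimizer.load_state_dict(state["optimizer"])
    return state["step"]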
Smart Scaling Strategies
Time-based Scaling
Scale based on predictable usage patterns:
import time

import schedule

class TimeBasedScaler:
    def __init__(self, chute_name):
        self.chute_name = chute_name
        self.setup_schedule()

    def setup_schedule(self):
        """Set up the scaling schedule based on known usage patterns."""
        # Scale up during business hours, down after hours
        for day in ("monday", "tuesday", "wednesday", "thursday", "friday"):
            getattr(schedule.every(), day).at("08:00").do(self.scale_up)
            getattr(schedule.every(), day).at("18:00").do(self.scale_down)
        # Minimal scaling on weekends
        schedule.every().saturday.at("00:00").do(self.scale_minimal)
        schedule.every().sunday.at("00:00").do(self.scale_minimal)

    def scale_up(self):
        """Scale up for peak hours."""
        self.update_chute_config({
            "min_instances": 3,
            "max_instances": 10,
            "concurrency": 20
        })

    def scale_down(self):
        """Scale down for off-peak hours."""
        self.update_chute_config({
            "min_instances": 1,
            "max_instances": 3,
            "concurrency": 8
        })

    def scale_minimal(self):
        """Minimal footprint for weekends."""
        self.update_chute_config({
            "min_instances": 0,
            "max_instances": 2,
            "concurrency": 4
        })

    def update_chute_config(self, config):
        """Apply new scaling settings to the chute (implementation-specific)."""
        pass

    def run(self):
        """Run the scheduler loop."""
        while True:
            schedule.run_pending()
            time.sleep(60)
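The scaler runs as its own small process or sidecar, for example:

scaler = TimeBasedScaler("cost-optimized-inference")
scaler.run()  # blocks; run it as a separate process or background service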
Demand-based Auto-scaling
Implement intelligent auto-scaling:
import time

class DemandBasedScaler:
    def __init__(self, chute, target_utilization=0.7):
        self.chute = chute
        self.target_utilization = target_utilization
        self.metrics_history = []
        self.scale_cooldown = 300  # 5 minutes between scaling actions
        self.last_scale_time = 0

    async def monitor_and_scale(self):
        """Check recent metrics and scale up or down accordingly."""
        current_metrics = await self.get_current_metrics()
        self.metrics_history.append(current_metrics)
        # Keep a rolling window of the last 10 samples
        # (10 minutes at one sample per minute)
        if len(self.metrics_history) > 10:
            self.metrics_history.pop(0)

        # Average utilization over the window
        avg_utilization = sum(m['utilization'] for m in self.metrics_history) / len(self.metrics_history)

        # Only scale if the cooldown period has passed
        current_time = time.time()
        if current_time - self.last_scale_time < self.scale_cooldown:
            return

        if avg_utilization > self.target_utilization + 0.1:  # Scale up
            await self.scale_up()
            self.last_scale_time = current_time
        elif avg_utilization < self.target_utilization - 0.2:  # Scale down
            await self.scale_down()
            self.last_scale_time = current_time

    async def get_current_metrics(self):
        """Fetch current performance metrics (stubbed with example values here)."""
        return {
            'utilization': 0.8,
            'response_time': 200,
            'queue_length': 5
        }

    # get_current_instance_count / set_instance_count below are your
    # platform-specific helpers for reading and applying instance counts.
    async def scale_up(self):
        """Add one instance, capped at the chute's max_instances."""
        current_instances = await self.get_current_instance_count()
        new_count = min(current_instances + 1, self.chute.max_instances)
        await self.set_instance_count(new_count)

    async def scale_down(self):
        """Remove one instance, floored at the chute's min_instances."""
        current_instances = await self.get_current_instance_count()
        new_count = max(current_instances - 1, self.chute.min_instances)
        await self.set_instance_count(new_count)
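A simple driver polls on a fixed interval; the cooldown inside monitor_and_scale keeps a single spike from causing thrashing. (my_chute below stands for whatever Chute you are managing.)

import asyncio

async def autoscale_loop(scaler: DemandBasedScaler, interval: float = 60.0):
    """Sample metrics once per interval and let the scaler decide."""
    while True:
        await scaler.monitor_and_scale()
        await asyncio.sleep(interval)

# asyncio.run(autoscale_loop(DemandBasedScaler(my_chute)))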
Workload Optimization
Batch Processing for Cost Efficiency
Process multiple requests together:
import asyncio
import time
from typing import Any, Dict, List

class CostOptimizedBatchProcessor:
    def __init__(self, max_batch_size=32, max_wait_time=5.0):
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.pending_requests = []
        self.processing = False

    async def add_request(self, request_data: Dict[str, Any]) -> Any:
        """Queue a request and wait for its batched result."""
        future = asyncio.get_running_loop().create_future()
        self.pending_requests.append({
            'data': request_data,
            'future': future
        })
        # Start a batch cycle if one is not already running
        if not self.processing:
            asyncio.create_task(self.process_batch())
        return await future

    async def process_batch(self):
        """Process accumulated requests as a single batch."""
        if self.processing:
            return
        self.processing = True

        # Wait for the batch to fill up or for the timeout to expire
        start_time = time.time()
        while (len(self.pending_requests) < self.max_batch_size and
               time.time() - start_time < self.max_wait_time):
            await asyncio.sleep(0.1)

        if not self.pending_requests:
            self.processing = False
            return

        # Extract this batch; anything beyond max_batch_size waits for the next cycle
        batch = self.pending_requests[:self.max_batch_size]
        self.pending_requests = self.pending_requests[self.max_batch_size:]

        try:
            batch_data = [req['data'] for req in batch]
            results = await self.process_batch_data(batch_data)
            # Deliver results back to the waiting callers
            for req, result in zip(batch, results):
                req['future'].set_result(result)
        except Exception as e:
            # Fail every request in the batch
            for req in batch:
                req['future'].set_exception(e)
        finally:
            self.processing = False
            # Kick off another cycle if requests arrived meanwhile
            if self.pending_requests:
                asyncio.create_task(self.process_batch())

    async def process_batch_data(self, batch_data: List[Dict[str, Any]]) -> List[Any]:
        """Process the actual batch - implement your logic here."""
        # Example: AI model inference on the batch
        results = []
        for data in batch_data:
            result = await self.model_inference(data)  # your inference routine
            results.append(result)
        return results

# Usage in a chute
batch_processor = CostOptimizedBatchProcessor(max_batch_size=16, max_wait_time=2.0)

async def run_cost_optimized(inputs: Dict[str, Any]) -> Any:
    """Cost-optimized endpoint that funnels requests through the batcher."""
    return await batch_processor.add_request(inputs)
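As a quick sanity check, concurrent callers should coalesce into shared batches (model_inference above is the placeholder for your actual model call):

async def demo():
    # Ten concurrent requests coalesce into a single batch cycle
    requests = [{"text": f"request {i}"} for i in range(10)]
    return await asyncio.gather(*(run_cost_optimized(r) for r in requests))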
Model Optimization for Cost
Model Quantization
Reduce computational costs through quantization:
import torch
from typing import Any, Dict, List

from transformers import AutoModelForSequenceClassification, AutoTokenizer

class QuantizedModelForCost:
    def __init__(self, model_name: str):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # Load model with 8-bit quantization (requires the bitsandbytes package)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            load_in_8bit=True,  # Reduces memory usage by roughly half
            device_map="auto"
        )

    async def predict(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Batch prediction with the quantized model."""
        # Process in batches for efficiency
        batch_size = 16
        results = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            # Tokenize the batch and move it to the model's device
            inputs = self.tokenizer(
                batch,
                padding=True,
                truncation=True,
                return_tensors="pt",
                max_length=512
            ).to(self.model.device)
            # Inference
            with torch.no_grad():
                outputs = self.model(**inputs)
                predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
            # Extract per-item results
            for j, prediction in enumerate(predictions):
                results.append({
                    'text': batch[j],
                    'prediction': prediction.cpu().numpy().tolist(),
                    'confidence': float(torch.max(prediction))
                })
        return results
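One caveat: recent transformers releases deprecate passing load_in_8bit directly to from_pretrained in favor of an explicit quantization config (both routes require the bitsandbytes package). If you see a deprecation warning, the equivalent call is:

from transformers import AutoModelForSequenceClassification, BitsAndBytesConfig

# Drop-in replacement for the from_pretrained call in the constructor above
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,  # same model_name as above
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),
    device_map="auto"
)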
# Deploy with cost-optimized settings
cost_optimized_chute = Chute(
    username="myuser",
    name="quantized-inference",
    image=quantized_image,
    entry_file="quantized_model.py",
    entry_point="run",
    node_selector=NodeSelector(
        gpu_count=1,
        min_vram_gb_per_gpu=8  # Reduced from 16GB thanks to quantization
    ),
    concurrency=12,  # Higher concurrency due to reduced memory usage
    timeout_seconds=120
)
Model Caching Strategy
Implement intelligent caching to reduce compute costs:
import hashlib
import pickle
from typing import Any, Dict, Optional

import redis

class CostOptimizedCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.hit_count = 0
        self.miss_count = 0

    def get_cache_key(self, inputs: Dict[str, Any]) -> str:
        """Generate a deterministic cache key from the inputs."""
        input_str = str(sorted(inputs.items()))
        return f"model_cache:{hashlib.md5(input_str.encode()).hexdigest()}"

    async def get_cached_result(self, inputs: Dict[str, Any]) -> Optional[Any]:
        """Return the cached result, or None on a miss."""
        cache_key = self.get_cache_key(inputs)
        try:
            cached_data = self.redis_client.get(cache_key)
            if cached_data:
                self.hit_count += 1
                return pickle.loads(cached_data)
        except Exception:
            pass  # Treat Redis errors as cache misses
        self.miss_count += 1
        return None

    async def cache_result(self, inputs: Dict[str, Any], result: Any, ttl: int = 3600):
        """Cache a computation result with a TTL in seconds."""
        cache_key = self.get_cache_key(inputs)
        try:
            serialized_result = pickle.dumps(result)
            self.redis_client.setex(cache_key, ttl, serialized_result)
        except Exception:
            pass  # Caching is best-effort; never fail the request

    def get_cache_stats(self) -> Dict[str, float]:
        """Report cache hit/miss rates."""
        total_requests = self.hit_count + self.miss_count
        if total_requests == 0:
            return {"hit_rate": 0.0, "miss_rate": 0.0}
        return {
            "hit_rate": self.hit_count / total_requests,
            "miss_rate": self.miss_count / total_requests,
            "total_requests": total_requests
        }

# Global cache instance
cost_cache = CostOptimizedCache()

async def run_with_cost_cache(inputs: Dict[str, Any]) -> Any:
    """Serve from cache when possible; compute and cache otherwise."""
    # Try to get a cached result first
    cached_result = await cost_cache.get_cached_result(inputs)
    if cached_result is not None:
        return {
            "result": cached_result,
            "cached": True,
            "cache_stats": cost_cache.get_cache_stats()
        }

    # Cache miss: run the expensive computation (your model call goes here)
    result = await expensive_computation(inputs)
    # Cache the result for future requests (30 minutes)
    await cost_cache.cache_result(inputs, result, ttl=1800)
    return {
        "result": result,
        "cached": False,
        "cache_stats": cost_cache.get_cache_stats()
    }
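One caveat on get_cache_key: str(sorted(inputs.items())) canonicalizes only the top level of the dict, so nested inputs serialized in different key orders can miss the cache. A sketch of a more robust key (assumes JSON-serializable inputs; swap it into get_cache_key if yours nest):

import hashlib
import json

def canonical_cache_key(inputs: dict) -> str:
    """Deterministic key that also canonicalizes nested dicts and lists."""
    payload = json.dumps(inputs, sort_keys=True, separators=(",", ":"))
    return f"model_cache:{hashlib.sha256(payload.encode()).hexdigest()}"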
Cost Monitoring and Analytics
Cost Tracking
Monitor and track costs in real-time:
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

@dataclass
class CostMetric:
    timestamp: float
    gpu_hours: float
    compute_cost: float
    request_count: int
    cache_hit_rate: float

class CostMonitor:
    def __init__(self):
        self.cost_history: List[CostMetric] = []
        self.hourly_costs: Dict[str, float] = {}
        self.daily_budgets: Dict[str, float] = {}

    def record_usage(self, gpu_hours: float, compute_cost: float,
                     request_count: int, cache_hit_rate: float):
        """Record one usage sample."""
        metric = CostMetric(
            timestamp=time.time(),
            gpu_hours=gpu_hours,
            compute_cost=compute_cost,
            request_count=request_count,
            cache_hit_rate=cache_hit_rate
        )
        self.cost_history.append(metric)
        # Roll the cost into the current hour's bucket
        hour_key = datetime.now().strftime("%Y-%m-%d-%H")
        self.hourly_costs[hour_key] = self.hourly_costs.get(hour_key, 0) + compute_cost

    def get_daily_cost(self, date: str = None) -> float:
        """Total cost for a day (YYYY-MM-DD); defaults to today."""
        if date is None:
            date = datetime.now().strftime("%Y-%m-%d")
        return sum(cost for hour_key, cost in self.hourly_costs.items()
                   if hour_key.startswith(date))

    def check_budget_alert(self, daily_budget: float) -> Dict[str, Any]:
        """Check how close today's spend is to the budget."""
        current_cost = self.get_daily_cost()
        budget_usage = current_cost / daily_budget
        alert_level = "green"
        if budget_usage > 0.9:
            alert_level = "red"
        elif budget_usage > 0.7:
            alert_level = "yellow"
        return {
            "current_cost": current_cost,
            "daily_budget": daily_budget,
            "budget_usage": budget_usage,
            "alert_level": alert_level,
            "remaining_budget": daily_budget - current_cost
        }

    def get_cost_optimization_suggestions(self) -> List[str]:
        """Generate cost optimization suggestions from recent metrics."""
        suggestions = []
        # Analyze the most recent metrics (up to 10 samples)
        recent_metrics = self.cost_history[-10:]
        if recent_metrics:
            avg_cache_hit_rate = sum(m.cache_hit_rate for m in recent_metrics) / len(recent_metrics)
            avg_cost_per_request = sum(
                m.compute_cost / max(m.request_count, 1) for m in recent_metrics
            ) / len(recent_metrics)
            if avg_cache_hit_rate < 0.5:
                suggestions.append("Consider increasing cache TTL or implementing a better caching strategy")
            if avg_cost_per_request > 0.01:  # Threshold for expensive requests
                suggestions.append("Consider using smaller models or batch processing")

        # Group recent spend by hour of day to find usage patterns
        hourly_usage: Dict[int, List[float]] = {}
        for metric in recent_metrics:
            hour = datetime.fromtimestamp(metric.timestamp).hour
            hourly_usage.setdefault(hour, []).append(metric.compute_cost)

        # Suggest time-based scaling if usage varies significantly across hours
        if len(hourly_usage) > 3:
            costs = [sum(c) for c in hourly_usage.values()]
            if min(costs) > 0 and max(costs) / min(costs) > 3:
                suggestions.append("Consider time-based scaling to reduce costs during low-usage periods")
        return suggestions
# Global cost monitor
cost_monitor = CostMonitor()

async def run_with_cost_monitoring(inputs: Dict[str, Any]) -> Any:
    """Run a request and record its estimated cost."""
    start_time = time.time()

    # Execute the request (your model call goes here)
    result = await process_request(inputs)

    # Estimate cost from wall-clock GPU time
    execution_time = time.time() - start_time
    gpu_hours = execution_time / 3600  # Convert seconds to hours
    estimated_cost = gpu_hours * 0.50  # $0.50 per GPU hour (example rate)

    cost_monitor.record_usage(
        gpu_hours=gpu_hours,
        compute_cost=estimated_cost,
        request_count=1,
        cache_hit_rate=0.8  # Plug in the real rate from your cache system
    )

    # Check today's spend against the budget
    budget_status = cost_monitor.check_budget_alert(daily_budget=50.0)

    return {
        "result": result,
        "cost_info": {
            "execution_time": execution_time,
            "estimated_cost": estimated_cost,
            "budget_status": budget_status
        }
    }
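A small report helper (run from a cron job or at the end of the day) surfaces the monitor's suggestions:

def print_daily_cost_report(daily_budget: float = 50.0):
    """Print today's spend against budget plus any optimization suggestions."""
    status = cost_monitor.check_budget_alert(daily_budget)
    print(f"Today: ${status['current_cost']:.2f} "
          f"({status['budget_usage']:.0%} of budget, alert level: {status['alert_level']})")
    for suggestion in cost_monitor.get_cost_optimization_suggestions():
        print(f"  - {suggestion}")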
Cost Optimization Best Practices
1. Resource Selection
- Choose the smallest GPU that meets your performance requirements
- Use CPU-only instances for non-AI workloads
- Consider memory requirements carefully
2. Scaling Strategy
- Implement auto-scaling based on actual demand
- Use time-based scaling for predictable patterns
- Set appropriate scale-down policies
3. Workload Optimization
- Batch requests when possible
- Implement intelligent caching
- Use model quantization for inference workloads
4. Monitoring and Alerts
- Set up budget alerts and monitoring
- Track cost per request and optimization opportunities
- Review usage patterns regularly
Next Steps
- Performance Guide - Optimize performance while controlling costs
- Best Practices - General optimization strategies
- Monitoring - Advanced cost and performance monitoring
For enterprise cost optimization, see the Enterprise Cost Management Guide.