The @chute.job() decorator is used to create long-running jobs or server rentals in Chutes applications. Jobs are different from API endpoints (cords) and are designed for tasks that need persistent compute resources, specific network ports, or long-running processes.
Jobs can expose network ports for external access using the Port class:
from chutes.chute.job import Port
port = Port(
name: str, # Port identifier (lowercase letters + optional numbers)
port: int, # Port number (2202 or 8002-65535)
proto: str # Protocol: "tcp", "udp", or "http"
)
Port Rules
Port must be 2202 (reserved for SSH) or in range 8002-65535
Each port must have a unique number within the job
Name must match pattern ^[a-z]+[0-9]*$ (e.g., "web", "api", "metrics1")
Parameters
ports: list[Port] = []
List of network ports to expose for the job.
Examples:
from chutes.chute.job import Port
# Single HTTP port
@chute.job(ports=[Port(name="web", port=8080, proto="http")])
async def web_server_job(self, **job_data):
pass
# Multiple ports
@chute.job(ports=[
Port(name="api", port=8002, proto="http"),
Port(name="metrics", port=9090, proto="http"),
Port(name="grpc", port=8003, proto="tcp")
])
async def multi_port_job(self, **job_data):
pass
# No ports (compute-only job)
@chute.job()
async def compute_job(self, **job_data):
pass
timeout: Optional[int] = None
Maximum execution time for the job in seconds.
Constraints:
If specified, must be between 30 seconds and 86400 seconds (24 hours)
None means no timeout (job can run indefinitely - useful for server rentals)
Examples:
# Job with 1 hour timeout
@chute.job(timeout=3600)
async def training_job(self, **job_data):
"""Model training with 1 hour limit."""
await self.train_model()
# Long-running server with no timeout
@chute.job(timeout=None)
async def server_job(self, **job_data):
"""Persistent server process."""
await self.start_server()
# Short batch job (5 minutes)
@chute.job(timeout=300)
async def quick_batch_job(self, **job_data):
"""Quick data processing job."""
await self.process_batch()
upload: bool = True
Whether to automatically upload output files generated by the job.
Purpose:
Automatically collects and uploads files created in the job's output directory
Useful for jobs that generate artifacts, model weights, logs, or result files
Examples:
# Job with file upload (default)
@chute.job(upload=True)
async def generate_report_job(self, **job_data):
"""Generate report and upload results."""
output_dir = job_data["output_dir"]
# Files written to output_dir will be automatically uploaded
with open(f"{output_dir}/report.pdf", "wb") as f:
f.write(await self.generate_pdf_report())
with open(f"{output_dir}/results.json", "w") as f:
json.dump(self.results, f)
# Job without file upload
@chute.job(upload=False)
async def streaming_job(self, **job_data):
"""Streaming job that doesn't generate files."""
while not self.cancel_event.is_set():
await self.process_stream()
await asyncio.sleep(1)
ssh: bool = False
Whether to enable SSH access to the job container.
Purpose:
Debug running jobs
Interactive development
Manual intervention when needed
Examples:
# Job with SSH access for debugging
@chute.job(ssh=True, timeout=7200)
async def debug_job(self, **job_data):
"""Job with SSH access for debugging."""
# SSH key should be provided in job_data["_ssh_public_key"]
await self.complex_operation()
# Regular job without SSH
@chute.job(ssh=False) # Default
async def regular_job(self, **job_data):
"""Regular job without SSH access."""
await self.standard_operation()
Note: When ssh=True, port 2202 is automatically added to the job's ports for SSH access.
@chute.job()
async def my_job(self, **job_data):
# job_data contains:
# - "output_dir": Directory path for output files
# - "_ssh_public_key": SSH public key (if ssh=True and provided)
# - Any other data passed when starting the job
output_dir = job_data["output_dir"]
# Your job logic here
return {"status": "completed"}
Job Lifecycle
Cancellation Support
Jobs have access to a cancel event that can be used to gracefully handle cancellation:
@chute.job(timeout=3600)
async def cancellable_job(self, **job_data):
"""Job that handles cancellation gracefully."""
for i in range(100):
# Check for cancellation
if self.cancel_event.is_set():
print("Job cancelled, cleaning up...")
break
await self.process_step(i)
await asyncio.sleep(1)
return {"processed_steps": i}
Output Directory
Jobs receive an output_dir in job_data where they can write files:
@chute.job(upload=True)
async def job_with_outputs(self, **job_data):
output_dir = job_data["output_dir"]
# Write output files
model_path = f"{output_dir}/model.pt"
torch.save(self.model.state_dict(), model_path)
# Write logs
with open(f"{output_dir}/training_log.txt", "w") as f:
f.write("\n".join(self.logs))
# Files in output_dir are automatically uploaded when job completes
return {"model_path": model_path}
Complete Examples
Model Training Job
from chutes.chute import Chute, NodeSelector
from chutes.chute.job import Port
from chutes.image import Image
image = (
Image(username="myuser", name="training", tag="1.0")
.from_base("parachutes/python:3.12")
.run_command("pip install torch transformers")
)
chute = Chute(
username="myuser",
name="model-trainer",
image=image,
node_selector=NodeSelector(gpu_count=1, min_vram_gb_per_gpu=24)
)
@chute.on_startup()
async def setup(self):
import torch
self.device = "cuda" if torch.cuda.is_available() else "cpu"
@chute.job(
timeout=7200, # 2 hours
upload=True,
ssh=True # Enable SSH for debugging
)
async def train_model(self, **job_data):
"""Train a model and save checkpoints."""
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer
output_dir = job_data["output_dir"]
model_name = job_data.get("model_name", "gpt2")
epochs = job_data.get("epochs", 3)
print(f"Loading model: {model_name}")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
model.to(self.device)
# Training loop
for epoch in range(epochs):
if self.cancel_event.is_set():
print("Training cancelled")
break
print(f"Epoch {epoch + 1}/{epochs}")
# ... training logic ...
# Save checkpoint
checkpoint_path = f"{output_dir}/checkpoint_epoch_{epoch}.pt"
torch.save(model.state_dict(), checkpoint_path)
# Save final model
final_path = f"{output_dir}/final_model.pt"
torch.save(model.state_dict(), final_path)
return {
"status": "completed",
"epochs_completed": epoch + 1,
"model_path": final_path
}
Web Server Job
from chutes.chute.job import Port
@chute.job(
ports=[
Port(name="web", port=8080, proto="http"),
Port(name="metrics", port=9090, proto="http")
],
timeout=None, # Run indefinitely
upload=False
)
async def web_server_job(self, **job_data):
"""Run a web server as a long-running job."""
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello from job!"}
@app.get("/health")
async def health():
return {"status": "healthy"}
config = uvicorn.Config(app, host="0.0.0.0", port=8080)
server = uvicorn.Server(config)
# Run until cancelled
await server.serve()
Batch Processing Job
@chute.job(timeout=1800, upload=True)
async def batch_processing_job(self, **job_data):
"""Process a batch of items."""
output_dir = job_data["output_dir"]
items = job_data.get("items", [])
results = []
processed = 0
failed = 0
for item in items:
if self.cancel_event.is_set():
print(f"Cancelled after processing {processed} items")
break
try:
result = await self.process_item(item)
results.append(result)
processed += 1
except Exception as e:
print(f"Failed to process item: {e}")
failed += 1
# Save results
with open(f"{output_dir}/results.json", "w") as f:
json.dump(results, f)
return {
"status": "completed",
"processed": processed,
"failed": failed,
"total": len(items)
}
Error Handling
Jobs should handle errors gracefully and return appropriate status:
@chute.job(timeout=3600)
async def robust_job(self, **job_data):
"""Job with comprehensive error handling."""
output_dir = job_data["output_dir"]
try:
# Perform main work
result = await self.do_work()
# Save output
with open(f"{output_dir}/output.json", "w") as f:
json.dump(result, f)
return {
"status": "completed",
"result": result
}
except asyncio.CancelledError:
# Handle cancellation
print("Job was cancelled")
raise
except ValueError as e:
# Handle known errors
return {
"status": "failed",
"error": "invalid_input",
"message": str(e)
}
except Exception as e:
# Handle unexpected errors
print(f"Unexpected error: {e}")
# Save error log
with open(f"{output_dir}/error.log", "w") as f:
f.write(f"Error: {e}\n")
import traceback
f.write(traceback.format_exc())
return {
"status": "error",
"error": str(e)
}
Best Practices
1. Always Check for Cancellation
@chute.job(timeout=3600)
async def long_job(self, **job_data):
for i in range(1000):
if self.cancel_event.is_set():
return {"status": "cancelled", "progress": i}
await self.process_step(i)
2. Use Appropriate Timeouts
# Short job - use explicit timeout
@chute.job(timeout=300)
async def quick_job(self, **job_data):
pass
# Long training - longer timeout
@chute.job(timeout=86400) # 24 hours
async def training_job(self, **job_data):
pass
# Server rental - no timeout
@chute.job(timeout=None)
async def server_job(self, **job_data):
pass
3. Write Important Data to Output Directory
@chute.job(upload=True)
async def job_with_checkpoints(self, **job_data):
output_dir = job_data["output_dir"]
for epoch in range(100):
# Train...
# Save checkpoint periodically
if epoch % 10 == 0:
torch.save(model, f"{output_dir}/checkpoint_{epoch}.pt")
4. Use SSH for Debugging Complex Jobs
@chute.job(ssh=True, timeout=7200)
async def debuggable_job(self, **job_data):
"""Enable SSH so you can connect and debug if needed."""
pass