The @chute.job() decorator creates long-running jobs or server rentals in Chutes applications. Unlike API endpoints (cords), jobs are designed for tasks that need persistent compute resources, specific network ports, or long-running processes.
Jobs can expose network ports for external access using the Port class:
from chutes.chute.job import Port
port = Port(
    name: str,    # Port identifier (lowercase letters + optional numbers)
    port: int,    # Port number (2202 or 8002-65535)
    proto: str    # Protocol: "tcp", "udp", or "http"
)
Port Rules
Port must be 2202 (reserved for SSH) or in range 8002-65535
Each port must have a unique number within the job
Name must match pattern ^[a-z]+[0-9]*$ (e.g., "web", "api", "metrics1")
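As a quick illustration of these rules, a minimal local check could look like the sketch below (check_port_rules is a hypothetical helper for illustration, not part of the Chutes SDK):

import re

def check_port_rules(name: str, port: int) -> None:
    # Name: lowercase letters followed by optional digits, e.g. "web", "metrics1".
    if not re.fullmatch(r"[a-z]+[0-9]*", name):
        raise ValueError(f"invalid port name: {name!r}")
    # Number: 2202 (reserved for SSH) or anything in the 8002-65535 range.
    if port != 2202 and not (8002 <= port <= 65535):
        raise ValueError(f"invalid port number: {port}")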
Parameters
ports: list[Port] = []
List of network ports to expose for the job.
Examples:
from chutes.chute.job import Port

# Single HTTP port
@chute.job(ports=[Port(name="web", port=8080, proto="http")])
async def web_server_job(self, **job_data):
    pass

# Multiple ports
@chute.job(ports=[
    Port(name="api", port=8000, proto="http"),
    Port(name="metrics", port=9090, proto="http"),
    Port(name="grpc", port=8001, proto="tcp")
])
async def multi_port_job(self, **job_data):
    pass

# No ports (compute-only job)
@chute.job()
async def compute_job(self, **job_data):
    pass
timeout: Optional[int] = None
Maximum execution time for the job in seconds.
Constraints:
If specified, must be between 30 seconds and 86400 seconds (24 hours)
None means no timeout (job can run indefinitely - useful for server rentals)
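As a quick illustration of these constraints, a small local check could look like the sketch below (validate_timeout is a hypothetical helper for illustration, not part of the Chutes SDK):

def validate_timeout(timeout):
    # None is allowed and means the job may run indefinitely.
    if timeout is None:
        return None
    # Otherwise the value must fall in the documented 30 second to 24 hour window.
    if not 30 <= timeout <= 86400:
        raise ValueError(f"timeout must be between 30 and 86400 seconds, got {timeout}")
    return timeout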
Examples:
# Job with 1 hour timeout
@chute.job(timeout=3600)
async def training_job(self, **job_data):
    """Model training with 1 hour limit."""
    await self.train_model()

# Long-running server with no timeout
@chute.job(timeout=None)
async def server_job(self, **job_data):
    """Persistent server process."""
    await self.start_server()

# Short batch job (5 minutes)
@chute.job(timeout=300)
async def quick_batch_job(self, **job_data):
    """Quick data processing job."""
    await self.process_batch()
upload: bool = True
Whether to automatically upload output files generated by the job.
Purpose:
Automatically collects and uploads files created in the job's output directory
Useful for jobs that generate artifacts, model weights, logs, or result files
Examples:
# Job with file upload (default)
@chute.job(upload=True)
async def generate_report_job(self, **job_data):
    """Generate report and upload results."""
    output_dir = job_data["output_dir"]

    # Files written to output_dir will be automatically uploaded
    with open(f"{output_dir}/report.pdf", "wb") as f:
        f.write(await self.generate_pdf_report())

    with open(f"{output_dir}/results.json", "w") as f:
        json.dump(self.results, f)

# Job without file upload
@chute.job(upload=False)
async def streaming_job(self, **job_data):
    """Streaming job that doesn't generate files."""
    while not self.cancel_event.is_set():
        await self.process_stream()
        await asyncio.sleep(1)
ssh: bool = False
Whether to enable SSH access to the job container.
Purpose:
Debug running jobs
Interactive development
Manual intervention when needed
Examples:
# Job with SSH access for debugging
@chute.job(ssh=True, timeout=7200)
async def debug_job(self, **job_data):
    """Job with SSH access for debugging."""
    # SSH key should be provided in job_data["_ssh_public_key"]
    await self.complex_operation()

# Regular job without SSH
@chute.job(ssh=False)  # Default
async def regular_job(self, **job_data):
    """Regular job without SSH access."""
    await self.standard_operation()
Note: When ssh=True, port 2202 is automatically added to the job's ports for SSH access.
Job Data
Job functions receive their inputs as keyword arguments via job_data:
@chute.job()
async def my_job(self, **job_data):
    # job_data contains:
    # - "output_dir": Directory path for output files
    # - "_ssh_public_key": SSH public key (if ssh=True and provided)
    # - Any other data passed when starting the job
    output_dir = job_data["output_dir"]

    # Your job logic here
    return {"status": "completed"}
Job Lifecycle
Cancellation Support
Jobs have access to a cancel event that can be used to gracefully handle cancellation:
@chute.job(timeout=3600)
async def cancellable_job(self, **job_data):
    """Job that handles cancellation gracefully."""
    for i in range(100):
        # Check for cancellation
        if self.cancel_event.is_set():
            print("Job cancelled, cleaning up...")
            break
        await self.process_step(i)
        await asyncio.sleep(1)
    return {"processed_steps": i}
Output Directory
Jobs receive an output_dir in job_data where they can write files:
@chute.job(upload=True)
async def job_with_outputs(self, **job_data):
    output_dir = job_data["output_dir"]

    # Write output files
    model_path = f"{output_dir}/model.pt"
    torch.save(self.model.state_dict(), model_path)

    # Write logs
    with open(f"{output_dir}/training_log.txt", "w") as f:
        f.write("\n".join(self.logs))

    # Files in output_dir are automatically uploaded when job completes
    return {"model_path": model_path}
Complete Examples
Model Training Job
from chutes.chute import Chute, NodeSelector
from chutes.chute.job import Port
from chutes.image import Image
image = (
    Image(username="myuser", name="training", tag="1.0")
    .from_base("parachutes/python:3.12")
    .run_command("pip install torch transformers")
)

chute = Chute(
    username="myuser",
    name="model-trainer",
    image=image,
    node_selector=NodeSelector(gpu_count=1, min_vram_gb_per_gpu=24)
)
@chute.on_startup()
async def setup(self):
    import torch
    self.device = "cuda" if torch.cuda.is_available() else "cpu"

@chute.job(
    timeout=7200,  # 2 hours
    upload=True,
    ssh=True  # Enable SSH for debugging
)
async def train_model(self, **job_data):
    """Train a model and save checkpoints."""
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer

    output_dir = job_data["output_dir"]
    model_name = job_data.get("model_name", "gpt2")
    epochs = job_data.get("epochs", 3)

    print(f"Loading model: {model_name}")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    model.to(self.device)

    # Training loop
    for epoch in range(epochs):
        if self.cancel_event.is_set():
            print("Training cancelled")
            break
        print(f"Epoch {epoch + 1}/{epochs}")

        # ... training logic ...

        # Save checkpoint
        checkpoint_path = f"{output_dir}/checkpoint_epoch_{epoch}.pt"
        torch.save(model.state_dict(), checkpoint_path)

    # Save final model
    final_path = f"{output_dir}/final_model.pt"
    torch.save(model.state_dict(), final_path)

    return {
        "status": "completed",
        "epochs_completed": epoch + 1,
        "model_path": final_path
    }
Web Server Job
from chutes.chute.job import Port
@chute.job(
    ports=[
        Port(name="web", port=8080, proto="http"),
        Port(name="metrics", port=9090, proto="http")
    ],
    timeout=None,  # Run indefinitely
    upload=False
)
async def web_server_job(self, **job_data):
    """Run a web server as a long-running job."""
    from fastapi import FastAPI
    import uvicorn

    app = FastAPI()

    @app.get("/")
    async def root():
        return {"message": "Hello from job!"}

    @app.get("/health")
    async def health():
        return {"status": "healthy"}

    config = uvicorn.Config(app, host="0.0.0.0", port=8080)
    server = uvicorn.Server(config)

    # Run until cancelled
    await server.serve()
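The call to server.serve() runs until the process stops. If you also want the server to shut down when the job's cancel event fires (assuming self.cancel_event as used elsewhere on this page), one possible pattern uses uvicorn's should_exit flag. The helper below (serve_until_cancelled) is a sketch for illustration, not part of the Chutes SDK:

import asyncio

async def serve_until_cancelled(server, cancel_event):
    """Run a uvicorn server until a cancel event is set (sketch)."""
    serve_task = asyncio.create_task(server.serve())
    # Poll the cancel event; stop as soon as it fires or the server exits on its own.
    while not cancel_event.is_set() and not serve_task.done():
        await asyncio.sleep(1)
    server.should_exit = True  # Ask uvicorn to leave its serve loop gracefully
    await serve_task

Inside web_server_job you would then end with await serve_until_cancelled(server, self.cancel_event) instead of await server.serve().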
Batch Processing Job
@chute.job(timeout=1800, upload=True)
async def batch_processing_job(self, **job_data):
    """Process a batch of items."""
    output_dir = job_data["output_dir"]
    items = job_data.get("items", [])

    results = []
    processed = 0
    failed = 0

    for item in items:
        if self.cancel_event.is_set():
            print(f"Cancelled after processing {processed} items")
            break
        try:
            result = await self.process_item(item)
            results.append(result)
            processed += 1
        except Exception as e:
            print(f"Failed to process item: {e}")
            failed += 1

    # Save results
    with open(f"{output_dir}/results.json", "w") as f:
        json.dump(results, f)

    return {
        "status": "completed",
        "processed": processed,
        "failed": failed,
        "total": len(items)
    }
Error Handling
Jobs should handle errors gracefully and return appropriate status:
@chute.job(timeout=3600)
async def robust_job(self, **job_data):
    """Job with comprehensive error handling."""
    output_dir = job_data["output_dir"]
    try:
        # Perform main work
        result = await self.do_work()

        # Save output
        with open(f"{output_dir}/output.json", "w") as f:
            json.dump(result, f)

        return {
            "status": "completed",
            "result": result
        }
    except asyncio.CancelledError:
        # Handle cancellation
        print("Job was cancelled")
        raise
    except ValueError as e:
        # Handle known errors
        return {
            "status": "failed",
            "error": "invalid_input",
            "message": str(e)
        }
    except Exception as e:
        # Handle unexpected errors
        print(f"Unexpected error: {e}")

        # Save error log
        import traceback
        with open(f"{output_dir}/error.log", "w") as f:
            f.write(f"Error: {e}\n")
            f.write(traceback.format_exc())

        return {
            "status": "error",
            "error": str(e)
        }
Best Practices
1. Always Check for Cancellation
@chute.job(timeout=3600)
async def long_job(self, **job_data):
    for i in range(1000):
        if self.cancel_event.is_set():
            return {"status": "cancelled", "progress": i}
        await self.process_step(i)
2. Use Appropriate Timeouts
# Short job - use explicit timeout
@chute.job(timeout=300)
async def quick_job(self, **job_data):
    pass

# Long training - longer timeout
@chute.job(timeout=86400)  # 24 hours
async def training_job(self, **job_data):
    pass

# Server rental - no timeout
@chute.job(timeout=None)
async def server_job(self, **job_data):
    pass
3. Write Important Data to Output Directory
@chute.job(upload=True)
async def job_with_checkpoints(self, **job_data):
    output_dir = job_data["output_dir"]
    for epoch in range(100):
        # Train...

        # Save checkpoint periodically
        if epoch % 10 == 0:
            torch.save(model, f"{output_dir}/checkpoint_{epoch}.pt")
4. Use SSH for Debugging Complex Jobs
@chute.job(ssh=True, timeout=7200)
async def debuggable_job(self, **job_data):
    """Enable SSH so you can connect and debug if needed."""
    pass