Jobs are background tasks in Chutes that handle long-running operations, file uploads, and asynchronous processing. Unlike Cords (API endpoints), Jobs don't need to respond immediately and can run for extended periods.
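As a minimal sketch of what a job looks like (assuming a `Chute` instance named `chute` is already constructed, as in the rest of this guide), you declare one with the `@chute.job` decorator:

```python
@chute.job(timeout=600)
async def say_hello(self, name: str) -> dict:
    # Runs in the background; the caller polls for the result later
    return {"message": f"Hello, {name}!"}
```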
Jobs that accept file uploads set `upload=True`; for example, processing an uploaded dataset:

```python
import pandas as pd

@chute.job(upload=True, timeout=3600)
async def process_dataset(self, dataset_file: bytes) -> dict:
    # Save the uploaded file
    with open("/tmp/dataset.csv", "wb") as f:
        f.write(dataset_file)

    # Process the dataset
    df = pd.read_csv("/tmp/dataset.csv")
    results = analyze_dataset(df)

    return {"rows": len(df), "analysis": results}
```
Progress Tracking
For long-running jobs, you can track and report progress:
```python
@chute.job(timeout=7200)
async def batch_process(self, items: list) -> dict:
    results = []
    total = len(items)

    for i, item in enumerate(items):
        # Process each item
        result = await process_item(item)
        results.append(result)

        # Report progress (this is logged)
        progress = (i + 1) / total * 100
        print(f"Progress: {progress:.1f}% ({i + 1}/{total})")

    return {"processed": len(results), "results": results}
```
Error Handling
Raising an exception marks the job as failed and triggers a retry when `retry > 0`; returning normally does not, so unrecoverable errors can be reported in the result instead:

```python
@chute.job(retry=2, timeout=1800)
async def resilient_job(self, data: dict) -> dict:
    try:
        result = await risky_operation(data)
        return {"success": True, "result": result}
    except TemporaryError as e:
        # Raising will trigger a retry
        raise e
    except PermanentError as e:
        # Return the error instead of raising to avoid retries
        return {"success": False, "error": str(e)}
```
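`TemporaryError` and `PermanentError` above are not part of Chutes; they stand in for application-defined exception types, for example:

```python
class TemporaryError(Exception):
    """Transient failure (e.g. a network blip); worth retrying."""

class PermanentError(Exception):
    """Unrecoverable failure (e.g. malformed input); retrying won't help."""
```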
Working with Files
Processing Uploaded Files
```python
import os
import tempfile

@chute.job(upload=True, timeout=1800)
async def process_image(self, image_file: bytes) -> dict:
    # Write the upload to a temporary file
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        tmp.write(image_file)
        tmp_path = tmp.name

    try:
        # Process the image
        processed = await image_processing_function(tmp_path)
        return {"processed": True, "features": processed}
    finally:
        # Clean up
        os.unlink(tmp_path)
```
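Note that `delete=False` keeps the temporary file around after the `with` block closes so it can be passed to the processing function by path; the `finally` clause then removes it explicitly, even if processing raises.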
Generating Files for Download
```python
@chute.job(timeout=3600)
async def generate_report(self, report_config: dict) -> dict:
    # Generate the report
    report_data = await create_report(report_config)

    # Save to a file (this could be uploaded to cloud storage)
    report_path = f"/tmp/report_{report_config['id']}.pdf"
    save_report_as_pdf(report_data, report_path)

    return {
        "report_generated": True,
        "report_path": report_path,
        "pages": len(report_data),
    }
```
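Chutes does not dictate where generated files end up; as one option, the job could push the PDF to object storage before returning. A sketch using boto3 (an assumption for illustration, not a Chutes built-in):

```python
import boto3

def upload_report(report_path: str, bucket: str, key: str) -> str:
    # Upload the PDF to S3 and return a URI the caller can resolve later
    s3 = boto3.client("s3")
    s3.upload_file(report_path, bucket, key)
    return f"s3://{bucket}/{key}"
```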
State Management
Jobs can maintain state throughout their execution:
```python
@chute.job(timeout=7200)
async def training_job(self, config: dict) -> dict:
    # Initialize training state
    self.training_state = {
        "epoch": 0,
        "best_accuracy": 0.0,
        "model_checkpoints": [],
    }

    for epoch in range(config["epochs"]):
        self.training_state["epoch"] = epoch

        # Train for one epoch
        accuracy = await train_epoch(epoch)

        if accuracy > self.training_state["best_accuracy"]:
            self.training_state["best_accuracy"] = accuracy

            # Save a checkpoint
            checkpoint_path = f"/tmp/checkpoint_epoch_{epoch}.pt"
            save_checkpoint(checkpoint_path)
            self.training_state["model_checkpoints"].append(checkpoint_path)

    return self.training_state
```
Job Lifecycle
- **Queued**: Job is submitted and waiting to run
- **Running**: Job is executing
- **Completed**: Job finished successfully
- **Failed**: Job encountered an error
- **Retrying**: Job failed but will retry (if `retry > 0`)
- **Timeout**: Job exceeded its timeout limit
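For illustration, a client can poll until a job reaches a terminal state using the `get_job_status`/`get_job_results` calls shown below. This is a sketch that assumes the status is returned as one of the state names above:

```python
import asyncio

TERMINAL_STATES = {"Completed", "Failed", "Timeout"}

async def wait_for_job(chute, job_id: str, poll_interval: float = 5.0) -> dict:
    # Poll the job's status until it reaches a terminal state
    while True:
        status = await chute.get_job_status(job_id)
        if status in TERMINAL_STATES:
            break
        await asyncio.sleep(poll_interval)

    if status == "Completed":
        return await chute.get_job_results(job_id)
    raise RuntimeError(f"Job {job_id} ended in state: {status}")
```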
Running Jobs
Programmatically
```python
# Submit a job
job_id = await chute.submit_job("process_data", {"input": "data"})

# Check job status
status = await chute.get_job_status(job_id)

# Get job results (when completed)
results = await chute.get_job_results(job_id)
```
Via HTTP API
```bash
# Submit a job
curl -X POST https://your-username-your-chute.chutes.ai/jobs/process_data \
  -H "Content-Type: application/json" \
  -d '{"input": "data"}'

# Check status
curl https://your-username-your-chute.chutes.ai/jobs/{job_id}/status

# Get results
curl https://your-username-your-chute.chutes.ai/jobs/{job_id}/results
```
Best Practices
1. Set Appropriate Timeouts
```python
# Short tasks
@chute.job(timeout=300)     # 5 minutes

# Medium tasks
@chute.job(timeout=1800)    # 30 minutes

# Long training jobs
@chute.job(timeout=14400)   # 4 hours
```
2. Handle Failures Gracefully
```python
import logging

logger = logging.getLogger(__name__)

@chute.job(retry=2)
async def robust_job(self, data: dict) -> dict:
    try:
        return await process_data(data)
    except Exception as e:
        # Log the error
        logger.error(f"Job failed: {e}")

        # Return error info instead of raising
        return {"success": False, "error": str(e)}
```
3. Use Progress Tracking
```python
@chute.job(timeout=3600)
async def batch_job(self, items: list) -> dict:
    total = len(items)

    for i, item in enumerate(items):
        # Process the item
        await process_item(item)

        # Log progress every 10 items
        if i % 10 == 0:
            print(f"Processed {i}/{total} items")

    # Return a summary so the declared dict return type holds
    return {"processed": total}
```