The @chute.job() decorator creates long-running jobs or server rentals in Chutes applications. Unlike API endpoints, jobs are designed for tasks that need persistent compute resources, specific network ports, or extended execution time.
Jobs can expose network ports for external access:
from chutes.chute.job import Port
# Define a port
port = Port(name="web", port=8080, proto="http")
Parameters
ports: list[Port] = []
List of network ports to expose for the job.
Port Definition:
from chutes.chute.job import Port

class Port(BaseModel):
    name: str   # Port identifier (lowercase letters, optionally followed by digits)
    port: int   # Port number (2202 or 8002-65535)
    proto: str  # Protocol: "tcp", "udp", or "http"
Examples:
from chutes.chute.job import Port

# Single HTTP port
@chute.job(ports=[Port(name="web", port=8080, proto="http")])
async def web_server_job(self):
    # Start web server on port 8080
    pass

# Multiple ports
@chute.job(ports=[
    Port(name="api", port=8000, proto="http"),
    Port(name="metrics", port=9090, proto="http"),
    Port(name="tcpservice", port=8001, proto="tcp")
])
async def multi_port_job(self):
    pass

# No ports (compute-only job)
@chute.job()
async def compute_job(self):
    pass
Port Rules:
Port must be 2202 or in range 8002-65535
Port 2202 is reserved for SSH access
Each port must have a unique number
Name must match pattern ^[a-z]+[0-9]*$
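To make these rules concrete, here is a small sketch of port definitions that would and would not satisfy them (the exact validation behavior is an assumption):

from chutes.chute.job import Port

# Valid: lowercase name with optional trailing digits, port in the allowed range
Port(name="api1", port=8000, proto="http")

# Invalid (assumed to be rejected by validation):
# Port(name="tcp_service", port=8001, proto="tcp")  # underscore not allowed in the name
# Port(name="web", port=80, proto="http")           # 80 is outside 2202 / 8002-65535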
timeout: Optional[int] = None
Maximum execution time for the job in seconds. Can be None for unlimited time.
Constraints:
If specified, must be between 30 seconds and 24 hours (86400 seconds)
None means no timeout (job can run indefinitely)
Examples:
# Job with 1 hour timeout
@chute.job(timeout=3600)
async def training_job(self):
    """Model training with 1 hour limit."""
    await self.train_model()

# Long-running server with no timeout
@chute.job(timeout=None)
async def server_job(self):
    """Persistent server process."""
    await self.start_server()  # Runs until manually stopped

# Short batch job
@chute.job(timeout=300)  # 5 minutes
async def quick_batch_job(self):
    """Quick data processing job."""
    await self.process_batch()
upload: bool = True
Whether to automatically upload output files generated by the job.
Purpose:
Automatically collect and upload files created in the job's output directory
Useful for jobs that generate artifacts, logs, or results files
Examples:
# Job with file upload (default)
@chute.job(upload=True)
async def generate_report_job(self, output_dir: str):
    """Generate report and upload results."""
    # Files written to output_dir will be automatically uploaded
    with open(f"{output_dir}/report.pdf", "wb") as f:
        f.write(await self.generate_pdf_report())

# Job without file upload
@chute.job(upload=False)
async def streaming_job(self):
    """Streaming job that doesn't generate files."""
    while True:
        await self.process_stream()
        await asyncio.sleep(1)
ssh: bool = False
Whether to enable SSH access to the job container.
Purpose:
Debug running jobs
Interactive development
Manual intervention when needed
Examples:
# Job with SSH access for debugging
@chute.job(ssh=True)
async def debug_job(self):
    """Job with SSH access for debugging."""
    await self.complex_operation()

# Regular job without SSH
@chute.job(ssh=False)  # Default
async def regular_job(self):
    """Regular job without SSH access."""
    await self.standard_operation()
SSH Access:
When ssh=True, port 2202 is automatically added to the job's ports for SSH access.
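For example, a minimal sketch combining SSH access with a job's own ports (the handler body and helper are illustrative):

from chutes.chute.job import Port

# ssh=True reserves port 2202 for SSH automatically; declare only your own ports
@chute.job(
    ssh=True,
    ports=[Port(name="api", port=8000, proto="http")]
)
async def debuggable_server_job(self, **job_data):
    """API server that can also be reached over SSH for debugging."""
    await self.start_server(port=8000)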
Job Function Patterns
Basic Job Functions
# Simple compute job
@chute.job()
async def compute_job(self, **job_data):
    """Simple computation job."""
    output_dir = job_data["output_dir"]

    # Perform computation
    result = await self.run_analysis()

    # Save results to output directory
    with open(f"{output_dir}/results.json", "w") as f:
        json.dump(result, f)

    return {"status": "completed", "result": result}
# Job with network ports
@chute.job(ports=[Port(name="api", port=8000, proto="http")])
async def api_server_job(self, **job_data):
    """Run an API server."""
    # Start FastAPI server on port 8000
    app = FastAPI()

    @app.get("/health")
    async def health():
        return {"status": "healthy"}

    await self.start_server(app, port=8000)
# Job with file upload and SSH access
@chute.job(upload=True, ssh=True, timeout=3600)
async def training_job(self, **job_data):
    """Model training job with SSH access for debugging."""
    output_dir = job_data["output_dir"]

    # Train model
    model = await self.train_model()

    # Save model artifacts
    model_path = f"{output_dir}/model.pkl"
    await self.save_model(model, model_path)

    return {"status": "completed", "model_path": model_path}
Scheduled Jobs
# Daily maintenance
@chute.job(
    name="daily_maintenance",
    schedule="0 3 * * *"  # 3 AM daily
)
async def daily_maintenance(self):
    """Daily maintenance tasks."""
    # Clean temporary files
    temp_files_cleaned = await self.cleanup_temp_files()

    # Update model cache
    await self.update_model_cache()

    # Generate daily metrics
    metrics = await self.generate_daily_metrics()

    return {
        "maintenance_completed": True,
        "temp_files_cleaned": temp_files_cleaned,
        "metrics": metrics
    }
# Hourly data sync
@chute.job(
    name="data_sync",
    schedule="0 * * * *",  # Every hour
    retry_count=5
)
async def sync_external_data(self):
    """Sync data from external sources."""
    sources = ["api1", "api2", "database"]
    results = {}

    for source in sources:
        try:
            data = await self.fetch_from_source(source)
            await self.update_local_data(source, data)
            results[source] = {"status": "success", "records": len(data)}
        except Exception as e:
            results[source] = {"status": "error", "error": str(e)}
            self.logger.error(f"Failed to sync from {source}: {e}")

    return results
Long-Running Jobs
# Model training job
@chute.job(
    name="model_training",
    timeout=7200,  # 2 hours
    retry_count=1,
    queue="gpu_intensive"
)
async def train_model(self, training_config: dict):
    """Train machine learning model."""
    self.logger.info("Starting model training")

    try:
        # Prepare training data
        training_data = await self.prepare_training_data(training_config)

        # Train model
        model = await self.train_model_async(training_data, training_config)

        # Validate model
        validation_score = await self.validate_model(model)

        # Save model if validation passes
        if validation_score > training_config.get("min_score", 0.8):
            model_path = await self.save_model(model)
            return {
                "status": "success",
                "validation_score": validation_score,
                "model_path": model_path
            }
        else:
            return {
                "status": "failed",
                "reason": "validation_score_too_low",
                "validation_score": validation_score
            }

    except Exception as e:
        self.logger.error(f"Model training failed: {e}")
        return {
            "status": "error",
            "error": str(e)
        }
# Data processing pipeline
@chute.job(
    name="data_pipeline",
    timeout=3600,  # 1 hour
    queue="data_processing"
)
async def process_data_pipeline(self, pipeline_config: dict):
    """Process data through multiple stages."""
    stages = pipeline_config.get("stages", {})
    results = {}

    for stage_name, stage_config in stages.items():
        self.logger.info(f"Processing stage: {stage_name}")
        try:
            result = await self.process_stage(stage_name, stage_config)
            results[stage_name] = result
        except Exception as e:
            self.logger.error(f"Stage {stage_name} failed: {e}")
            results[stage_name] = {"status": "error", "error": str(e)}

            # Stop pipeline on critical stage failure
            if stage_config.get("critical", False):
                break

    return {
        "pipeline_completed": True,
        "stage_results": results
    }
Error Handling and Recovery
# Job with comprehensive error handling
@chute.job(
    name="robust_job",
    retry_count=3,
    timeout=300
)
async def robust_job_with_error_handling(self, task_id: str):
    """Job with comprehensive error handling."""
    try:
        # Update job status
        await self.update_job_status(task_id, "started")

        # Perform main work
        result = await self.perform_work(task_id)

        # Update success status
        await self.update_job_status(task_id, "completed")

        return {
            "task_id": task_id,
            "status": "success",
            "result": result
        }

    except ValueError as e:
        # Non-retryable error
        self.logger.error(f"Invalid input for task {task_id}: {e}")
        await self.update_job_status(task_id, "failed")
        return {
            "task_id": task_id,
            "status": "failed",
            "error": "invalid_input",
            "message": str(e)
        }

    except ConnectionError as e:
        # Retryable error
        self.logger.warning(f"Connection error for task {task_id}: {e}")
        await self.update_job_status(task_id, "retrying")
        raise  # Will be retried

    except Exception as e:
        # Unexpected error
        self.logger.error(f"Unexpected error for task {task_id}: {e}")
        await self.update_job_status(task_id, "error")

        # Send alert for unexpected errors
        await self.send_error_alert(task_id, str(e))
        return {
            "task_id": task_id,
            "status": "error",
            "error": "unexpected_error"
        }
# Job with graceful shutdown
@chute.job(
    name="graceful_job",
    timeout=1800  # 30 minutes
)
async def job_with_graceful_shutdown(self, batch_data: List[dict]):
    """Job that handles graceful shutdown."""
    processed_items = 0
    failed_items = 0

    try:
        for item in batch_data:
            # Check for shutdown signal
            if self.should_shutdown:
                self.logger.info(f"Graceful shutdown requested, processed {processed_items} items")
                break

            try:
                await self.process_item(item)
                processed_items += 1
            except Exception as e:
                self.logger.error(f"Failed to process item {item.get('id')}: {e}")
                failed_items += 1

    except asyncio.CancelledError:
        self.logger.info(f"Job cancelled, processed {processed_items} items")
        raise

    return {
        "processed": processed_items,
        "failed": failed_items,
        "total": len(batch_data)
    }
Job Scheduling and Management
Triggering Jobs from Endpoints
# Trigger job from API endpoint
@chute.cord(public_api_path="/start_training")
async def start_training_job(self, config: dict):
    """Start a model training job."""
    # Validate configuration
    if not self.validate_training_config(config):
        raise HTTPException(400, "Invalid training configuration")

    # Start background job
    job_id = await self.start_job("model_training", config)

    return {
        "job_id": job_id,
        "status": "started",
        "message": "Model training job started"
    }

# Check job status
@chute.cord(public_api_path="/job_status/{job_id}")
async def check_job_status(self, job_id: str):
    """Get status of a background job."""
    status = await self.get_job_status(job_id)
    if not status:
        raise HTTPException(404, "Job not found")
    return status

# Cancel running job
@chute.cord(public_api_path="/cancel_job/{job_id}", method="DELETE")
async def cancel_running_job(self, job_id: str):
    """Cancel a running job."""
    success = await self.cancel_job(job_id)
    if not success:
        raise HTTPException(404, "Job not found or cannot be cancelled")
    return {"message": "Job cancelled successfully"}
Job Workflows and Dependencies
# Sequential job workflow
@chute.job(name="workflow_step_1")
async def workflow_step_1(self, workflow_id: str):
    """First step in a workflow."""
    result = await self.perform_step_1(workflow_id)

    # Trigger next step
    await self.start_job("workflow_step_2", {
        "workflow_id": workflow_id,
        "previous_result": result
    })
    return result

@chute.job(name="workflow_step_2")
async def workflow_step_2(self, workflow_id: str, previous_result: dict):
    """Second step in a workflow."""
    result = await self.perform_step_2(workflow_id, previous_result)

    # Trigger final step
    await self.start_job("workflow_step_final", {
        "workflow_id": workflow_id,
        "all_results": [previous_result, result]
    })
    return result

@chute.job(name="workflow_step_final")
async def workflow_final_step(self, workflow_id: str, all_results: List[dict]):
    """Final step in a workflow."""
    final_result = await self.combine_results(all_results)

    # Mark workflow as complete
    await self.complete_workflow(workflow_id, final_result)
    return final_result
Job Monitoring and Metrics
# Job with progress tracking
@chute.job(name="progress_job")
async def job_with_progress(self, items: List[dict]):
    """Job that reports progress."""
    total_items = len(items)
    processed = 0

    for item in items:
        # Process item
        await self.process_item(item)
        processed += 1

        # Update progress
        progress = (processed / total_items) * 100
        await self.update_job_progress(
            job_id=self.current_job_id,
            progress=progress,
            message=f"Processed {processed}/{total_items} items"
        )

    return {"total_processed": processed}
# Job with custom metrics
@chute.job(name="metrics_job")
async def job_with_metrics(self, data: dict):
    """Job that collects custom metrics."""
    start_time = time.time()

    try:
        result = await self.process_data(data)

        # Record success metrics
        processing_time = time.time() - start_time
        await self.record_metric("job_processing_time", processing_time)
        await self.record_metric("job_success_count", 1)

        return result
    except Exception as e:
        # Record failure metrics
        await self.record_metric("job_failure_count", 1)
        await self.record_metric("job_error_type", type(e).__name__)
        raise

# Health check job
@chute.job(
    name="health_check",
    schedule="*/5 * * * *"  # Every 5 minutes
)
async def health_check_job(self):
    """Regular health check job."""
    checks = {
        "database": await self.check_database_health(),
        "external_api": await self.check_external_api_health(),
        "model": await self.check_model_health(),
        "cache": await self.check_cache_health()
    }

    all_healthy = all(checks.values())

    # Update health status
    await self.update_health_status(all_healthy, checks)

    # Send alert if unhealthy
    if not all_healthy:
        await self.send_health_alert(checks)

    return {
        "overall_health": all_healthy,
        "individual_checks": checks,
        "timestamp": datetime.now().isoformat()
    }
Testing Jobs
Unit Testing
import pytest
import asyncio
from unittest.mock import AsyncMock

class TestJobs:
    """Test suite for background jobs."""

    @pytest.mark.asyncio
    async def test_simple_job(self, chute_instance):
        """Test simple job execution."""
        result = await chute_instance.simple_task()
        assert result["status"] == "completed"

    @pytest.mark.asyncio
    async def test_job_with_parameters(self, chute_instance):
        """Test job with parameters."""
        user_id = "test_user"
        data = {"key": "value"}

        result = await chute_instance.process_user_data(user_id, data)
        assert result["user_id"] == user_id
        assert "result" in result

    @pytest.mark.asyncio
    async def test_job_error_handling(self, chute_instance):
        """Test job error handling."""
        # Mock the work method to raise an exception
        chute_instance.perform_work = AsyncMock(side_effect=ValueError("Test error"))

        result = await chute_instance.robust_job_with_error_handling("test_task")
        assert result["status"] == "failed"
        assert result["error"] == "invalid_input"

    @pytest.mark.asyncio
    async def test_job_timeout(self, chute_instance):
        """Test job timeout handling."""
        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(
                chute_instance.long_running_job(),
                timeout=1.0  # Short timeout for testing
            )

    def test_job_retry_logic(self, chute_instance):
        """Test job retry mechanism."""
        # This would test the retry logic
        # Implementation depends on your job runner
        pass
Integration Testing
@pytest.mark.integration
@pytest.mark.asyncio
async def test_job_workflow(chute_instance):
    """Test complete job workflow."""
    workflow_id = "test_workflow"

    # Start workflow
    result1 = await chute_instance.workflow_step_1(workflow_id)
    assert result1 is not None

    # Second step
    result2 = await chute_instance.workflow_step_2(workflow_id, result1)
    assert result2 is not None

    # Final step
    final_result = await chute_instance.workflow_final_step(
        workflow_id,
        [result1, result2]
    )
    assert final_result is not None

@pytest.mark.integration
@pytest.mark.asyncio
async def test_scheduled_job_execution():
    """Test scheduled job execution."""
    # This would test actual job scheduling
    # Implementation depends on your scheduler
    pass
Best Practices
Job Design
Keep Jobs Idempotent
@chute.job(name="idempotent_job")asyncdefidempotent_job(self, item_id: str):
"""Job that can be safely retried."""# Check if already processedifawaitself.is_already_processed(item_id):
return {"status": "already_processed", "item_id": item_id}
# Process item
result = awaitself.process_item(item_id)
# Mark as processedawaitself.mark_as_processed(item_id)
return result
Use Appropriate Timeouts
# Short timeout for quick jobs
@chute.job(timeout=30)
async def quick_job(self):
    pass

# Longer timeout for complex jobs
@chute.job(timeout=3600)
async def complex_job(self):
    pass
Handle Errors Gracefully
@chute.job(retry_count=3)
async def resilient_job(self):
    try:
        return await self.do_work()
    except RetriableError:
        raise  # Will be retried
    except PermanentError as e:
        self.logger.error(f"Permanent error: {e}")
        return {"status": "failed", "error": str(e)}
Use Appropriate Queues
# High priority queue for critical jobs
@chute.job(queue="critical", priority=10)
async def critical_job(self):
    pass

# Separate queue for resource-intensive jobs
@chute.job(queue="gpu_intensive")
async def gpu_job(self):
    pass
Monitor Job Performance
@chute.job(name="monitored_job")asyncdefmonitored_job(self, data: dict):
start_time = time.time()
try:
result = awaitself.process_data(data)
# Record success metrics
duration = time.time() - start_time
awaitself.record_metric("job_duration", duration)
awaitself.record_metric("job_success", 1)
return result
except Exception as e:
awaitself.record_metric("job_failure", 1)
raise
This guide covers the @chute.job() decorator and the patterns needed to build robust background job systems in Chutes applications.