Developers

Job Decorator API Reference

The decorator is used to create long-running jobs or server rentals in Chutes applications. These are different from API endpoints and are designed for tasks that need persistent compute resources, specific network ports, or long-running processes.

Decorator Signature

@chute.job(
    ports: list[Port] = [],
    timeout: Optional[int] = None,
    upload: bool = True,
    ssh: bool = False
)

Port Configuration

Jobs can expose network ports for external access:

from chutes.chute.job import Port

# Define a port
port = Port(name="web", port=8080, proto="http")

Parameters

Parameters

List of network ports to expose for the job.

Port Definition:

from chutes.chute.job import Port

class Port(BaseModel):
    name: str           # Port identifier (alphanumeric + numbers)
    port: int          # Port number (2202 or 8002-65535)
    proto: str         # Protocol: "tcp", "udp", or "http"

Examples:

from chutes.chute.job import Port

# Single HTTP port
@chute.job(ports=[Port(name="web", port=8080, proto="http")])
async def web_server_job(self):
    # Start web server on port 8080
    pass

# Multiple ports
@chute.job(ports=[
    Port(name="api", port=8000, proto="http"),
    Port(name="metrics", port=9090, proto="http"),
    Port(name="tcp_service", port=8001, proto="tcp")
])
async def multi_port_job(self):
    pass

# No ports (compute-only job)
@chute.job()
async def compute_job(self):
    pass

Port Rules:

  • Port must be 2202 or in range 8002-65535
  • Port 2202 is reserved for SSH access
  • Each port must have a unique number
  • Name must match pattern

Maximum execution time for the job in seconds. Can be None for unlimited time.

Constraints:

  • If specified, must be between 30 seconds and 24 hours (86400 seconds)
  • None means no timeout (job can run indefinitely)

Examples:

# Job with 1 hour timeout
@chute.job(timeout=3600)
async def training_job(self):
    """Model training with 1 hour limit."""
    await self.train_model()

# Long-running server with no timeout
@chute.job(timeout=None)
async def server_job(self):
    """Persistent server process."""
    await self.start_server()
    # Runs until manually stopped

# Short batch job
@chute.job(timeout=300)  # 5 minutes
async def quick_batch_job(self):
    """Quick data processing job."""
    await self.process_batch()

Whether to automatically upload output files generated by the job.

Purpose:

  • Automatically collect and upload files created in the job's output directory
  • Useful for jobs that generate artifacts, logs, or results files

Examples:

# Job with file upload (default)
@chute.job(upload=True)
async def generate_report_job(self, output_dir: str):
    """Generate report and upload results."""
    # Files written to output_dir will be automatically uploaded
    with open(f"{output_dir}/report.pdf", "wb") as f:
        f.write(await self.generate_pdf_report())

# Job without file upload
@chute.job(upload=False)
async def streaming_job(self):
    """Streaming job that doesn't generate files."""
    while True:
        await self.process_stream()
        await asyncio.sleep(1)

Whether to enable SSH access to the job container.

Purpose:

  • Debug running jobs
  • Interactive development
  • Manual intervention when needed

Examples:

# Job with SSH access for debugging
@chute.job(ssh=True)
async def debug_job(self):
    """Job with SSH access for debugging."""
    await self.complex_operation()

# Regular job without SSH
@chute.job(ssh=False)  # Default
async def regular_job(self):
    """Regular job without SSH access."""
    await self.standard_operation()

SSH Access:

When , port 2202 is automatically added to the job's ports for SSH access.

Job Function Patterns

Basic Job Functions

# Simple compute job
@chute.job()
async def compute_job(self, **job_data):
    """Simple computation job."""
    output_dir = job_data["output_dir"]

    # Perform computation
    result = await self.run_analysis()

    # Save results to output directory
    with open(f"{output_dir}/results.json", "w") as f:
        json.dump(result, f)

    return {"status": "completed", "result": result}

# Job with network ports
@chute.job(ports=[Port(name="api", port=8000, proto="http")])
async def api_server_job(self, **job_data):
    """Run an API server."""
    # Start FastAPI server on port 8000
    app = FastAPI()

    @app.get("/health")
    async def health():
        return {"status": "healthy"}

    await self.start_server(app, port=8000)

# Job with file upload and SSH access
@chute.job(upload=True, ssh=True, timeout=3600)
async def training_job(self, **job_data):
    """Model training job with SSH access for debugging."""
    output_dir = job_data["output_dir"]

    # Train model
    model = await self.train_model()

    # Save model artifacts
    model_path = f"{output_dir}/model.pkl"
    await self.save_model(model, model_path)

    return {"status": "completed", "model_path": model_path}

Scheduled Jobs

# Daily maintenance
@chute.job(
    name="daily_maintenance",
    schedule="0 3 * * *"  # 3 AM daily
)
async def daily_maintenance(self):
    """Daily maintenance tasks."""

    # Clean temporary files
    temp_files_cleaned = await self.cleanup_temp_files()

    # Update model cache
    await self.update_model_cache()

    # Generate daily metrics
    metrics = await self.generate_daily_metrics()

    return {
        "maintenance_completed": True,
        "temp_files_cleaned": temp_files_cleaned,
        "metrics": metrics
    }

# Hourly data sync
@chute.job(
    name="data_sync",
    schedule="0 * * * *",  # Every hour
    retry_count=5
)
async def sync_external_data(self):
    """Sync data from external sources."""

    sources = ["api1", "api2", "database"]
    results = {}

    for source in sources:
        try:
            data = await self.fetch_from_source(source)
            await self.update_local_data(source, data)
            results[source] = {"status": "success", "records": len(data)}
        except Exception as e:
            results[source] = {"status": "error", "error": str(e)}
            self.logger.error(f"Failed to sync from {source}: {e}")

    return results

Long-Running Jobs

# Model training job
@chute.job(
    name="model_training",
    timeout=7200,  # 2 hours
    retry_count=1,
    queue="gpu_intensive"
)
async def train_model(self, training_config: dict):
    """Train machine learning model."""

    self.logger.info("Starting model training")

    try:
        # Prepare training data
        training_data = await self.prepare_training_data(training_config)

        # Train model
        model = await self.train_model_async(training_data, training_config)

        # Validate model
        validation_score = await self.validate_model(model)

        # Save model if validation passes
        if validation_score > training_config.get("min_score", 0.8):
            model_path = await self.save_model(model)

            return {
                "status": "success",
                "validation_score": validation_score,
                "model_path": model_path
            }
        else:
            return {
                "status": "failed",
                "reason": "validation_score_too_low",
                "validation_score": validation_score
            }

    except Exception as e:
        self.logger.error(f"Model training failed: {e}")
        return {
            "status": "error",
            "error": str(e)
        }

# Data processing pipeline
@chute.job(
    name="data_pipeline",
    timeout=3600,  # 1 hour
    queue="data_processing"
)
async def process_data_pipeline(self, pipeline_config: dict):
    """Process data through multiple stages."""

    stages = pipeline_config.get("stages", [])
    results = {}

    for stage_name, stage_config in stages.items():
        self.logger.info(f"Processing stage: {stage_name}")

        try:
            result = await self.process_stage(stage_name, stage_config)
            results[stage_name] = result
        except Exception as e:
            self.logger.error(f"Stage {stage_name} failed: {e}")
            results[stage_name] = {"status": "error", "error": str(e)}

            # Stop pipeline on critical stage failure
            if stage_config.get("critical", False):
                break

    return {
        "pipeline_completed": True,
        "stage_results": results
    }

Error Handling and Recovery

# Job with comprehensive error handling
@chute.job(
    name="robust_job",
    retry_count=3,
    timeout=300
)
async def robust_job_with_error_handling(self, task_id: str):
    """Job with comprehensive error handling."""

    try:
        # Update job status
        await self.update_job_status(task_id, "started")

        # Perform main work
        result = await self.perform_work(task_id)

        # Update success status
        await self.update_job_status(task_id, "completed")

        return {
            "task_id": task_id,
            "status": "success",
            "result": result
        }

    except ValueError as e:
        # Non-retryable error
        self.logger.error(f"Invalid input for task {task_id}: {e}")
        await self.update_job_status(task_id, "failed")

        return {
            "task_id": task_id,
            "status": "failed",
            "error": "invalid_input",
            "message": str(e)
        }

    except ConnectionError as e:
        # Retryable error
        self.logger.warning(f"Connection error for task {task_id}: {e}")
        await self.update_job_status(task_id, "retrying")
        raise  # Will be retried

    except Exception as e:
        # Unexpected error
        self.logger.error(f"Unexpected error for task {task_id}: {e}")
        await self.update_job_status(task_id, "error")

        # Send alert for unexpected errors
        await self.send_error_alert(task_id, str(e))

        return {
            "task_id": task_id,
            "status": "error",
            "error": "unexpected_error"
        }

# Job with graceful shutdown
@chute.job(
    name="graceful_job",
    timeout=1800  # 30 minutes
)
async def job_with_graceful_shutdown(self, batch_data: List[dict]):
    """Job that handles graceful shutdown."""

    processed_items = 0
    failed_items = 0

    try:
        for item in batch_data:
            # Check for shutdown signal
            if self.should_shutdown:
                self.logger.info(f"Graceful shutdown requested, processed {processed_items} items")
                break

            try:
                await self.process_item(item)
                processed_items += 1
            except Exception as e:
                self.logger.error(f"Failed to process item {item.get('id')}: {e}")
                failed_items += 1

    except asyncio.CancelledError:
        self.logger.info(f"Job cancelled, processed {processed_items} items")
        raise

    return {
        "processed": processed_items,
        "failed": failed_items,
        "total": len(batch_data)
    }

Job Scheduling and Management

Triggering Jobs from Endpoints

# Trigger job from API endpoint
@chute.cord(public_api_path="/start_training")
async def start_training_job(self, config: dict):
    """Start a model training job."""

    # Validate configuration
    if not self.validate_training_config(config):
        raise HTTPException(400, "Invalid training configuration")

    # Start background job
    job_id = await self.start_job("model_training", config)

    return {
        "job_id": job_id,
        "status": "started",
        "message": "Model training job started"
    }

# Check job status
@chute.cord(public_api_path="/job_status/{job_id}")
async def get_job_status(self, job_id: str):
    """Get status of a background job."""

    status = await self.get_job_status(job_id)

    if not status:
        raise HTTPException(404, "Job not found")

    return status

# Cancel running job
@chute.cord(public_api_path="/cancel_job/{job_id}", method="DELETE")
async def cancel_job(self, job_id: str):
    """Cancel a running job."""

    success = await self.cancel_job(job_id)

    if not success:
        raise HTTPException(404, "Job not found or cannot be cancelled")

    return {"message": "Job cancelled successfully"}

Job Workflows and Dependencies

# Sequential job workflow
@chute.job(name="workflow_step_1")
async def workflow_step_1(self, workflow_id: str):
    """First step in a workflow."""

    result = await self.perform_step_1(workflow_id)

    # Trigger next step
    await self.start_job("workflow_step_2", {
        "workflow_id": workflow_id,
        "previous_result": result
    })

    return result

@chute.job(name="workflow_step_2")
async def workflow_step_2(self, workflow_id: str, previous_result: dict):
    """Second step in a workflow."""

    result = await self.perform_step_2(workflow_id, previous_result)

    # Trigger final step
    await self.start_job("workflow_step_final", {
        "workflow_id": workflow_id,
        "all_results": [previous_result, result]
    })

    return result

@chute.job(name="workflow_step_final")
async def workflow_final_step(self, workflow_id: str, all_results: List[dict]):
    """Final step in a workflow."""

    final_result = await self.combine_results(all_results)

    # Mark workflow as complete
    await self.complete_workflow(workflow_id, final_result)

    return final_result

Job Monitoring and Metrics

# Job with progress tracking
@chute.job(name="progress_job")
async def job_with_progress(self, items: List[dict]):
    """Job that reports progress."""

    total_items = len(items)
    processed = 0

    for item in items:
        # Process item
        await self.process_item(item)
        processed += 1

        # Update progress
        progress = (processed / total_items) * 100
        await self.update_job_progress(
            job_id=self.current_job_id,
            progress=progress,
            message=f"Processed {processed}/{total_items} items"
        )

    return {"total_processed": processed}

# Job with custom metrics
@chute.job(name="metrics_job")
async def job_with_metrics(self, data: dict):
    """Job that collects custom metrics."""

    start_time = time.time()

    try:
        result = await self.process_data(data)

        # Record success metrics
        processing_time = time.time() - start_time
        await self.record_metric("job_processing_time", processing_time)
        await self.record_metric("job_success_count", 1)

        return result

    except Exception as e:
        # Record failure metrics
        await self.record_metric("job_failure_count", 1)
        await self.record_metric("job_error_type", type(e).__name__)
        raise

# Health check job
@chute.job(
    name="health_check",
    schedule="*/5 * * * *"  # Every 5 minutes
)
async def health_check_job(self):
    """Regular health check job."""

    checks = {
        "database": await self.check_database_health(),
        "external_api": await self.check_external_api_health(),
        "model": await self.check_model_health(),
        "cache": await self.check_cache_health()
    }

    all_healthy = all(checks.values())

    # Update health status
    await self.update_health_status(all_healthy, checks)

    # Send alert if unhealthy
    if not all_healthy:
        await self.send_health_alert(checks)

    return {
        "overall_health": all_healthy,
        "individual_checks": checks,
        "timestamp": datetime.now().isoformat()
    }

Testing Jobs

Unit Testing

import pytest
import asyncio

class TestJobs:
    """Test suite for background jobs."""

    @pytest.mark.asyncio
    async def test_simple_job(self, chute_instance):
        """Test simple job execution."""

        result = await chute_instance.simple_task()

        assert result["status"] == "completed"

    @pytest.mark.asyncio
    async def test_job_with_parameters(self, chute_instance):
        """Test job with parameters."""

        user_id = "test_user"
        data = {"key": "value"}

        result = await chute_instance.process_user_data(user_id, data)

        assert result["user_id"] == user_id
        assert "result" in result

    @pytest.mark.asyncio
    async def test_job_error_handling(self, chute_instance):
        """Test job error handling."""

        # Mock a method to raise an exception
        chute_instance.process_data = AsyncMock(side_effect=ValueError("Test error"))

        result = await chute_instance.robust_job_with_error_handling("test_task")

        assert result["status"] == "failed"
        assert result["error"] == "invalid_input"

    @pytest.mark.asyncio
    async def test_job_timeout(self, chute_instance):
        """Test job timeout handling."""

        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(
                chute_instance.long_running_job(),
                timeout=1.0  # Short timeout for testing
            )

    def test_job_retry_logic(self, chute_instance):
        """Test job retry mechanism."""

        # This would test the retry logic
        # Implementation depends on your job runner
        pass

Integration Testing

@pytest.mark.integration
async def test_job_workflow(chute_instance):
    """Test complete job workflow."""

    workflow_id = "test_workflow"

    # Start workflow
    result1 = await chute_instance.workflow_step_1(workflow_id)
    assert result1 is not None

    # Second step
    result2 = await chute_instance.workflow_step_2(workflow_id, result1)
    assert result2 is not None

    # Final step
    final_result = await chute_instance.workflow_final_step(
        workflow_id,
        [result1, result2]
    )
    assert final_result is not None

@pytest.mark.integration
async def test_scheduled_job_execution():
    """Test scheduled job execution."""

    # This would test actual job scheduling
    # Implementation depends on your scheduler
    pass

Best Practices

Job Design

  1. Keep Jobs Idempotent
    @chute.job(name="idempotent_job")
    async def idempotent_job(self, item_id: str):
        """Job that can be safely retried."""
    
        # Check if already processed
        if await self.is_already_processed(item_id):
            return {"status": "already_processed", "item_id": item_id}
    
        # Process item
        result = await self.process_item(item_id)
    
        # Mark as processed
        await self.mark_as_processed(item_id)
    
        return result
  2. Use Appropriate Timeouts
    # Short timeout for quick jobs
    @chute.job(timeout=30)
    async def quick_job(self):
        pass
    
    # Longer timeout for complex jobs
    @chute.job(timeout=3600)
    async def complex_job(self):
        pass
  3. Handle Errors Gracefully
    @chute.job(retry_count=3)
    async def resilient_job(self):
        try:
            return await self.do_work()
        except RetriableError:
            raise  # Will be retried
        except PermanentError as e:
            self.logger.error(f"Permanent error: {e}")
            return {"status": "failed", "error": str(e)}
  4. Use Appropriate Queues
    # High priority queue for critical jobs
    @chute.job(queue="critical", priority=10)
    async def critical_job(self):
        pass
    
    # Separate queue for resource-intensive jobs
    @chute.job(queue="gpu_intensive")
    async def gpu_job(self):
        pass
  5. Monitor Job Performance
    @chute.job(name="monitored_job")
    async def monitored_job(self, data: dict):
        start_time = time.time()
    
        try:
            result = await self.process_data(data)
    
            # Record success metrics
            duration = time.time() - start_time
            await self.record_metric("job_duration", duration)
            await self.record_metric("job_success", 1)
    
            return result
    
        except Exception as e:
            await self.record_metric("job_failure", 1)
            raise

This comprehensive guide covers all aspects of the decorator for building robust background job systems in Chutes applications.