
Job Decorator API Reference

The @chute.job decorator is used to create long-running jobs or server rentals in Chutes applications. Jobs are different from API endpoints (cords): they are designed for tasks that need persistent compute resources, specific network ports, or long-running processes.
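
A minimal sketch, assuming a Chute instance named chute has already been defined (as in the Complete Examples below); the function name and output file are illustrative only:

# Minimal compute-only job on an existing Chute instance
@chute.job(timeout=600, upload=True)
async def hello_job(self, **job_data):
    """Write a single output file and finish."""
    output_dir = job_data["output_dir"]
    with open(f"{output_dir}/hello.txt", "w") as f:
        f.write("hello from a Chutes job\n")
    return {"status": "completed"}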

Decorator Signature

@chute.job(
    ports: list[Port] = [],
    timeout: Optional[int] = None,
    upload: bool = True,
    ssh: bool = False
)

Port Class

Jobs can expose network ports for external access using the Port class:

from chutes.chute.job import Port

Port(
    name: str,      # Port identifier (lowercase letters + optional numbers)
    port: int,      # Port number (2202 or 8002-65535)
    proto: str      # Protocol: "tcp", "udp", or "http"
)

Port Rules

  • Port must be 2202 (reserved for SSH) or in range 8002-65535
  • Each port must have a unique number within the job
  • Name must consist of lowercase letters optionally followed by numbers (e.g., "web", "api", "metrics1")
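
For example, the following declarations satisfy these rules (the names and port numbers are illustrative):

from chutes.chute.job import Port

# Valid: names are lowercase letters plus optional trailing digits,
# and port numbers are 2202 or within 8002-65535
web = Port(name="web", port=8080, proto="http")
grpc2 = Port(name="grpc2", port=9000, proto="tcp")

# These would violate the rules:
#   Port(name="Web-UI", port=8080, proto="http")  # name is not lowercase letters + optional numbers
#   Port(name="web", port=3000, proto="http")     # port outside 2202 / 8002-65535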

Parameters

ports

List of network ports to expose for the job.

Examples:

from chutes.chute.job import Port

# Single HTTP port
@chute.job(ports=[Port(name="web", port=8080, proto="http")])
async def web_server_job(self, **job_data):
    pass

# Multiple ports
@chute.job(ports=[
    Port(name="api", port=8000, proto="http"),
    Port(name="metrics", port=9090, proto="http"),
    Port(name="grpc", port=8001, proto="tcp")
])
async def multi_port_job(self, **job_data):
    pass

# No ports (compute-only job)
@chute.job()
async def compute_job(self, **job_data):
    pass

timeout

Maximum execution time for the job, in seconds.

Constraints:

  • If specified, must be between 30 seconds and 86400 seconds (24 hours)
  • None means no timeout (the job can run indefinitely - useful for server rentals)

Examples:

# Job with 1 hour timeout
@chute.job(timeout=3600)
async def training_job(self, **job_data):
    """Model training with 1 hour limit."""
    await self.train_model()

# Long-running server with no timeout
@chute.job(timeout=None)
async def server_job(self, **job_data):
    """Persistent server process."""
    await self.start_server()

# Short batch job (5 minutes)
@chute.job(timeout=300)
async def quick_batch_job(self, **job_data):
    """Quick data processing job."""
    await self.process_batch()

upload

Whether to automatically upload output files generated by the job.

Purpose:

  • Automatically collects and uploads files created in the job's output directory
  • Useful for jobs that generate artifacts, model weights, logs, or result files

Examples:

# Job with file upload (default)
@chute.job(upload=True)
async def generate_report_job(self, **job_data):
    """Generate report and upload results."""
    output_dir = job_data["output_dir"]
    
    # Files written to output_dir will be automatically uploaded
    with open(f"{output_dir}/report.pdf", "wb") as f:
        f.write(await self.generate_pdf_report())
    
    with open(f"{output_dir}/results.json", "w") as f:
        json.dump(self.results, f)

# Job without file upload
@chute.job(upload=False)
async def streaming_job(self, **job_data):
    """Streaming job that doesn't generate files."""
    while not self.cancel_event.is_set():
        await self.process_stream()
        await asyncio.sleep(1)

ssh

Whether to enable SSH access to the job container.

Purpose:

  • Debug running jobs
  • Interactive development
  • Manual intervention when needed

Examples:

# Job with SSH access for debugging
@chute.job(ssh=True, timeout=7200)
async def debug_job(self, **job_data):
    """Job with SSH access for debugging."""
    # SSH key should be provided in job_data["_ssh_public_key"]
    await self.complex_operation()

# Regular job without SSH
@chute.job(ssh=False)  # Default
async def regular_job(self, **job_data):
    """Regular job without SSH access."""
    await self.standard_operation()

Note: When ssh=True, port 2202 is automatically added to the job's ports for SSH access.
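
A brief sketch combining ssh=True with explicitly declared ports (the function name is illustrative); the SSH port itself does not need to be declared:

# SSH debugging alongside an application port; port 2202 is added automatically
@chute.job(
    ssh=True,
    ports=[Port(name="web", port=8080, proto="http")],
    timeout=None,
)
async def debuggable_server_job(self, **job_data):
    await self.start_server()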

Job Function Signature

Job functions receive keyword arguments containing job data:

@chute.job()
async def my_job(self, **job_data):
    # job_data contains:
    # - "output_dir": Directory path for output files
    # - "_ssh_public_key": SSH public key (if ssh=True and provided)
    # - Any other data passed when starting the job
    
    output_dir = job_data["output_dir"]
    # Your job logic here
    return {"status": "completed"}

Job Lifecycle

Cancellation Support

Jobs have access to a cancel event that can be used to gracefully handle cancellation:

@chute.job(timeout=3600)
async def cancellable_job(self, **job_data):
    """Job that handles cancellation gracefully."""
    
    for i in range(100):
        # Check for cancellation
        if self.cancel_event.is_set():
            print("Job cancelled, cleaning up...")
            break
        
        await self.process_step(i)
        await asyncio.sleep(1)
    
    return {"processed_steps": i}

Output Directory

Jobs receive an output_dir in job_data where they can write files:

@chute.job(upload=True)
async def job_with_outputs(self, **job_data):
    import torch

    output_dir = job_data["output_dir"]

    # Write output files
    model_path = f"{output_dir}/model.pt"
    torch.save(self.model.state_dict(), model_path)

    # Write logs
    with open(f"{output_dir}/training_log.txt", "w") as f:
        f.write("\n".join(self.logs))
    
    # Files in output_dir are automatically uploaded when job completes
    return {"model_path": model_path}

Complete Examples

Model Training Job

from chutes.chute import Chute, NodeSelector
from chutes.chute.job import Port
from chutes.image import Image

image = (
    Image(username="myuser", name="training", tag="1.0")
    .from_base("parachutes/python:3.12")
    .run_command("pip install torch transformers")
)

chute = Chute(
    username="myuser",
    name="model-trainer",
    image=image,
    node_selector=NodeSelector(gpu_count=1, min_vram_gb_per_gpu=24)
)

@chute.on_startup()
async def setup(self):
    import torch
    self.device = "cuda" if torch.cuda.is_available() else "cpu"

@chute.job(
    timeout=7200,  # 2 hours
    upload=True,
    ssh=True  # Enable SSH for debugging
)
async def train_model(self, **job_data):
    """Train a model and save checkpoints."""
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer
    
    output_dir = job_data["output_dir"]
    model_name = job_data.get("model_name", "gpt2")
    epochs = job_data.get("epochs", 3)
    
    print(f"Loading model: {model_name}")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    model.to(self.device)
    
    # Training loop
    for epoch in range(epochs):
        if self.cancel_event.is_set():
            print("Training cancelled")
            break
        
        print(f"Epoch {epoch + 1}/{epochs}")
        # ... training logic ...
        
        # Save checkpoint
        checkpoint_path = f"{output_dir}/checkpoint_epoch_{epoch}.pt"
        torch.save(model.state_dict(), checkpoint_path)
    
    # Save final model
    final_path = f"{output_dir}/final_model.pt"
    torch.save(model.state_dict(), final_path)

    return {
        "status": "completed",
        "epochs_completed": epoch + 1,
        "model_path": final_path
    }

Web Server Job

from chutes.chute.job import Port

@chute.job(
    ports=[
        Port(name="web", port=8080, proto="http"),
        Port(name="metrics", port=9090, proto="http")
    ],
    timeout=None,  # Run indefinitely
    upload=False
)
async def web_server_job(self, **job_data):
    """Run a web server as a long-running job."""
    from fastapi import FastAPI
    import uvicorn
    
    app = FastAPI()
    
    @app.get("/")
    async def root():
        return {"message": "Hello from job!"}
    
    @app.get("/health")
    async def health():
        return {"status": "healthy"}
    
    config = uvicorn.Config(app, host="0.0.0.0", port=8080)
    server = uvicorn.Server(config)
    
    # Run until cancelled
    await server.serve()

Batch Processing Job

@chute.job(timeout=1800, upload=True)
async def batch_processing_job(self, **job_data):
    """Process a batch of items."""
    output_dir = job_data["output_dir"]
    items = job_data.get("items", [])
    
    results = []
    processed = 0
    failed = 0

    for item in items:
        if self.cancel_event.is_set():
            print(f"Cancelled after processing {processed} items")
            break

        try:
            result = await self.process_item(item)
            results.append(result)
            processed += 1
        except Exception as e:
            print(f"Failed to process item: {e}")
            failed += 1
    
    # Save results
    with open(f"{output_dir}/results.json", "w") as f:
        json.dump(results, f)

    return {
        "status": "completed",
        "processed": processed,
        "failed": failed,
        "total": len(items)
    }

Error Handling

Jobs should handle errors gracefully and return appropriate status:

@chute.job(timeout=3600)
async def robust_job(self, **job_data):
    """Job with comprehensive error handling."""
    output_dir = job_data["output_dir"]

    try:
        # Perform main work
        result = await self.do_work()

        # Save output
        with open(f"{output_dir}/output.json", "w") as f:
            json.dump(result, f)

        return {
            "status": "completed",
            "result": result
        }

    except asyncio.CancelledError:
        # Handle cancellation
        print("Job was cancelled")
        raise
    
    except ValueError as e:
        # Handle known errors
        return {
            "status": "failed",
            "error": "invalid_input",
            "message": str(e)
        }

    except Exception as e:
        # Handle unexpected errors
        print(f"Unexpected error: {e}")
        
        # Save error log
        with open(f"{output_dir}/error.log", "w") as f:
            f.write(f"Error: {e}\n")
            import traceback
            f.write(traceback.format_exc())

        return {
            "status": "error",
            "error": str(e)
        }

Best Practices

1. Always Check for Cancellation

@chute.job(timeout=3600)
async def long_job(self, **job_data):
    for i in range(1000):
        if self.cancel_event.is_set():
            return {"status": "cancelled", "progress": i}
        await self.process_step(i)

2. Use Appropriate Timeouts

# Short job - use explicit timeout
@chute.job(timeout=300)
async def quick_job(self, **job_data):
    pass

# Long training - longer timeout
@chute.job(timeout=86400)  # 24 hours
async def training_job(self, **job_data):
    pass

# Server rental - no timeout
@chute.job(timeout=None)
async def server_job(self, **job_data):
    pass

3. Write Important Data to Output Directory

@chute.job(upload=True)
async def job_with_checkpoints(self, **job_data):
    output_dir = job_data["output_dir"]
    
    for epoch in range(100):
        # Train...
        
        # Save checkpoint periodically
        if epoch % 10 == 0:
            torch.save(model, f"{output_dir}/checkpoint_{epoch}.pt")

4. Use SSH for Debugging Complex Jobs

@chute.job(ssh=True, timeout=7200)
async def debuggable_job(self, **job_data):
    """Enable SSH so you can connect and debug if needed."""
    pass

5. Return Meaningful Status

@chute.job()
async def well_documented_job(self, **job_data):
    return {
        "status": "completed",
        "items_processed": 150,
        "errors": 2,
        "duration_seconds": 342,
        "output_files": ["results.json", "model.pt"]
    }

See Also