Building Custom Chutes

This guide walks you through creating custom Chutes from scratch, covering everything from basic setup to advanced patterns for production applications.

Overview

Custom Chutes give you complete control over your AI application architecture, allowing you to:

  • Complex Logic: Build sophisticated, multi-step AI pipelines
  • Custom Dependencies: Use any Python packages or system libraries
  • Multiple Models: Combine different AI models in a single service
  • Advanced Processing: Add preprocessing, postprocessing, and business logic
  • Custom APIs: Design exactly the endpoints you need

Basic Custom Chute Structure

Minimal Example

Here's the simplest possible custom Chute:

from chutes.chute import Chute
from chutes.image import Image

# Create custom image
image = (
    Image(username="myuser", name="my-custom-app", tag="1.0")
    .from_base("python:3.11-slim")
    .run_command("pip install numpy pandas")
)

# Create chute
chute = Chute(
    username="myuser",
    name="my-custom-app",
    image=image
)

@chute.on_startup()
async def initialize(self):
    """Initialize any resources needed by your app."""
    self.message = "Hello from custom chute!"

@chute.cord(public_api_path="/hello", method="GET")
async def hello(self):
    """Simple endpoint that returns a greeting."""
    return {"message": self.message}

Adding Dependencies and Models

from chutes.chute import Chute, NodeSelector
from chutes.image import Image
from pydantic import BaseModel
from typing import List, Optional

# Define input/output schemas
class AnalysisInput(BaseModel):
    text: str
    options: Optional[List[str]] = []

class AnalysisOutput(BaseModel):
    result: str
    confidence: float
    metadata: dict

# Create custom image with AI dependencies
image = (
    Image(username="myuser", name="text-analyzer", tag="1.0")
    .from_base("nvidia/cuda:11.8-devel-ubuntu22.04")
    .run_command("apt update && apt install -y python3 python3-pip")
    .run_command("pip3 install torch transformers tokenizers")
    .run_command("pip3 install numpy pandas scikit-learn")
    .run_command("pip3 install fastapi uvicorn pydantic")
    .set_workdir("/app")
)

# Create chute with GPU support
chute = Chute(
    username="myuser",
    name="text-analyzer",
    image=image,
    node_selector=NodeSelector(
        gpu_count=1,
        min_vram_gb_per_gpu=8
    ),
    concurrency=4
)

@chute.on_startup()
async def initialize_models(self):
    """Load AI models during startup."""
    from transformers import pipeline
    import torch

    # Load sentiment analysis model
    self.sentiment_analyzer = pipeline(
        "sentiment-analysis",
        model="cardiffnlp/twitter-roberta-base-sentiment-latest",
        device=0 if torch.cuda.is_available() else -1
    )

    # Load text classification model
    self.classifier = pipeline(
        "zero-shot-classification",
        model="facebook/bart-large-mnli",
        device=0 if torch.cuda.is_available() else -1
    )

@chute.cord(
    public_api_path="/analyze",
    method="POST",
    input_schema=AnalysisInput,
    output_schema=AnalysisOutput
)
async def analyze_text(self, input_data: AnalysisInput) -> AnalysisOutput:
    """Analyze text with multiple AI models."""

    # Sentiment analysis
    sentiment_result = self.sentiment_analyzer(input_data.text)[0]

    # Classification (if options provided)
    classification_result = None
    if input_data.options:
        classification_result = self.classifier(
            input_data.text,
            input_data.options
        )

    # Combine results
    result = f"Sentiment: {sentiment_result['label']}"
    if classification_result:
        result += f", Category: {classification_result['labels'][0]}"

    return AnalysisOutput(
        result=result,
        confidence=sentiment_result['score'],
        metadata={
            "sentiment": sentiment_result,
            "classification": classification_result
        }
    )
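
A client request against this endpoint might look like the following sketch; the deployment URL is hypothetical and follows the same naming pattern as above:

import requests

payload = {
    "text": "The new release is fantastic!",
    "options": ["product feedback", "bug report", "question"],
}

# Hypothetical deployment URL for the text-analyzer chute.
response = requests.post(
    "https://myuser-text-analyzer.chutes.ai/analyze",
    json=payload,
    timeout=60,
)
print(response.json())
# e.g. {"result": "Sentiment: positive, Category: product feedback",
#       "confidence": 0.98, "metadata": {...}}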

Advanced Patterns

Multi-Model Pipeline

from chutes.chute import Chute, NodeSelector
from chutes.image import Image
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
import asyncio

class DocumentInput(BaseModel):
    text: str
    analyze_sentiment: bool = True
    extract_entities: bool = True
    summarize: bool = False
    max_summary_length: int = Field(default=150, ge=50, le=500)

class DocumentOutput(BaseModel):
    original_text: str
    sentiment: Optional[Dict[str, Any]] = None
    entities: Optional[List[Dict[str, Any]]] = None
    summary: Optional[str] = None
    processing_time: float

# Advanced image with multiple AI libraries
image = (
    Image(username="myuser", name="document-processor", tag="2.0")
    .from_base("nvidia/cuda:11.8-devel-ubuntu22.04")
    .run_command("apt update && apt install -y python3 python3-pip git")
    .run_command("pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118")
    .run_command("pip3 install transformers tokenizers")
    .run_command("pip3 install spacy")
    .run_command("python3 -m spacy download en_core_web_sm")
    .run_command("pip3 install sumy nltk")
    .run_command("pip3 install asyncio aiofiles")
    .set_workdir("/app")
)

chute = Chute(
    username="myuser",
    name="document-processor",
    image=image,
    node_selector=NodeSelector(
        gpu_count=1,
        min_vram_gb_per_gpu=16
    ),
    concurrency=6
)

@chute.on_startup()
async def initialize_pipeline(self):
    """Initialize multiple AI models for document processing."""
    from transformers import pipeline
    import spacy
    import torch
    import time

    self.device = 0 if torch.cuda.is_available() else -1

    # Load models
    print("Loading sentiment analyzer...")
    self.sentiment_analyzer = pipeline(
        "sentiment-analysis",
        model="cardiffnlp/twitter-roberta-base-sentiment-latest",
        device=self.device
    )

    print("Loading NER model...")
    self.ner_model = pipeline(
        "ner",
        model="dbmdz/bert-large-cased-finetuned-conll03-english",
        device=self.device,
        aggregation_strategy="simple"
    )

    print("Loading summarization model...")
    self.summarizer = pipeline(
        "summarization",
        model="facebook/bart-large-cnn",
        device=self.device
    )

    print("Loading spaCy model...")
    self.nlp = spacy.load("en_core_web_sm")

    print("All models loaded successfully!")

async def analyze_sentiment_async(self, text: str) -> Dict[str, Any]:
    """Asynchronous sentiment analysis."""
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        None,
        lambda: self.sentiment_analyzer(text)[0]
    )
    return result

async def extract_entities_async(self, text: str) -> List[Dict[str, Any]]:
    """Asynchronous named entity recognition."""
    loop = asyncio.get_event_loop()

    # Use transformers NER
    ner_results = await loop.run_in_executor(
        None,
        lambda: self.ner_model(text)
    )

    # Also use spaCy for additional entity types
    spacy_results = await loop.run_in_executor(
        None,
        lambda: [(ent.text, ent.label_, ent.start_char, ent.end_char)
                for ent in self.nlp(text).ents]
    )

    # Combine results
    entities = []

    # Add transformer results
    for entity in ner_results:
        entities.append({
            "text": entity["word"],
            "label": entity["entity_group"],
            "confidence": entity["score"],
            "start": entity["start"],
            "end": entity["end"],
            "source": "transformers"
        })

    # Add spaCy results
    for text_span, label, start, end in spacy_results:
        entities.append({
            "text": text_span,
            "label": label,
            "confidence": 1.0,  # spaCy doesn't provide confidence
            "start": start,
            "end": end,
            "source": "spacy"
        })

    return entities

async def summarize_async(self, text: str, max_length: int = 150) -> str:
    """Asynchronous text summarization."""
    if len(text.split()) < 50:
        return text  # Too short to summarize

    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        None,
        lambda: self.summarizer(
            text,
            max_length=max_length,
            min_length=30,
            do_sample=False
        )[0]
    )
    return result["summary_text"]

@chute.cord(
    public_api_path="/process",
    method="POST",
    input_schema=DocumentInput,
    output_schema=DocumentOutput
)
async def process_document(self, input_data: DocumentInput) -> DocumentOutput:
    """Process document with multiple AI models in parallel."""
    import time

    start_time = time.time()

    # Create tasks for parallel processing
    tasks = []

    if input_data.analyze_sentiment:
        tasks.append(analyze_sentiment_async(self, input_data.text))
    else:
        tasks.append(asyncio.create_task(asyncio.sleep(0, result=None)))

    if input_data.extract_entities:
        tasks.append(extract_entities_async(self, input_data.text))
    else:
        tasks.append(asyncio.create_task(asyncio.sleep(0, result=None)))

    if input_data.summarize:
        tasks.append(summarize_async(self, input_data.text, input_data.max_summary_length))
    else:
        tasks.append(asyncio.create_task(asyncio.sleep(0, result=None)))

    # Execute all tasks in parallel
    sentiment_result, entities_result, summary_result = await asyncio.gather(*tasks)

    processing_time = time.time() - start_time

    return DocumentOutput(
        original_text=input_data.text,
        sentiment=sentiment_result,
        entities=entities_result,
        summary=summary_result,
        processing_time=processing_time
    )
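
The /process endpoint can then be exercised with a sketch like this (hypothetical deployment URL, same naming pattern as before):

import requests

payload = {
    "text": "Apple announced new products in Cupertino yesterday. ...",
    "analyze_sentiment": True,
    "extract_entities": True,
    "summarize": True,
    "max_summary_length": 100,
}

# Hypothetical deployment URL for the document-processor chute.
response = requests.post(
    "https://myuser-document-processor.chutes.ai/process",
    json=payload,
    timeout=120,
)
result = response.json()
print(result["processing_time"], result["entities"])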

State Management and Caching

from chutes.chute import Chute
from chutes.image import Image
from pydantic import BaseModel
from typing import Dict, Any, Optional
import asyncio
import hashlib
import json
import time

class StatefulChute(Chute):
    """Custom chute with built-in state management."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = {}
        self.session_data = {}
        self.request_history = []

# Create image with caching dependencies
image = (
    Image(username="myuser", name="stateful-app", tag="1.0")
    .from_base("python:3.11-slim")
    .run_command("pip install redis aioredis")
    .run_command("pip install sqlalchemy aiosqlite")
    .run_command("pip install fastapi uvicorn pydantic")
)

chute = StatefulChute(
    username="myuser",
    name="stateful-app",
    image=image
)

@chute.on_startup()
async def initialize_storage(self):
    """Initialize storage systems."""
    import aioredis

    # In-memory cache
    self.memory_cache = {}
    self.cache_ttl = {}

    # Try to connect to Redis (optional)
    try:
        self.redis = await aioredis.create_redis_pool('redis://localhost')
        self.has_redis = True
    except Exception:
        self.redis = None
        self.has_redis = False
        print("Redis not available, using memory cache only")

    # Session storage
    self.sessions = {}

    # Request tracking
    self.request_count = 0
    self.last_requests = []

async def get_cached(self, key: str) -> Optional[Any]:
    """Get value from cache (Redis or memory)."""

    # Check memory cache first
    if key in self.memory_cache:
        if key in self.cache_ttl and time.time() > self.cache_ttl[key]:
            del self.memory_cache[key]
            del self.cache_ttl[key]
        else:
            return self.memory_cache[key]

    # Check Redis if available
    if self.has_redis:
        try:
            value = await self.redis.get(key)
            if value:
                return json.loads(value)
        except Exception:
            pass

    return None

async def set_cached(self, key: str, value: Any, ttl: int = 3600):
    """Set value in cache with TTL."""

    # Store in memory cache
    self.memory_cache[key] = value
    self.cache_ttl[key] = time.time() + ttl

    # Store in Redis if available
    if self.has_redis:
        try:
            await self.redis.setex(key, ttl, json.dumps(value))
        except Exception:
            pass

def get_cache_key(self, data: str, operation: str) -> str:
    """Generate cache key from data and operation."""
    content = f"{operation}:{data}"
    return hashlib.md5(content.encode()).hexdigest()

class ProcessingRequest(BaseModel):
    text: str
    operation: str = "analyze"
    use_cache: bool = True
    session_id: Optional[str] = None

@chute.cord(
    public_api_path="/process_cached",
    method="POST",
    input_schema=ProcessingRequest
)
async def process_with_caching(self, input_data: ProcessingRequest) -> Dict[str, Any]:
    """Process request with caching and session management."""

    # Track request
    self.request_count += 1
    request_info = {
        "timestamp": time.time(),
        "operation": input_data.operation,
        "session_id": input_data.session_id
    }
    self.last_requests.append(request_info)

    # Keep only last 100 requests
    if len(self.last_requests) > 100:
        self.last_requests = self.last_requests[-100:]

    # Check cache
    cache_key = get_cache_key(self, input_data.text, input_data.operation)
    if input_data.use_cache:
        cached_result = await get_cached(self, cache_key)
        if cached_result:
            cached_result["from_cache"] = True
            cached_result["request_id"] = self.request_count
            return cached_result

    # Process request (simulate AI processing)
    await asyncio.sleep(0.1)  # Simulate processing time

    result = {
        "text": input_data.text,
        "operation": input_data.operation,
        "result": f"Processed: {input_data.text[:50]}...",
        "timestamp": time.time(),
        "request_id": self.request_count,
        "from_cache": False
    }

    # Store in cache
    if input_data.use_cache:
        await set_cached(self, cache_key, result, ttl=1800)  # 30 minutes

    # Update session data
    if input_data.session_id:
        if input_data.session_id not in self.sessions:
            self.sessions[input_data.session_id] = {
                "created": time.time(),
                "requests": []
            }

        self.sessions[input_data.session_id]["requests"].append({
            "request_id": self.request_count,
            "operation": input_data.operation,
            "timestamp": time.time()
        })

    return result

@chute.cord(public_api_path="/stats", method="GET")
async def get_stats(self) -> Dict[str, Any]:
    """Get service statistics."""

    cache_size = len(self.memory_cache)
    session_count = len(self.sessions)

    # Recent request stats
    recent_requests = [r for r in self.last_requests
                      if time.time() - r["timestamp"] < 3600]  # Last hour

    operation_counts = {}
    for req in recent_requests:
        op = req["operation"]
        operation_counts[op] = operation_counts.get(op, 0) + 1

    return {
        "total_requests": self.request_count,
        "cache_size": cache_size,
        "session_count": session_count,
        "recent_requests_1h": len(recent_requests),
        "operation_counts": operation_counts,
        "has_redis": self.has_redis
    }
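
To see the cache in action, send the same request twice; the second response should come back with from_cache set to true. A client sketch, assuming a hypothetical deployment URL:

import requests

# Hypothetical deployment URL for the stateful-app chute.
url = "https://myuser-stateful-app.chutes.ai/process_cached"
payload = {
    "text": "some document text",
    "operation": "analyze",
    "use_cache": True,
    "session_id": "demo-session",
}

first = requests.post(url, json=payload).json()
second = requests.post(url, json=payload).json()
print(first["from_cache"], second["from_cache"])  # False True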

Background Jobs and Queues

from chutes.chute import Chute
from chutes.image import Image
from fastapi import HTTPException
from pydantic import BaseModel, Field
from typing import Dict, List, Optional
import asyncio
import uuid
import time
from enum import Enum

class JobStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class JobRequest(BaseModel):
    task_type: str
    data: Dict
    priority: int = Field(default=1, ge=1, le=5)

class JobResponse(BaseModel):
    job_id: str
    status: JobStatus
    created_at: float
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    result: Optional[Dict] = None
    error: Optional[str] = None

# Create image with job processing capabilities
image = (
    Image(username="myuser", name="job-processor", tag="1.0")
    .from_base("python:3.11-slim")
    .run_command("pip install asyncio aiofiles")
    .run_command("pip install celery redis")  # For advanced job queues
    .run_command("pip install fastapi uvicorn pydantic")
)

chute = Chute(
    username="myuser",
    name="job-processor",
    image=image,
    concurrency=8
)

@chute.on_startup()
async def initialize_job_system(self):
    """Initialize job processing system."""

    # Job storage
    self.jobs = {}
    self.job_queue = asyncio.Queue()

    # Job processing
    self.workers = []
    self.max_workers = 4

    # Start background workers
    for i in range(self.max_workers):
        # job_worker is a module-level helper that receives the chute instance.
        worker = asyncio.create_task(job_worker(self, f"worker-{i}"))
        self.workers.append(worker)

    print(f"Started {self.max_workers} job workers")

async def job_worker(self, worker_name: str):
    """Background worker to process jobs."""

    while True:
        try:
            # Get job from queue
            job_id = await self.job_queue.get()

            if job_id not in self.jobs:
                self.job_queue.task_done()
                continue

            job = self.jobs[job_id]

            # Update job status
            job["status"] = JobStatus.RUNNING
            job["started_at"] = time.time()
            job["worker"] = worker_name

            print(f"{worker_name} processing job {job_id}")

            # Process job based on type
            try:
                if job["task_type"] == "text_analysis":
                    result = await self.process_text_analysis(job["data"])
                elif job["task_type"] == "data_processing":
                    result = await self.process_data(job["data"])
                elif job["task_type"] == "file_conversion":
                    result = await self.process_file_conversion(job["data"])
                else:
                    raise ValueError(f"Unknown task type: {job['task_type']}")

                # Job completed successfully
                job["status"] = JobStatus.COMPLETED
                job["completed_at"] = time.time()
                job["result"] = result

            except Exception as e:
                # Job failed
                job["status"] = JobStatus.FAILED
                job["completed_at"] = time.time()
                job["error"] = str(e)
                print(f"Job {job_id} failed: {e}")

            # Mark task as done
            self.job_queue.task_done()

        except Exception as e:
            print(f"Worker {worker_name} error: {e}")
            await asyncio.sleep(1)

async def process_text_analysis(self, data: Dict) -> Dict:
    """Process text analysis job."""
    text = data.get("text", "")

    # Simulate AI processing
    await asyncio.sleep(2)  # Simulate processing time

    return {
        "text": text,
        "length": len(text),
        "word_count": len(text.split()),
        "analysis": "Text analysis completed"
    }

async def process_data(self, data: Dict) -> Dict:
    """Process data processing job."""
    items = data.get("items", [])

    # Simulate data processing
    await asyncio.sleep(len(items) * 0.1)

    return {
        "processed_items": len(items),
        "total_value": sum(item.get("value", 0) for item in items)
    }

async def process_file_conversion(self, data: Dict) -> Dict:
    """Process file conversion job."""
    file_type = data.get("file_type", "")
    target_type = data.get("target_type", "")

    # Simulate file conversion
    await asyncio.sleep(3)

    return {
        "source_type": file_type,
        "target_type": target_type,
        "status": "converted",
        "file_size": "1.2MB"
    }

@chute.cord(
    public_api_path="/jobs",
    method="POST",
    input_schema=JobRequest
)
async def submit_job(self, job_request: JobRequest) -> Dict[str, str]:
    """Submit a new job for processing."""

    job_id = str(uuid.uuid4())

    # Create job record
    job = {
        "id": job_id,
        "task_type": job_request.task_type,
        "data": job_request.data,
        "priority": job_request.priority,
        "status": JobStatus.PENDING,
        "created_at": time.time(),
        "started_at": None,
        "completed_at": None,
        "result": None,
        "error": None,
        "worker": None
    }

    self.jobs[job_id] = job

    # Add to queue
    await self.job_queue.put(job_id)

    return {"job_id": job_id, "status": "submitted"}

@chute.cord(public_api_path="/jobs/{job_id}", method="GET")
async def get_job_status(self, job_id: str) -> JobResponse:
    """Get status of a specific job."""

    if job_id not in self.jobs:
        raise HTTPException(status_code=404, detail="Job not found")

    job = self.jobs[job_id]

    return JobResponse(
        job_id=job["id"],
        status=job["status"],
        created_at=job["created_at"],
        started_at=job["started_at"],
        completed_at=job["completed_at"],
        result=job["result"],
        error=job["error"]
    )

@chute.cord(public_api_path="/jobs", method="GET")
async def list_jobs(self, status: Optional[JobStatus] = None, limit: int = 50) -> Dict:
    """List jobs with optional filtering."""

    jobs = list(self.jobs.values())

    # Filter by status if specified
    if status:
        jobs = [job for job in jobs if job["status"] == status]

    # Sort by creation time (newest first)
    jobs.sort(key=lambda x: x["created_at"], reverse=True)

    # Limit results
    jobs = jobs[:limit]

    # Convert to response format
    job_list = []
    for job in jobs:
        job_list.append(JobResponse(
            job_id=job["id"],
            status=job["status"],
            created_at=job["created_at"],
            started_at=job["started_at"],
            completed_at=job["completed_at"],
            result=job["result"],
            error=job["error"]
        ))

    return {
        "jobs": job_list,
        "total": len(job_list),
        "queue_size": self.job_queue.qsize()
    }

# Background job decorator
@chute.job()
async def cleanup_old_jobs(self):
    """Clean up completed jobs older than 24 hours."""

    cutoff_time = time.time() - (24 * 60 * 60)  # 24 hours ago

    jobs_to_remove = []
    for job_id, job in self.jobs.items():
        if (job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED] and
            job["completed_at"] and job["completed_at"] < cutoff_time):
            jobs_to_remove.append(job_id)

    for job_id in jobs_to_remove:
        del self.jobs[job_id]

    if jobs_to_remove:
        print(f"Cleaned up {len(jobs_to_remove)} old jobs")

Best Practices

1. Error Handling

from fastapi import HTTPException
from typing import Dict
import asyncio
import traceback
from loguru import logger

@chute.cord(public_api_path="/robust", method="POST")
async def robust_endpoint(self, input_data: Dict) -> Dict:
    """Endpoint with comprehensive error handling."""

    try:
        # Validate input
        if not input_data.get("text"):
            raise HTTPException(
                status_code=400,
                detail="Missing required field: text"
            )

        # Process with timeout
        result = await asyncio.wait_for(
            self.process_text(input_data["text"]),
            timeout=30.0
        )

        return {"result": result, "status": "success"}

    except asyncio.TimeoutError:
        logger.error("Processing timeout")
        raise HTTPException(
            status_code=408,
            detail="Processing timeout - request took too long"
        )

    except ValueError as e:
        logger.error(f"Validation error: {e}")
        raise HTTPException(
            status_code=400,
            detail=f"Invalid input: {str(e)}"
        )

    except Exception as e:
        logger.error(f"Unexpected error: {e}\n{traceback.format_exc()}")
        raise HTTPException(
            status_code=500,
            detail="Internal server error"
        )

2. Resource Management

@chute.on_startup()
async def initialize_with_resource_management(self):
    """Initialize with proper resource management."""
    import aiohttp
    import torch

    # GPU memory management
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        self.device = torch.device("cuda")

        # Monitor GPU memory
        self.gpu_memory_threshold = 0.9  # 90% usage threshold
    else:
        self.device = torch.device("cpu")

    # Connection pools
    self.session = aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(limit=100)
    )

    # Resource cleanup tracking
    self.cleanup_tasks = []

@chute.on_shutdown()
async def cleanup_resources(self):
    """Clean up resources on shutdown."""

    # Close HTTP session
    if hasattr(self, 'session'):
        await self.session.close()

    # Cancel background tasks
    for task in self.cleanup_tasks:
        task.cancel()

    # Clear GPU memory
    if hasattr(self, 'device') and self.device.type == 'cuda':
        import torch
        torch.cuda.empty_cache()

    print("Resources cleaned up successfully")

3. Monitoring and Metrics

import time
import asyncio
import functools
from collections import defaultdict
from loguru import logger

@chute.on_startup()
async def initialize_metrics(self):
    """Initialize metrics collection."""

    self.metrics = {
        "request_count": 0,
        "error_count": 0,
        "response_times": [],
        "endpoint_usage": defaultdict(int)
    }

    # Start metrics collection task
    self.metrics_task = asyncio.create_task(collect_metrics(self))

async def collect_metrics(self):
    """Background task to collect and log metrics."""

    while True:
        try:
            await asyncio.sleep(60)  # Collect every minute

            if self.metrics["response_times"]:
                avg_response_time = sum(self.metrics["response_times"]) / len(self.metrics["response_times"])
                self.metrics["response_times"] = []  # Reset
            else:
                avg_response_time = 0

            logger.info(f"Metrics - Requests: {self.metrics['request_count']}, "
                       f"Errors: {self.metrics['error_count']}, "
                       f"Avg Response Time: {avg_response_time:.2f}s")

        except Exception as e:
            logger.error(f"Metrics collection error: {e}")

# Decorator for automatic metrics collection
def with_metrics(func):
    """Decorator to automatically collect metrics."""

    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        start_time = time.time()

        try:
            self.metrics["request_count"] += 1
            self.metrics["endpoint_usage"][func.__name__] += 1

            result = await func(self, *args, **kwargs)

            response_time = time.time() - start_time
            self.metrics["response_times"].append(response_time)

            return result

        except Exception:
            self.metrics["error_count"] += 1
            raise

    return wrapper

@chute.cord(public_api_path="/monitored", method="POST")
@with_metrics
async def monitored_endpoint(self, input_data: Dict) -> Dict:
    """Endpoint with automatic metrics collection."""

    # Your processing logic here
    await asyncio.sleep(0.1)  # Simulate work

    return {"result": "processed", "input": input_data}

Testing and Development

Local Testing

# test_custom_chute.py
import pytest  # requires the pytest-asyncio plugin for @pytest.mark.asyncio
import asyncio
from unittest.mock import Mock, AsyncMock

@pytest.mark.asyncio
async def test_chute_initialization():
    """Test chute startup."""

    # Mock the chute
    chute_mock = Mock()
    chute_mock.initialize_models = AsyncMock()

    # Test initialization
    await chute_mock.initialize_models()

    assert chute_mock.initialize_models.called

@pytest.mark.asyncio
async def test_endpoint_functionality():
    """Test endpoint logic."""

    # Create test instance
    chute_instance = Mock()
    chute_instance.process_text = AsyncMock(return_value="processed result")

    # Test data
    test_input = {"text": "test input"}

    # Call function
    result = await chute_instance.process_text(test_input["text"])

    assert result == "processed result"

# Run tests
# pytest test_custom_chute.py -v

Development Workflow

# 1. Create and test locally
python my_chute.py  # Test locally first

# 2. Build image
chutes build my-custom-app:chute --wait

# 3. Deploy to staging
chutes deploy my-custom-app:chute --wait

# 4. Test deployed service
curl https://myuser-my-custom-app.chutes.ai/hello

# 5. Monitor and iterate
chutes chutes logs my-custom-app
chutes chutes metrics my-custom-app

Advanced Topics

1. Custom Middleware

from fastapi import Request, Response
import time

@chute.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Add processing time header to all responses."""

    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time

    response.headers["X-Process-Time"] = str(process_time)
    return response
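
Any client can then read the added header; for example (hypothetical deployment URL):

import requests

# The middleware above stamps every response with X-Process-Time.
response = requests.get("https://myuser-my-custom-app.chutes.ai/hello")
print(response.headers.get("X-Process-Time"))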

2. Custom Dependencies

from fastapi import Depends, Header, HTTPException

async def verify_api_key(api_key: str = Header(None)) -> str:
    """Verify API key dependency."""

    if not api_key or api_key != "your-secret-key":
        raise HTTPException(status_code=401, detail="Invalid API key")

    return api_key

@chute.cord(public_api_path="/secure", method="POST")
async def secure_endpoint(
    self,
    input_data: Dict,
    api_key: str = Depends(verify_api_key)
) -> Dict:
    """Secure endpoint requiring API key."""

    return {"message": "Access granted", "data": input_data}

3. WebSocket Support

from fastapi import WebSocket

@chute.websocket("/ws")
async def websocket_endpoint(self, websocket: WebSocket):
    """WebSocket endpoint for real-time communication."""

    await websocket.accept()

    try:
        while True:
            # Receive message
            data = await websocket.receive_text()

            # Process message (process_message is an app-specific handler
            # you would attach to the chute, e.g. during startup)
            response = await self.process_message(data)

            # Send response
            await websocket.send_text(response)

    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        await websocket.close()
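
A minimal client sketch using the websockets package (hypothetical deployment URL; wss:// assumed for the hosted service):

import asyncio
import websockets

async def main():
    # Hypothetical URL for the WebSocket endpoint defined above.
    async with websockets.connect("wss://myuser-my-custom-app.chutes.ai/ws") as ws:
        await ws.send("hello")
        print(await ws.recv())

asyncio.run(main())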

Next Steps

  • Production Deployment: Scale and monitor custom chutes
  • Advanced Patterns: Implement microservices architectures
  • Integration: Connect with external APIs and databases
  • Optimization: Profile and optimize performance

For more advanced topics, see: