# Real-time Text Streaming

This example shows how to build streaming API endpoints that send results in real time as they become available, ideal for long-running AI tasks where you want to show progress.
Real-time text streaming lets you process and return results as they become available, giving users immediate feedback instead of making them wait for all processing to complete (see the minimal sketch after this list). It is especially valuable for:

- **Long-running AI operations** - show progress during model inference
- **Interactive applications** - provide immediate feedback as users type
- **Large text processing** - stream results chunk by chunk
- **Multi-step workflows** - display each processing step as it completes
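At its core, a streaming endpoint is an async generator: instead of returning a single response, it yields results as they are produced, and each one is flushed to the client immediately. A minimal sketch of the pattern (the complete example below adds real models, schemas, and deployment config):

```python
import asyncio
from typing import AsyncGenerator

async def stream_results() -> AsyncGenerator[dict, None]:
    for step in ["chunking", "sentiment", "summary"]:
        yield {"status": step}    # Sent to the client as soon as it is yielded
        await asyncio.sleep(0.5)  # Simulate work between updates
```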
## What We'll Build
A text processing service that streams results as they're computed:

- 🌊 **Streaming responses** with real-time updates
- 📊 **Progress tracking** for long operations
- 🔄 **Multiple processing steps** shown incrementally
- 📝 **Chunked text processing** for large inputs
## Complete Example
**streaming_processor.py**

````python
import asyncio
import time
import json
from typing import AsyncGenerator
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from pydantic import BaseModel, Field
from fastapi import HTTPException
from chutes.chute import Chute, NodeSelector
from chutes.image import Image
# === INPUT SCHEMAS ===
class StreamingTextInput(BaseModel):
    text: str = Field(..., min_length=10, max_length=5000)
    include_sentiment: bool = Field(True, description="Include sentiment analysis")
    include_summary: bool = Field(True, description="Include text summarization")
    include_entities: bool = Field(False, description="Include named entity recognition")
    chunk_size: int = Field(200, ge=50, le=500, description="Text chunk size for processing")
# === CUSTOM IMAGE ===
image = (
    Image(username="myuser", name="streaming-processor", tag="1.0")
    .from_base("nvidia/cuda:12.4.1-runtime-ubuntu22.04")
    .with_python("3.11")
    # Quote the version specifiers so the shell does not treat ">" as a redirect
    .run_command("pip install 'torch>=2.4.0' 'transformers>=4.44.0' 'accelerate>=0.33.0' 'spacy>=3.7.0'")
    .run_command("python -m spacy download en_core_web_sm")
    .with_env("TRANSFORMERS_CACHE", "/app/models")
)
# === CHUTE DEFINITION ===
chute = Chute(
    username="myuser",
    name="streaming-processor",
    image=image,
    tagline="Real-time streaming text processing",
    readme="""
# Streaming Text Processor

Process text with real-time streaming results.

## Usage

```bash
curl -X POST https://myuser-streaming-processor.chutes.ai/process-stream \\
  -H "Content-Type: application/json" \\
  -d '{"text": "Your long text here..."}' \\
  --no-buffer
```

Each response line contains JSON with the current processing step.
""",
    node_selector=NodeSelector(
        gpu_count=1,
        min_vram_gb_per_gpu=12
    )
)
# === MODEL LOADING ===
@chute.on_startup()
async def load_models(self):
    """Load all models needed for processing."""
    print("Loading models for streaming processing...")
    import torch

    # Sentiment analysis model
    sentiment_model_name = "cardiffnlp/twitter-roberta-base-sentiment-latest"
    self.sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name)
    self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_model_name)

    # Summarization pipeline
    self.summarizer = pipeline(
        "summarization",
        model="facebook/bart-large-cnn",
        device=0 if torch.cuda.is_available() else -1
    )

    # Named entity recognition (spaCy)
    import spacy
    self.nlp = spacy.load("en_core_web_sm")

    # Move sentiment model to GPU when available
    self.device = "cuda" if torch.cuda.is_available() else "cpu"
    self.sentiment_model.to(self.device)
    print(f"All models loaded on device: {self.device}")
# === STREAMING ENDPOINTS ===
@chute.cord(
    public_api_path="/process-stream",
    method="POST",
    input_schema=StreamingTextInput,
    stream=True,
    output_content_type="application/json"
)
async def process_text_stream(self, data: StreamingTextInput) -> AsyncGenerator[dict, None]:
    """Process text with streaming results."""
    start_time = time.time()

    # Initial status
    yield {
        "status": "started",
        "message": "Beginning text processing...",
        "timestamp": time.time(),
        "text_length": len(data.text)
    }

    # Step 1: Text chunking
    yield {"status": "chunking", "message": "Splitting text into chunks..."}
    chunks = []
    text = data.text
    for i in range(0, len(text), data.chunk_size):
        chunk = text[i:i + data.chunk_size]
        chunks.append(chunk)
    yield {
        "status": "chunked",
        "message": f"Split into {len(chunks)} chunks",
        "chunks": len(chunks)
    }

    # Step 2: Sentiment analysis (if requested)
    if data.include_sentiment:
        yield {"status": "sentiment_processing", "message": "Analyzing sentiment..."}
        import torch
        try:
            sentiments = []
            for i, chunk in enumerate(chunks):
                # Process chunk
                inputs = self.sentiment_tokenizer(
                    chunk, return_tensors="pt", truncation=True, max_length=512
                ).to(self.device)
                with torch.no_grad():
                    outputs = self.sentiment_model(**inputs)
                    predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)

                # Get sentiment
                labels = ["NEGATIVE", "NEUTRAL", "POSITIVE"]
                predicted_class = predictions.argmax().item()
                confidence = predictions[0][predicted_class].item()
                chunk_sentiment = {
                    "chunk": i + 1,
                    "sentiment": labels[predicted_class],
                    "confidence": confidence
                }
                sentiments.append(chunk_sentiment)

                # Stream progress
                yield {
                    "status": "sentiment_progress",
                    "progress": (i + 1) / len(chunks),
                    "chunk_result": chunk_sentiment
                }
                # Small delay to make the streaming effect visible
                await asyncio.sleep(0.1)

            # Overall sentiment
            positive_chunks = sum(1 for s in sentiments if s["sentiment"] == "POSITIVE")
            negative_chunks = sum(1 for s in sentiments if s["sentiment"] == "NEGATIVE")
            if positive_chunks > negative_chunks:
                overall_sentiment = "POSITIVE"
            elif negative_chunks > positive_chunks:
                overall_sentiment = "NEGATIVE"
            else:
                overall_sentiment = "NEUTRAL"
            yield {
                "status": "sentiment_complete",
                "overall_sentiment": overall_sentiment,
                "chunk_sentiments": sentiments,
                "positive_chunks": positive_chunks,
                "negative_chunks": negative_chunks
            }
        except Exception as e:
            yield {"status": "sentiment_error", "error": str(e)}

    # Step 3: Summarization (if requested)
    if data.include_summary and len(data.text) > 100:
        yield {"status": "summarization_processing", "message": "Generating summary..."}
        try:
            # Summarize the full text
            summary_result = self.summarizer(
                data.text,
                max_length=130,
                min_length=30,
                do_sample=False
            )
            summary = summary_result[0]["summary_text"]
            yield {
                "status": "summarization_complete",
                "summary": summary,
                "compression_ratio": len(summary) / len(data.text)
            }
        except Exception as e:
            yield {"status": "summarization_error", "error": str(e)}

    # Step 4: Named entity recognition (if requested)
    if data.include_entities:
        yield {"status": "entities_processing", "message": "Extracting entities..."}
        import spacy
        try:
            doc = self.nlp(data.text)
            entities = []
            for ent in doc.ents:
                entities.append({
                    "text": ent.text,
                    "label": ent.label_,
                    "description": spacy.explain(ent.label_),
                    "start": ent.start_char,
                    "end": ent.end_char
                })

            # Group by entity type
            entity_types = {}
            for ent in entities:
                label = ent["label"]
                if label not in entity_types:
                    entity_types[label] = []
                entity_types[label].append(ent)

            yield {
                "status": "entities_complete",
                "entities": entities,
                "entity_types": entity_types,
                "total_entities": len(entities)
            }
        except Exception as e:
            yield {"status": "entities_error", "error": str(e)}

    # Final status
    total_time = time.time() - start_time
    yield {
        "status": "completed",
        "message": "All processing complete!",
        "total_processing_time": total_time,
        "timestamp": time.time()
    }
@chute.cord(
    public_api_path="/generate-stream",
    method="POST",
    stream=True,
    output_content_type="text/plain"
)
async def generate_text_stream(self, prompt: str) -> AsyncGenerator[str, None]:
    """Simple text generation with streaming (simulated)."""
    # Simulate text generation word by word
    words = [
        "Artificial", "intelligence", "is", "revolutionizing", "how", "we",
        "process", "and", "understand", "text", "data.", "With", "advanced",
        "models", "like", "transformers,", "we", "can", "perform", "complex",
        "natural", "language", "tasks", "with", "unprecedented", "accuracy."
    ]

    yield f"Prompt: {prompt}\n\nGenerated text: "
    for word in words:
        yield word + " "
        await asyncio.sleep(0.2)  # Simulate processing time
    yield "\n\n[Generation complete]"
# === REGULAR (NON-STREAMING) ENDPOINT FOR COMPARISON ===
@chute.cord(
    public_api_path="/process-batch",
    method="POST",
    input_schema=StreamingTextInput,
    output_content_type="application/json"
)
async def process_text_batch(self, data: StreamingTextInput) -> dict:
    """Non-streaming version that returns all results at once."""
    import torch

    # Minimal sketch of the batch path: run the same steps as /process-stream,
    # but collect everything into a single response instead of yielding.
    results = {"text_length": len(data.text)}
    if data.include_sentiment:
        inputs = self.sentiment_tokenizer(
            data.text, return_tensors="pt", truncation=True, max_length=512
        ).to(self.device)
        with torch.no_grad():
            predictions = torch.nn.functional.softmax(
                self.sentiment_model(**inputs).logits, dim=-1
            )
        labels = ["NEGATIVE", "NEUTRAL", "POSITIVE"]
        results["sentiment"] = labels[predictions.argmax().item()]
    if data.include_summary and len(data.text) > 100:
        results["summary"] = self.summarizer(
            data.text, max_length=130, min_length=30, do_sample=False
        )[0]["summary_text"]
    if data.include_entities:
        results["entities"] = [
            {"text": ent.text, "label": ent.label_} for ent in self.nlp(data.text).ents
        ]
    return results
````
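Each line of the streamed response from `/process-stream` is a standalone JSON object. With the default options, the first few lines look roughly like this (values are illustrative):

```json
{"status": "started", "message": "Beginning text processing...", "timestamp": 1712345678.9, "text_length": 238}
{"status": "chunking", "message": "Splitting text into chunks..."}
{"status": "chunked", "message": "Split into 2 chunks", "chunks": 2}
{"status": "sentiment_progress", "progress": 0.5, "chunk_result": {"chunk": 1, "sentiment": "POSITIVE", "confidence": 0.93}}
```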
## Testing the Streaming API
### Using curl
```bash
# Test streaming processing
curl -X POST https://myuser-streaming-processor.chutes.ai/process-stream \
  -H "Content-Type: application/json" \
  -d '{
    "text": "I love using this amazing new technology! It has completely transformed how I work. The artificial intelligence capabilities are impressive and the user interface is intuitive. However, there are still some areas that could be improved.",
    "include_sentiment": true,
    "include_summary": true,
    "include_entities": true,
    "chunk_size": 100
  }' \
  --no-buffer
```

### Using Python

```python
import asyncio
import aiohttp
import json
async def stream_text_processing():
    """Test the streaming text processing endpoint."""
    payload = {
        "text": """
        Artificial intelligence is rapidly transforming industries across the globe.
        Companies like Google, Microsoft, and OpenAI are leading the charge with
        innovative models and applications. The technology is being used in healthcare,
        finance, education, and many other sectors. While the potential is enormous,
        there are also important ethical considerations that need to be addressed.
        """,
        "include_sentiment": True,
        "include_summary": True,
        "include_entities": True,
        "chunk_size": 150
    }

    async with aiohttp.ClientSession() as session:
        url = "https://myuser-streaming-processor.chutes.ai/process-stream"
        async with session.post(url, json=payload) as response:
            async for line in response.content:
                if line:
                    try:
                        data = json.loads(line.decode())
                        print(f"[{data['status']}] {data.get('message', '')}")

                        # Handle specific result types
                        if data['status'] == 'sentiment_complete':
                            print(f"Overall sentiment: {data['overall_sentiment']}")
                        elif data['status'] == 'summarization_complete':
                            print(f"Summary: {data['summary']}")
                        elif data['status'] == 'entities_complete':
                            print(f"Found {data['total_entities']} entities")
                    except json.JSONDecodeError:
                        continue

# Run the test
asyncio.run(stream_text_processing())
```
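If you prefer a synchronous client, `requests` can consume the same stream line by line; this sketch assumes the endpoint emits newline-delimited JSON, as shown above:

```python
import json
import requests

url = "https://myuser-streaming-processor.chutes.ai/process-stream"
payload = {"text": "Your long text here, at least ten characters.", "include_sentiment": True}

# stream=True keeps the connection open so lines can be read as they arrive
with requests.post(url, json=payload, stream=True) as response:
    for line in response.iter_lines():
        if line:
            try:
                event = json.loads(line.decode())
                print(f"[{event['status']}] {event.get('message', '')}")
            except json.JSONDecodeError:
                continue
```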
## Best Practices

### 1. Use Async Generators

```python
async def my_stream() -> AsyncGenerator[dict, None]:
    for i in range(10):
        yield {"step": i, "data": f"Processing item {i}"}
        await asyncio.sleep(0.1)  # Simulate work
```
### 2. Progress Tracking

```python
total_items = len(items)
for i, item in enumerate(items):
    # Process item
    result = await process_item(item)

    # Yield progress
    yield {
        "status": "processing",
        "progress": (i + 1) / total_items,
        "current_item": i + 1,
        "total_items": total_items,
        "result": result
    }
```
### 3. Error Handling in Streams

```python
try:
    result = await risky_operation()
    yield {"status": "success", "result": result}
except Exception as e:
    yield {"status": "error", "error": str(e)}
    # Continue with other operations if possible
```
### 4. Multiple Content Types

```python
# JSON streaming
@chute.cord(stream=True, output_content_type="application/json")
async def json_stream(self):
    yield {"message": "JSON data"}

# Plain text streaming
@chute.cord(stream=True, output_content_type="text/plain")
async def text_stream(self):
    yield "Plain text data\n"
```
## Performance Considerations

### 1. Chunk Size Optimization

```python
# Too small: many HTTP chunks, overhead
chunk_size = 10

# Too large: delayed responses, memory usage
chunk_size = 10000

# Just right: balance responsiveness and efficiency
chunk_size = 200
```
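Fixed-size slicing can also split words and sentences mid-chunk, which hurts sentiment and entity quality. A small helper (not part of the example above) that packs whole sentences up to a target size is one way to keep chunks near the sweet spot:

```python
import re

def chunk_by_sentence(text: str, target_size: int = 200) -> list[str]:
    """Pack whole sentences into chunks of roughly target_size characters."""
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    chunks, current = [], ""
    for sentence in sentences:
        # Start a new chunk once adding this sentence would exceed the target
        if current and len(current) + len(sentence) + 1 > target_size:
            chunks.append(current)
            current = sentence
        else:
            current = f"{current} {sentence}".strip()
    if current:
        chunks.append(current)
    return chunks
```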