This example shows how to create streaming API endpoints that send results in real time as they become available. This is perfect for long-running AI tasks where you want to show progress.
## Real-time Text Streaming {#real-time-text-streaming}
Real-time text streaming lets you process and return results as they become available, giving users immediate feedback instead of making them wait for all processing to complete. This is especially valuable for (a minimal sketch follows the list):

- **Long-running AI operations** - Show progress during model inference
- **Interactive applications** - Provide immediate feedback as users type
- **Large text processing** - Stream results chunk by chunk
- **Multi-step workflows** - Display each processing step as it completes
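The core pattern, used throughout this example, is a cord declared with `stream=True` whose handler is an async generator: each `yield` is sent to the client as soon as it happens. A minimal sketch (the `/demo-stream` path and handler are illustrative only, not part of the service built below):

```python
import asyncio
from typing import AsyncGenerator

@chute.cord(public_api_path="/demo-stream", method="POST", stream=True,
            output_content_type="application/json")
async def demo_stream(self, prompt: str) -> AsyncGenerator[dict, None]:
    for step in range(3):
        await asyncio.sleep(0.5)  # Stand-in for real work
        # Each yielded dict is flushed to the client immediately
        yield {"step": step, "prompt": prompt}
```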
### What We'll Build
A text processing service that streams results as they're computed:
- 🌊 **Streaming responses** with real-time updates
- 📊 **Progress tracking** for long operations
- 🔄 **Multiple processing steps** shown incrementally
- 📝 **Chunked text processing** for large inputs
### Complete Example
`streaming_processor.py`:

````python
import asyncio
import json
import time
from typing import AsyncGenerator

import spacy
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from pydantic import BaseModel, Field
from fastapi import HTTPException
from chutes.chute import Chute, NodeSelector
from chutes.image import Image

# === INPUT SCHEMAS ===

class StreamingTextInput(BaseModel):
    text: str = Field(..., min_length=10, max_length=5000)
    include_sentiment: bool = Field(True, description="Include sentiment analysis")
    include_summary: bool = Field(True, description="Include text summarization")
    include_entities: bool = Field(False, description="Include named entity recognition")
    chunk_size: int = Field(200, ge=50, le=500, description="Text chunk size for processing")

# === CUSTOM IMAGE ===
image = (
    Image(username="myuser", name="streaming-processor", tag="1.0")
    .from_base("nvidia/cuda:12.2-runtime-ubuntu22.04")
    .with_python("3.11")
    .run_command("pip install torch==2.1.0 transformers==4.30.0 accelerate==0.20.0 spacy>=3.6.0")
    .run_command("python -m spacy download en_core_web_sm")
    .with_env("TRANSFORMERS_CACHE", "/app/models")
)

# === CHUTE DEFINITION ===
chute = Chute(
    username="myuser",
    name="streaming-processor",
    image=image,
    tagline="Real-time streaming text processing",
    readme="""
# Streaming Text Processor

Process text with real-time streaming results.

## Usage

```bash
curl -X POST https://myuser-streaming-processor.chutes.ai/process-stream \\
  -H "Content-Type: application/json" \\
  -d '{"text": "Your long text here..."}' \\
  --no-buffer
```

Each response line contains JSON with the current processing step.
""",
    node_selector=NodeSelector(
        gpu_count=1,
        min_vram_gb_per_gpu=12
    )
)

# === MODEL LOADING ===
@chute.on_startup()
async def load_models(self):
    """Load all models needed for processing."""
    print("Loading models for streaming processing...")

    # Sentiment analysis model
    sentiment_model_name = "cardiffnlp/twitter-roberta-base-sentiment-latest"
    self.sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name)
    self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_model_name)

    # Summarization pipeline
    self.summarizer = pipeline(
        "summarization",
        model="facebook/bart-large-cnn",
        device=0 if torch.cuda.is_available() else -1
    )

    # Named entity recognition (spaCy)
    self.nlp = spacy.load("en_core_web_sm")

    # Move sentiment model to GPU
    self.device = "cuda" if torch.cuda.is_available() else "cpu"
    self.sentiment_model.to(self.device)

    print(f"All models loaded on device: {self.device}")

# === STREAMING ENDPOINTS ===
@chute.cord(
    public_api_path="/process-stream",
    method="POST",
    input_schema=StreamingTextInput,
    stream=True,
    output_content_type="application/json"
)
async def process_text_stream(self, data: StreamingTextInput) -> AsyncGenerator[dict, None]:
    """Process text with streaming results."""
    start_time = time.time()

    # Initial status
    yield {
        "status": "started",
        "message": "Beginning text processing...",
        "timestamp": time.time(),
        "text_length": len(data.text)
    }

    # Step 1: Text chunking
    yield {"status": "chunking", "message": "Splitting text into chunks..."}

    chunks = []
    text = data.text
    for i in range(0, len(text), data.chunk_size):
        chunk = text[i:i + data.chunk_size]
        chunks.append(chunk)

    yield {
        "status": "chunked",
        "message": f"Split into {len(chunks)} chunks",
        "chunks": len(chunks)
    }

    # Step 2: Sentiment analysis (if requested)
    if data.include_sentiment:
        yield {"status": "sentiment_processing", "message": "Analyzing sentiment..."}

        try:
            sentiments = []
            for i, chunk in enumerate(chunks):
                # Process chunk
                inputs = self.sentiment_tokenizer(
                    chunk, return_tensors="pt", truncation=True, max_length=512
                ).to(self.device)

                with torch.no_grad():
                    outputs = self.sentiment_model(**inputs)
                    predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)

                # Get sentiment
                labels = ["NEGATIVE", "NEUTRAL", "POSITIVE"]
                predicted_class = predictions.argmax().item()
                confidence = predictions[0][predicted_class].item()

                chunk_sentiment = {
                    "chunk": i + 1,
                    "sentiment": labels[predicted_class],
                    "confidence": confidence
                }
                sentiments.append(chunk_sentiment)

                # Stream progress
                yield {
                    "status": "sentiment_progress",
                    "progress": (i + 1) / len(chunks),
                    "chunk_result": chunk_sentiment
                }

                # Small delay to show streaming effect
                await asyncio.sleep(0.1)

            # Overall sentiment
            positive_chunks = sum(1 for s in sentiments if s["sentiment"] == "POSITIVE")
            negative_chunks = sum(1 for s in sentiments if s["sentiment"] == "NEGATIVE")

            if positive_chunks > negative_chunks:
                overall_sentiment = "POSITIVE"
            elif negative_chunks > positive_chunks:
                overall_sentiment = "NEGATIVE"
            else:
                overall_sentiment = "NEUTRAL"

            yield {
                "status": "sentiment_complete",
                "overall_sentiment": overall_sentiment,
                "chunk_sentiments": sentiments,
                "positive_chunks": positive_chunks,
                "negative_chunks": negative_chunks
            }
        except Exception as e:
            yield {"status": "sentiment_error", "error": str(e)}

    # Step 3: Summarization (if requested)
    if data.include_summary and len(data.text) > 100:
        yield {"status": "summarization_processing", "message": "Generating summary..."}

        try:
            # Summarize the full text
            summary_result = self.summarizer(
                data.text,
                max_length=130,
                min_length=30,
                do_sample=False
            )
            summary = summary_result[0]['summary_text']

            yield {
                "status": "summarization_complete",
                "summary": summary,
                "compression_ratio": len(summary) / len(data.text)
            }
        except Exception as e:
            yield {"status": "summarization_error", "error": str(e)}

    # Step 4: Named entity recognition (if requested)
    if data.include_entities:
        yield {"status": "entities_processing", "message": "Extracting entities..."}

        try:
            doc = self.nlp(data.text)
            entities = []
            for ent in doc.ents:
                entities.append({
                    "text": ent.text,
                    "label": ent.label_,
                    "description": spacy.explain(ent.label_),
                    "start": ent.start_char,
                    "end": ent.end_char
                })

            # Group by entity type
            entity_types = {}
            for ent in entities:
                label = ent["label"]
                if label not in entity_types:
                    entity_types[label] = []
                entity_types[label].append(ent)

            yield {
                "status": "entities_complete",
                "entities": entities,
                "entity_types": entity_types,
                "total_entities": len(entities)
            }
        except Exception as e:
            yield {"status": "entities_error", "error": str(e)}

    # Final status
    total_time = time.time() - start_time
    yield {
        "status": "completed",
        "message": "All processing complete!",
        "total_processing_time": total_time,
        "timestamp": time.time()
    }

@chute.cord(
    public_api_path="/generate-stream",
    method="POST",
    stream=True,
    output_content_type="text/plain"
)
async def generate_text_stream(self, prompt: str) -> AsyncGenerator[str, None]:
    """Simple text generation with streaming (simulated)."""
    # Simulate text generation word by word
    words = [
        "Artificial", "intelligence", "is", "revolutionizing", "how", "we",
        "process", "and", "understand", "text", "data.", "With", "advanced",
        "models", "like", "transformers,", "we", "can", "perform", "complex",
        "natural", "language", "tasks", "with", "unprecedented", "accuracy."
    ]

    yield f"Prompt: {prompt}\n\nGenerated text: "
    for word in words:
        yield word + " "
        await asyncio.sleep(0.2)  # Simulate processing time

    yield "\n\n[Generation complete]"

# === REGULAR (NON-STREAMING) ENDPOINT FOR COMPARISON ===
@chute.cord(
    public_api_path="/process-batch",
    method="POST",
    input_schema=StreamingTextInput,
    output_content_type="application/json"
)
async def process_text_batch(self, data: StreamingTextInput) -> dict:
    """Non-streaming version that returns all results at once."""
    start_time = time.time()
    results = {}

    # Sentiment analysis
    if data.include_sentiment:
        inputs = self.sentiment_tokenizer(
            data.text, return_tensors="pt", truncation=True, max_length=512
        ).to(self.device)

        with torch.no_grad():
            outputs = self.sentiment_model(**inputs)
            predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)

        labels = ["NEGATIVE", "NEUTRAL", "POSITIVE"]
        predicted_class = predictions.argmax().item()
        confidence = predictions[0][predicted_class].item()

        results["sentiment"] = {
            "label": labels[predicted_class],
            "confidence": confidence
        }

    # Summarization
    if data.include_summary and len(data.text) > 100:
        summary_result = self.summarizer(data.text, max_length=130, min_length=30)
        results["summary"] = summary_result[0]['summary_text']

    # Entities
    if data.include_entities:
        doc = self.nlp(data.text)
        entities = [{"text": ent.text, "label": ent.label_} for ent in doc.ents]
        results["entities"] = entities

    results["processing_time"] = time.time() - start_time
    return results
````
### Testing the Streaming API
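First deploy the chute. With the chutes CLI this typically looks like the following (verify the exact command against your installed CLI version):

```bash
chutes deploy streaming_processor:chute
```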
#### Using curl

```bash
# Test streaming processing
curl -X POST https://myuser-streaming-processor.chutes.ai/process-stream \
-H "Content-Type: application/json" \
-d '{
"text": "I love using this amazing new technology! It has completely transformed how I work. The artificial intelligence capabilities are impressive and the user interface is intuitive. However, there are still some areas that could be improved.",
"include_sentiment": true,
"include_summary": true,
"include_entities": true,
"chunk_size": 100
}' \
  --no-buffer
```
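Because the endpoint yields one JSON object per update, `--no-buffer` lets curl print each line as it arrives. The first lines of the stream look roughly like this (timestamps and values are illustrative):

```json
{"status": "started", "message": "Beginning text processing...", "timestamp": 1731000000.12, "text_length": 231}
{"status": "chunking", "message": "Splitting text into chunks..."}
{"status": "chunked", "message": "Split into 3 chunks", "chunks": 3}
{"status": "sentiment_processing", "message": "Analyzing sentiment..."}
```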
#### Using Python

```python
import asyncio
import aiohttp
import json

async def stream_text_processing():
    """Test the streaming text processing endpoint."""
    payload = {
        "text": """
        Artificial intelligence is rapidly transforming industries across the globe.
        Companies like Google, Microsoft, and OpenAI are leading the charge with
        innovative models and applications. The technology is being used in healthcare,
        finance, education, and many other sectors. While the potential is enormous,
        there are also important ethical considerations that need to be addressed.
        """,
        "include_sentiment": True,
        "include_summary": True,
        "include_entities": True,
        "chunk_size": 150
    }

    async with aiohttp.ClientSession() as session:
        url = "https://myuser-streaming-processor.chutes.ai/process-stream"
        async with session.post(url, json=payload) as response:
            async for line in response.content:
                if line:
                    try:
                        data = json.loads(line.decode())
                        print(f"[{data['status']}] {data.get('message', '')}")

                        # Handle specific result types
                        if data['status'] == 'sentiment_complete':
                            print(f"Overall sentiment: {data['overall_sentiment']}")
                        elif data['status'] == 'summarization_complete':
                            print(f"Summary: {data['summary']}")
                        elif data['status'] == 'entities_complete':
                            print(f"Found {data['total_entities']} entities")
                    except json.JSONDecodeError:
                        continue

# Run the test
asyncio.run(stream_text_processing())
```
### Streaming Patterns

#### 1. Async Generators

Streaming handlers are async generators: `yield` each result as soon as it is ready.

```python
async def my_stream() -> AsyncGenerator[dict, None]:
    for i in range(10):
        yield {"step": i, "data": f"Processing item {i}"}
        await asyncio.sleep(0.1)  # Simulate work
```
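To sanity-check a generator like this outside of a deployment, you can drive it directly with `asyncio` (a local test harness only, not service code):

```python
import asyncio

async def main():
    async for item in my_stream():
        print(item)  # Each dict arrives as soon as it is yielded

asyncio.run(main())
```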
#### 2. Progress Tracking

Include a progress fraction in each update so clients can render a progress bar:

```python
async def process_with_progress(items) -> AsyncGenerator[dict, None]:
    total_items = len(items)
    for i, item in enumerate(items):
        # Process item
        result = await process_item(item)

        # Yield progress after each item
        yield {
            "status": "processing",
            "progress": (i + 1) / total_items,
            "current_item": i + 1,
            "total_items": total_items,
            "result": result
        }
```
#### 3. Error Handling in Streams

```python
try:
    result = await risky_operation()
    yield {"status": "success", "result": result}
except Exception as e:
    yield {"status": "error", "error": str(e)}
    # Continue with other operations if possible
```
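The complete example above applies this per step: each stage wraps its own try/except, so a failure in, say, summarization is reported on the stream while the remaining stages still run. A minimal sketch of that structure (`step_one` and `step_two` are placeholders):

```python
async def pipeline() -> AsyncGenerator[dict, None]:
    for name, step in [("sentiment", step_one), ("summary", step_two)]:
        try:
            yield {"status": f"{name}_complete", "result": await step()}
        except Exception as e:
            # Report the failure, then keep streaming the remaining steps
            yield {"status": f"{name}_error", "error": str(e)}
```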
#### 4. Multiple Content Types

```python
# JSON streaming: one JSON object per yield
@chute.cord(stream=True, output_content_type="application/json")
async def json_stream(self):
    yield {"message": "JSON data"}

# Plain text streaming: raw text chunks
@chute.cord(stream=True, output_content_type="text/plain")
async def text_stream(self):
    yield "Plain text data\n"
```
### Performance Considerations

#### 1. Chunk Size Optimization

```python
# Too small: many HTTP chunks, more overhead
chunk_size = 10

# Too large: delayed responses, higher memory usage
chunk_size = 10000

# Just right: balances responsiveness and efficiency
chunk_size = 200
```
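The chunking in the complete example splits on raw character offsets, which can cut words in half. If that matters for your models, a boundary-aware splitter is a small change (this helper is a sketch, not part of the example above):

```python
def chunk_text(text: str, chunk_size: int = 200) -> list[str]:
    """Split text into ~chunk_size-character chunks on word boundaries."""
    chunks, current, length = [], [], 0
    for word in text.split():
        # Start a new chunk when adding this word would overflow the budget
        if current and length + len(word) + 1 > chunk_size:
            chunks.append(" ".join(current))
            current, length = [], 0
        current.append(word)
        length += len(word) + 1
    if current:
        chunks.append(" ".join(current))
    return chunks
```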