Real-time Streaming Responses
This guide covers how to implement real-time streaming responses in Chutes, enabling live data transmission, progressive content delivery, and interactive AI applications.
Overview
Streaming in Chutes provides:
- Real-time Response: Send data as it's generated
- Better UX: Users see progress instead of waiting
- Memory Efficiency: Process large outputs without memory buildup
- Interactive Applications: Enable chat-like experiences
- Scalability: Handle long-running tasks efficiently
- WebSocket Support: Full duplex communication
Basic Streaming Concepts
HTTP Streaming vs WebSockets
from chutes.chute import Chute
from fastapi import WebSocket
from fastapi.responses import StreamingResponse
import asyncio
import json
chute = Chute(username="myuser", name="streaming-demo")
# HTTP Streaming - Server-sent events
@chute.cord(
public_api_path="/stream_text",
method="POST",
stream=True # Enable streaming
)
async def stream_text_generation(self, prompt: str):
"""Stream text generation token by token."""
async def generate_tokens():
"""Generate tokens progressively."""
# Simulate token generation
tokens = ["Hello", " world", "!", " This", " is", " streaming", " text", "."]
for token in tokens:
# Yield each token as it's generated
yield f"data: {json.dumps({'token': token})}\n\n"
await asyncio.sleep(0.1) # Simulate processing time
# Send completion signal
yield f"data: {json.dumps({'done': True})}\n\n"
return StreamingResponse(
generate_tokens(),
        media_type="text/event-stream",  # SSE frames, not plain text
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Disable nginx buffering
}
)
# WebSocket - Full duplex communication
@chute.websocket("/ws")
async def websocket_endpoint(self, websocket: WebSocket):
"""WebSocket endpoint for interactive communication."""
await websocket.accept()
try:
while True:
# Receive message from client
data = await websocket.receive_text()
# Process message
response = await self.process_message(data)
# Send response back
await websocket.send_text(response)
except Exception as e:
print(f"WebSocket error: {e}")
finally:
await websocket.close()
async def process_message(self, message: str) -> str:
"""Process incoming message."""
return f"Echo: {message}"
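To see what the consumer side looks like, here is a minimal sketch that reads the /stream_text endpoint with aiohttp (the deployment URL is a placeholder):

import asyncio
import json

import aiohttp

async def consume_stream():
    """Read SSE frames from the /stream_text endpoint line by line."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://myuser-streaming-demo.chutes.ai/stream_text",
            json={"prompt": "Hello"},
        ) as response:
            async for line in response.content:
                text = line.decode("utf-8").strip()
                if not text.startswith("data: "):
                    continue
                payload = json.loads(text[6:])
                if payload.get("done"):
                    break
                print(payload["token"], end="", flush=True)

asyncio.run(consume_stream())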
AI Model Streaming
Streaming LLM Text Generation
from typing import AsyncGenerator, Dict, Any
import time
import torch
@chute.on_startup()
async def initialize_streaming_llm(self):
"""Initialize streaming-capable LLM."""
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
model_name = "microsoft/DialoGPT-medium"
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForCausalLM.from_pretrained(model_name)
if torch.cuda.is_available():
self.model = self.model.to("cuda")
# Add padding token if not present
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
async def stream_llm_generation(
self,
prompt: str,
max_tokens: int = 100,
temperature: float = 0.7
) -> AsyncGenerator[Dict[str, Any], None]:
"""Stream LLM generation token by token."""
# Tokenize input
inputs = self.tokenizer.encode(prompt, return_tensors="pt")
if torch.cuda.is_available():
inputs = inputs.to("cuda")
# Generation parameters
attention_mask = torch.ones_like(inputs)
generated_tokens = 0
with torch.no_grad():
while generated_tokens < max_tokens:
# Generate next token
outputs = self.model(inputs, attention_mask=attention_mask)
logits = outputs.logits[0, -1, :]
# Apply temperature
if temperature > 0:
logits = logits / temperature
probs = torch.softmax(logits, dim=-1)
next_token = torch.multinomial(probs, 1)
else:
                next_token = torch.argmax(logits, dim=-1, keepdim=True)
# Decode token
token_text = self.tokenizer.decode(next_token, skip_special_tokens=True)
# Yield token data
yield {
"token": token_text,
"token_id": next_token.item(),
"generated_tokens": generated_tokens + 1,
"is_complete": False
}
# Update inputs for next iteration
inputs = torch.cat([inputs, next_token.unsqueeze(0)], dim=-1)
attention_mask = torch.cat([attention_mask, torch.ones((1, 1), device=attention_mask.device)], dim=-1)
generated_tokens += 1
# Check for end token
if next_token.item() == self.tokenizer.eos_token_id:
break
# Small delay to prevent overwhelming the client
await asyncio.sleep(0.01)
# Send completion
yield {
"token": "",
"token_id": None,
"generated_tokens": generated_tokens,
"is_complete": True
}
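The loop above re-runs the full forward pass over the entire sequence on every step. For real models you would normally reuse the attention cache instead; a hedged sketch using Hugging Face's past_key_values (assuming a model/tokenizer pair loaded as in initialize_streaming_llm, and temperature > 0):

async def stream_llm_generation_cached(self, prompt: str, max_tokens: int = 100, temperature: float = 0.7):
    """KV-cache variant of the streaming loop (a sketch, not the canonical API)."""
    inputs = self.tokenizer.encode(prompt, return_tensors="pt").to(self.model.device)
    generated = 0
    with torch.no_grad():
        outputs = self.model(inputs, use_cache=True)
        past = outputs.past_key_values
        for _ in range(max_tokens):
            probs = torch.softmax(outputs.logits[0, -1, :] / temperature, dim=-1)
            next_token = torch.multinomial(probs, 1)  # shape (1,)
            if next_token.item() == self.tokenizer.eos_token_id:
                break
            generated += 1
            yield {"token": self.tokenizer.decode(next_token), "generated_tokens": generated, "is_complete": False}
            # Feed only the new token; past_key_values carries the attention
            # state for everything generated so far.
            outputs = self.model(next_token.unsqueeze(0), past_key_values=past, use_cache=True)
            past = outputs.past_key_values
            await asyncio.sleep(0)
    yield {"token": "", "generated_tokens": generated, "is_complete": True}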
@chute.cord(
public_api_path="/generate_stream",
method="POST",
stream=True
)
async def generate_streaming_text(self, prompt: str, max_tokens: int = 100):
"""Generate streaming text response."""
async def stream_response():
        # Emit an initial SSE event so clients know generation has begun
yield "event: start\n"
yield f"data: {json.dumps({'message': 'Starting generation'})}\n\n"
async for token_data in self.stream_llm_generation(prompt, max_tokens):
if token_data["is_complete"]:
yield "event: complete\n"
yield f"data: {json.dumps(token_data)}\n\n"
else:
yield "event: token\n"
yield f"data: {json.dumps(token_data)}\n\n"
return StreamingResponse(
stream_response(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Cache-Control"
}
)
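Over the wire, the client receives plain-text frames separated by blank lines. With illustrative placeholder values, the stream looks roughly like:

event: start
data: {"message": "Starting generation"}

event: token
data: {"token": "Hello", "token_id": 123, "generated_tokens": 1, "is_complete": false}

event: complete
data: {"token": "", "token_id": null, "generated_tokens": 8, "is_complete": true}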
Streaming Image Generation
from PIL import Image
import io
import base64
class StreamingImageGenerator:
"""Stream image generation progress."""
def __init__(self, diffusion_model):
self.model = diffusion_model
async def stream_image_generation(
self,
prompt: str,
steps: int = 20
) -> AsyncGenerator[Dict[str, Any], None]:
"""Stream image generation progress."""
# Initialize generation
yield {
"step": 0,
"total_steps": steps,
"status": "initializing",
"image": None
}
# Simulate diffusion steps
for step in range(1, steps + 1):
# Process one diffusion step
await asyncio.sleep(0.1) # Simulate processing
# Every few steps, send intermediate image
if step % 5 == 0 or step == steps:
# Generate intermediate or final image
if step == steps:
image = await self._generate_final_image(prompt)
status = "complete"
else:
image = await self._generate_intermediate_image(prompt, step, steps)
status = "processing"
# Convert image to base64
img_buffer = io.BytesIO()
image.save(img_buffer, format='JPEG', quality=85)
img_b64 = base64.b64encode(img_buffer.getvalue()).decode()
yield {
"step": step,
"total_steps": steps,
"status": status,
"image": img_b64,
"progress": step / steps
}
else:
# Send progress update without image
yield {
"step": step,
"total_steps": steps,
"status": "processing",
"image": None,
"progress": step / steps
}
async def _generate_intermediate_image(self, prompt: str, step: int, total_steps: int):
"""Generate intermediate image (placeholder for actual implementation)."""
# This would use your actual diffusion model's intermediate output
# For demo, create a simple placeholder
        # Clamp each channel to 0-255 so the color stays valid at high step counts
        img = Image.new('RGB', (512, 512), color=(min(step * 10, 255), min(step * 5, 255), min(step * 15, 255)))
return img
async def _generate_final_image(self, prompt: str):
"""Generate final high-quality image."""
# This would use your actual diffusion model
img = Image.new('RGB', (512, 512), color='blue')
return img
@chute.cord(
public_api_path="/generate_image_stream",
method="POST",
stream=True
)
async def generate_streaming_image(self, prompt: str, steps: int = 20):
"""Stream image generation with progress updates."""
generator = StreamingImageGenerator(self.diffusion_model)
async def stream_response():
async for update in generator.stream_image_generation(prompt, steps):
yield f"data: {json.dumps(update)}\n\n"
return StreamingResponse(
stream_response(),
media_type="text/event-stream"
)
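A client can rebuild the intermediate previews by reversing the base64/JPEG encoding. A minimal sketch (the field names match the update dicts above):

import base64
import io
import json
from typing import Optional

from PIL import Image

def decode_update(sse_data: str) -> Optional[Image.Image]:
    """Turn one 'data: {...}' frame back into a PIL image, if it carries one."""
    update = json.loads(sse_data)
    if update.get("image") is None:
        return None
    raw = base64.b64decode(update["image"])
    return Image.open(io.BytesIO(raw))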
Advanced Streaming Patterns
Chunked Data Processing
from typing import AsyncIterator, List
import hashlib
class ChunkedProcessor:
"""Process large datasets in chunks with streaming updates."""
async def process_large_dataset(
self,
data: List[str],
chunk_size: int = 10
) -> AsyncIterator[Dict[str, Any]]:
"""Process data in chunks and stream results."""
total_items = len(data)
processed_items = 0
results = []
# Process in chunks
for i in range(0, total_items, chunk_size):
chunk = data[i:i + chunk_size]
# Process chunk
chunk_results = await self._process_chunk(chunk)
results.extend(chunk_results)
processed_items += len(chunk)
# Yield progress update
yield {
"type": "progress",
"processed": processed_items,
"total": total_items,
"progress": processed_items / total_items,
"chunk_results": chunk_results
}
# Allow other coroutines to run
await asyncio.sleep(0)
# Send final results
yield {
"type": "complete",
"processed": processed_items,
"total": total_items,
"progress": 1.0,
"all_results": results,
"summary": self._generate_summary(results)
}
async def _process_chunk(self, chunk: List[str]) -> List[Dict[str, Any]]:
"""Process a single chunk of data."""
results = []
for item in chunk:
# Simulate processing
await asyncio.sleep(0.01)
result = {
"original": item,
"processed": item.upper(),
"length": len(item),
"hash": hashlib.md5(item.encode()).hexdigest()[:8]
}
results.append(result)
return results
def _generate_summary(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Generate summary statistics."""
total_length = sum(r["length"] for r in results)
avg_length = total_length / len(results) if results else 0
return {
"total_items": len(results),
"total_length": total_length,
"average_length": avg_length
}
@chute.cord(
public_api_path="/process_stream",
method="POST",
stream=True
)
async def process_data_stream(self, data: List[str], chunk_size: int = 10):
"""Stream large data processing."""
processor = ChunkedProcessor()
async def stream_response():
async for update in processor.process_large_dataset(data, chunk_size):
yield f"data: {json.dumps(update)}\n\n"
return StreamingResponse(
stream_response(),
media_type="text/event-stream"
)
Multi-Model Streaming Pipeline
class StreamingPipeline:
"""Stream processing through multiple AI models."""
def __init__(self):
self.models = {}
async def stream_multi_model_processing(
self,
text: str
) -> AsyncIterator[Dict[str, Any]]:
"""Process text through multiple models with streaming updates."""
pipeline_steps = [
("preprocessing", self._preprocess),
("sentiment", self._analyze_sentiment),
("entities", self._extract_entities),
("summary", self._generate_summary),
("translation", self._translate_text)
]
current_data = {"text": text}
for step_name, step_func in pipeline_steps:
yield {
"step": step_name,
"status": "starting",
"input": current_data
}
try:
# Process step
step_result = await step_func(current_data)
current_data.update(step_result)
yield {
"step": step_name,
"status": "completed",
"result": step_result,
"accumulated_data": current_data
}
except Exception as e:
yield {
"step": step_name,
"status": "error",
"error": str(e)
}
break
# Send final result
yield {
"step": "pipeline_complete",
"status": "completed",
"final_result": current_data
}
async def _preprocess(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Preprocessing step."""
await asyncio.sleep(0.1)
return {
"cleaned_text": data["text"].strip().lower(),
"word_count": len(data["text"].split())
}
async def _analyze_sentiment(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Sentiment analysis step."""
await asyncio.sleep(0.2)
# Simulate sentiment analysis
return {
"sentiment": "positive",
"sentiment_score": 0.8
}
async def _extract_entities(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Entity extraction step."""
await asyncio.sleep(0.15)
return {
"entities": [
{"text": "example", "type": "MISC", "confidence": 0.9}
]
}
async def _generate_summary(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Text summarization step."""
await asyncio.sleep(0.3)
return {
"summary": f"Summary of: {data['text'][:50]}..."
}
async def _translate_text(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Translation step."""
await asyncio.sleep(0.25)
return {
"translated_text": f"Translated: {data['text']}"
}
@chute.cord(
public_api_path="/pipeline_stream",
method="POST",
stream=True
)
async def stream_pipeline_processing(self, text: str):
"""Stream multi-model pipeline processing."""
pipeline = StreamingPipeline()
async def stream_response():
async for update in pipeline.stream_multi_model_processing(text):
yield f"data: {json.dumps(update)}\n\n"
return StreamingResponse(
stream_response(),
media_type="text/event-stream"
)
WebSocket Applications
Interactive Chat Application
from typing import Dict, Set
import uuid
class ChatManager:
"""Manage WebSocket chat sessions."""
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.chat_sessions: Dict[str, Dict] = {}
async def connect(self, websocket: WebSocket, session_id: str = None):
"""Connect a new WebSocket client."""
await websocket.accept()
if session_id is None:
session_id = str(uuid.uuid4())
self.active_connections[session_id] = websocket
self.chat_sessions[session_id] = {
"messages": [],
"connected_at": time.time()
}
# Send welcome message
await self.send_message(session_id, {
"type": "system",
"message": f"Connected to chat session {session_id}",
"session_id": session_id
})
return session_id
async def disconnect(self, session_id: str):
"""Disconnect a WebSocket client."""
if session_id in self.active_connections:
del self.active_connections[session_id]
if session_id in self.chat_sessions:
del self.chat_sessions[session_id]
async def send_message(self, session_id: str, message: Dict):
"""Send message to specific session."""
if session_id in self.active_connections:
websocket = self.active_connections[session_id]
await websocket.send_text(json.dumps(message))
async def broadcast_message(self, message: Dict, exclude_session: str = None):
"""Broadcast message to all connected sessions."""
for session_id, websocket in self.active_connections.items():
if session_id != exclude_session:
try:
await websocket.send_text(json.dumps(message))
                except Exception:
                    # Connection may be closed
                    pass
@chute.on_startup()
async def initialize_chat(self):
"""Initialize chat manager."""
self.chat_manager = ChatManager()
@chute.websocket("/chat")
async def chat_websocket(self, websocket: WebSocket, session_id: str = None):
"""WebSocket endpoint for interactive chat."""
session_id = await self.chat_manager.connect(websocket, session_id)
try:
while True:
# Receive message
data = await websocket.receive_text()
message_data = json.loads(data)
# Process based on message type
if message_data.get("type") == "user_message":
await self._handle_user_message(session_id, message_data)
elif message_data.get("type") == "typing":
await self._handle_typing_indicator(session_id, message_data)
elif message_data.get("type") == "ping":
await self._handle_ping(session_id)
except Exception as e:
print(f"Chat error for session {session_id}: {e}")
finally:
await self.chat_manager.disconnect(session_id)
async def _handle_user_message(self, session_id: str, message_data: Dict):
"""Handle user message and generate AI response."""
user_message = message_data.get("message", "")
# Store user message
self.chat_manager.chat_sessions[session_id]["messages"].append({
"role": "user",
"content": user_message,
"timestamp": time.time()
})
# Send typing indicator
await self.chat_manager.send_message(session_id, {
"type": "ai_typing",
"typing": True
})
# Generate streaming AI response
ai_response = ""
async for token_data in self.stream_llm_generation(user_message):
if not token_data["is_complete"]:
ai_response += token_data["token"]
# Send partial response
await self.chat_manager.send_message(session_id, {
"type": "ai_message_partial",
"content": ai_response,
"token": token_data["token"]
})
else:
# Send complete response
await self.chat_manager.send_message(session_id, {
"type": "ai_message_complete",
"content": ai_response
})
# Store AI message
self.chat_manager.chat_sessions[session_id]["messages"].append({
"role": "assistant",
"content": ai_response,
"timestamp": time.time()
})
async def _handle_typing_indicator(self, session_id: str, message_data: Dict):
"""Handle typing indicator."""
typing = message_data.get("typing", False)
# Broadcast typing status to other users (if multi-user chat)
await self.chat_manager.broadcast_message({
"type": "user_typing",
"session_id": session_id,
"typing": typing
}, exclude_session=session_id)
async def _handle_ping(self, session_id: str):
"""Handle ping for connection keepalive."""
await self.chat_manager.send_message(session_id, {
"type": "pong",
"timestamp": time.time()
})
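For testing outside the browser, a small Python client built on the third-party websockets package (an assumption; any WebSocket client works) can drive the /chat endpoint. The URL is a placeholder:

import asyncio
import json

import websockets

async def chat_once(url: str, text: str):
    """Send one user message and print streamed AI tokens until complete."""
    async with websockets.connect(url) as ws:
        await ws.send(json.dumps({"type": "user_message", "message": text}))
        while True:
            event = json.loads(await ws.recv())
            if event["type"] == "ai_message_partial":
                print(event["token"], end="", flush=True)
            elif event["type"] == "ai_message_complete":
                print()
                break
            # other types (system, ai_typing, ...) are simply ignored here

asyncio.run(chat_once("wss://myuser-streaming-demo.chutes.ai/chat", "Hello, AI!"))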
Real-time Collaboration
class CollaborativeEditor:
"""Real-time collaborative document editing."""
def __init__(self):
self.documents: Dict[str, Dict] = {}
self.subscribers: Dict[str, Set[str]] = {} # doc_id -> set of session_ids
self.session_connections: Dict[str, WebSocket] = {}
async def join_document(self, doc_id: str, session_id: str, websocket: WebSocket):
"""Join a collaborative document."""
        # Initialize the document if it doesn't exist
if doc_id not in self.documents:
self.documents[doc_id] = {
"content": "",
"version": 0,
"last_modified": time.time()
}
self.subscribers[doc_id] = set()
# Add subscriber
self.subscribers[doc_id].add(session_id)
self.session_connections[session_id] = websocket
# Send current document state
await websocket.send_text(json.dumps({
"type": "document_state",
"doc_id": doc_id,
"content": self.documents[doc_id]["content"],
"version": self.documents[doc_id]["version"]
}))
# Notify other users
await self._broadcast_to_document(doc_id, {
"type": "user_joined",
"session_id": session_id
}, exclude_session=session_id)
async def leave_document(self, doc_id: str, session_id: str):
"""Leave a collaborative document."""
if doc_id in self.subscribers:
self.subscribers[doc_id].discard(session_id)
if session_id in self.session_connections:
del self.session_connections[session_id]
# Notify other users
await self._broadcast_to_document(doc_id, {
"type": "user_left",
"session_id": session_id
}, exclude_session=session_id)
async def apply_operation(self, doc_id: str, session_id: str, operation: Dict):
"""Apply an edit operation to the document."""
if doc_id not in self.documents:
return
doc = self.documents[doc_id]
# Apply operation (simplified - real implementation would use OT)
if operation["type"] == "insert":
pos = operation["position"]
text = operation["text"]
content = doc["content"]
doc["content"] = content[:pos] + text + content[pos:]
elif operation["type"] == "delete":
start = operation["start"]
length = operation["length"]
content = doc["content"]
doc["content"] = content[:start] + content[start + length:]
# Update version
doc["version"] += 1
doc["last_modified"] = time.time()
# Broadcast operation to other users
await self._broadcast_to_document(doc_id, {
"type": "operation",
"operation": operation,
"version": doc["version"],
"author": session_id
}, exclude_session=session_id)
async def _broadcast_to_document(self, doc_id: str, message: Dict, exclude_session: str = None):
"""Broadcast message to all document subscribers."""
if doc_id not in self.subscribers:
return
for session_id in self.subscribers[doc_id]:
if session_id != exclude_session and session_id in self.session_connections:
try:
websocket = self.session_connections[session_id]
await websocket.send_text(json.dumps(message))
                except Exception:
                    # Connection may be closed
                    pass
@chute.websocket("/collaborate/{doc_id}")
async def collaborative_editing(self, websocket: WebSocket, doc_id: str):
"""WebSocket endpoint for collaborative editing."""
session_id = str(uuid.uuid4())
editor = getattr(self, 'collaborative_editor', None)
if editor is None:
self.collaborative_editor = CollaborativeEditor()
editor = self.collaborative_editor
await websocket.accept()
await editor.join_document(doc_id, session_id, websocket)
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
if message["type"] == "operation":
await editor.apply_operation(doc_id, session_id, message["operation"])
elif message["type"] == "cursor_position":
# Broadcast cursor position to other users
await editor._broadcast_to_document(doc_id, {
"type": "cursor_update",
"session_id": session_id,
"position": message["position"]
}, exclude_session=session_id)
except Exception as e:
print(f"Collaboration error: {e}")
finally:
await editor.leave_document(doc_id, session_id)
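As the comment in apply_operation notes, this simplified version ignores concurrency: if two clients edit against the same version, their positions must be transformed against each other before applying. A minimal sketch of the insert-vs-insert case (real operational transformation covers all operation pairs, plus tie-breaking):

def transform_insert(op_a: dict, op_b: dict) -> dict:
    """Shift op_a's position to account for a concurrent insert op_b."""
    if op_b["position"] <= op_a["position"]:
        # op_b landed at or before op_a's target, so the target moved right
        return {**op_a, "position": op_a["position"] + len(op_b["text"])}
    return op_a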
Performance and Optimization
Streaming Buffer Management
import asyncio
from collections import deque
class StreamingBuffer:
"""Manage streaming data with buffering and backpressure handling."""
def __init__(self, max_buffer_size: int = 1000):
self.buffer = deque(maxlen=max_buffer_size)
self.consumers = set()
self.producer_task = None
self.is_producing = False
async def start_producing(self, producer_func):
"""Start producing data."""
if self.is_producing:
return
self.is_producing = True
self.producer_task = asyncio.create_task(self._produce_data(producer_func))
async def stop_producing(self):
"""Stop producing data."""
self.is_producing = False
if self.producer_task:
self.producer_task.cancel()
try:
await self.producer_task
except asyncio.CancelledError:
pass
async def _produce_data(self, producer_func):
"""Internal producer loop."""
try:
async for data in producer_func():
self.buffer.append(data)
# Notify consumers
await self._notify_consumers(data)
# Backpressure handling
if len(self.buffer) >= self.buffer.maxlen * 0.8:
await asyncio.sleep(0.01) # Slow down production
except asyncio.CancelledError:
pass
except Exception as e:
print(f"Producer error: {e}")
finally:
self.is_producing = False
async def _notify_consumers(self, data):
"""Notify all consumers of new data."""
dead_consumers = set()
for consumer in self.consumers:
            try:
                # put_nowait is synchronous; do not await it
                consumer.put_nowait(data)
            except asyncio.QueueFull:
                # Consumer queue is full (not keeping up); drop it
                dead_consumers.add(consumer)
# Remove dead consumers
self.consumers -= dead_consumers
async def subscribe(self) -> asyncio.Queue:
"""Subscribe to the stream."""
consumer_queue = asyncio.Queue(maxsize=100)
self.consumers.add(consumer_queue)
# Send buffered data to new consumer
for data in self.buffer:
await consumer_queue.put(data)
return consumer_queue
def unsubscribe(self, consumer_queue: asyncio.Queue):
"""Unsubscribe from the stream."""
self.consumers.discard(consumer_queue)
# Usage in streaming endpoint
@chute.on_startup()
async def initialize_streaming_buffer(self):
"""Initialize streaming buffer."""
self.streaming_buffer = StreamingBuffer(max_buffer_size=500)
@chute.cord(
public_api_path="/buffered_stream",
method="GET",
stream=True
)
async def buffered_streaming_endpoint(self):
"""Stream with buffering and backpressure handling."""
# Start producing if not already started
if not self.streaming_buffer.is_producing:
await self.streaming_buffer.start_producing(self._data_producer)
# Subscribe to stream
consumer_queue = await self.streaming_buffer.subscribe()
async def stream_response():
try:
while True:
# Get data from buffer
data = await asyncio.wait_for(consumer_queue.get(), timeout=30.0)
yield f"data: {json.dumps(data)}\n\n"
except asyncio.TimeoutError:
yield "event: timeout\ndata: {}\n\n"
except Exception as e:
yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
finally:
self.streaming_buffer.unsubscribe(consumer_queue)
return StreamingResponse(
stream_response(),
media_type="text/event-stream"
)
async def _data_producer(self):
"""Example data producer."""
counter = 0
while True:
yield {
"timestamp": time.time(),
"counter": counter,
"data": f"Generated data {counter}"
}
counter += 1
await asyncio.sleep(0.1)
Connection Management
class ConnectionManager:
"""Manage WebSocket connections with health monitoring."""
def __init__(self):
self.connections: Dict[str, Dict] = {}
self.monitoring_task = None
async def start_monitoring(self):
"""Start connection health monitoring."""
if self.monitoring_task is None:
self.monitoring_task = asyncio.create_task(self._monitor_connections())
async def stop_monitoring(self):
"""Stop connection monitoring."""
if self.monitoring_task:
self.monitoring_task.cancel()
try:
await self.monitoring_task
except asyncio.CancelledError:
pass
self.monitoring_task = None
async def add_connection(self, session_id: str, websocket: WebSocket):
"""Add a new WebSocket connection."""
self.connections[session_id] = {
"websocket": websocket,
"connected_at": time.time(),
"last_ping": time.time(),
"is_alive": True
}
# Start monitoring if first connection
if len(self.connections) == 1:
await self.start_monitoring()
async def remove_connection(self, session_id: str):
"""Remove a WebSocket connection."""
if session_id in self.connections:
del self.connections[session_id]
# Stop monitoring if no connections
if len(self.connections) == 0:
await self.stop_monitoring()
async def send_to_connection(self, session_id: str, message: Dict) -> bool:
"""Send message to specific connection."""
if session_id not in self.connections:
return False
try:
websocket = self.connections[session_id]["websocket"]
await websocket.send_text(json.dumps(message))
return True
        except Exception:
            # Mark connection as dead
            self.connections[session_id]["is_alive"] = False
            return False
async def broadcast(self, message: Dict, exclude: Set[str] = None):
"""Broadcast message to all connections."""
if exclude is None:
exclude = set()
dead_connections = []
for session_id, conn_info in self.connections.items():
if session_id not in exclude and conn_info["is_alive"]:
success = await self.send_to_connection(session_id, message)
if not success:
dead_connections.append(session_id)
# Clean up dead connections
for session_id in dead_connections:
await self.remove_connection(session_id)
async def _monitor_connections(self):
"""Monitor connection health."""
try:
while True:
await asyncio.sleep(30) # Check every 30 seconds
current_time = time.time()
dead_connections = []
for session_id, conn_info in self.connections.items():
# Check if connection is stale
if current_time - conn_info["last_ping"] > 60: # 1 minute timeout
dead_connections.append(session_id)
continue
# Send ping
success = await self.send_to_connection(session_id, {
"type": "ping",
"timestamp": current_time
})
if success:
conn_info["last_ping"] = current_time
else:
dead_connections.append(session_id)
# Clean up dead connections
for session_id in dead_connections:
await self.remove_connection(session_id)
except asyncio.CancelledError:
pass
except Exception as e:
print(f"Connection monitoring error: {e}")
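The monitor above refreshes last_ping whenever its own ping is delivered; a stricter variant would refresh only when the client answers, which means clients must reply to pings. A sketch of the client side (again using the websockets package as an assumption; handle() is a hypothetical application handler):

import json

async def pump(ws):
    """Answer server pings so the monitor does not reap the session as stale."""
    async for raw in ws:
        event = json.loads(raw)
        if event.get("type") == "ping":
            await ws.send(json.dumps({"type": "pong", "timestamp": event["timestamp"]}))
        else:
            handle(event)  # hypothetical: dispatch other events to the app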
Client-Side Integration
JavaScript/TypeScript Client
class ChutesStreamingClient {
constructor(baseUrl) {
this.baseUrl = baseUrl;
this.eventSource = null;
this.websocket = null;
}
  // HTTP Streaming (Server-Sent Events)
  // Note: the native EventSource API only supports GET requests, so we use
  // fetch() and parse the SSE frames manually to support POST endpoints.
  async streamHTTP(endpoint, options = {}) {
    const url = `${this.baseUrl}${endpoint}`;
    const response = await fetch(url, {
      method: options.method || 'POST',
      headers: {
        'Content-Type': 'application/json',
        Accept: 'text/event-stream'
      },
      body: options.body ? JSON.stringify(options.body) : undefined
    });
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    const results = [];
    let buffer = '';
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // keep any incomplete line for the next chunk
      for (const line of lines) {
        if (!line.startsWith('data: ')) continue;
        try {
          const data = JSON.parse(line.slice(6));
          results.push(data);
          // Call progress callback if provided
          if (options.onProgress) {
            options.onProgress(data);
          }
          // Check for completion
          if (data.done || data.is_complete) {
            return results;
          }
        } catch (e) {
          console.error('Failed to parse SSE data:', e);
        }
      }
    }
    return results;
  }
// WebSocket Streaming
async connectWebSocket(endpoint) {
return new Promise((resolve, reject) => {
const wsUrl = `ws${
this.baseUrl.startsWith('https') ? 's' : ''
}://${this.baseUrl.replace(/^https?:\/\//, '')}${endpoint}`;
this.websocket = new WebSocket(wsUrl);
this.websocket.onopen = () => {
resolve(this);
};
this.websocket.onerror = (error) => {
reject(error);
};
this.websocket.onclose = () => {
console.log('WebSocket connection closed');
};
});
}
// Send message via WebSocket
sendMessage(message) {
if (this.websocket && this.websocket.readyState === WebSocket.OPEN) {
this.websocket.send(JSON.stringify(message));
}
}
// Set message handler
onMessage(handler) {
if (this.websocket) {
this.websocket.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
handler(data);
} catch (e) {
console.error('Failed to parse WebSocket message:', e);
}
};
}
}
// Clean up connections
disconnect() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
if (this.websocket) {
this.websocket.close();
this.websocket = null;
}
}
}
// Usage examples
const client = new ChutesStreamingClient('https://myuser-my-chute.chutes.ai');
// HTTP Streaming example
client
  .streamHTTP('/generate_stream', {
    body: { prompt: 'Tell me a story', max_tokens: 100 },
    onProgress: (data) => {
      console.log('Received token:', data.token);
      // Update UI with streaming content
      document.getElementById('output').textContent += data.token;
    }
  })
  .then((results) => {
    console.log('Streaming complete:', results);
  });
// WebSocket example
client.connectWebSocket('/chat').then(() => {
client.onMessage((data) => {
if (data.type === 'ai_message_partial') {
// Update chat interface with partial message
updateChatInterface(data.content);
}
});
// Send a message
client.sendMessage({
type: 'user_message',
message: 'Hello, AI!'
});
});
Python Client
import asyncio
import aiohttp
import json
from typing import AsyncIterator, Callable, Optional
class ChutesAsyncClient:
"""Async Python client for Chutes streaming APIs."""
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def stream_http(
self,
endpoint: str,
method: str = 'GET',
data: dict = None,
progress_callback: Callable = None
) -> AsyncIterator[dict]:
"""Stream data via HTTP Server-Sent Events."""
url = f"{self.base_url}{endpoint}"
async with self.session.request(
method,
url,
json=data,
headers={'Accept': 'text/event-stream'}
) as response:
async for line in response.content:
line_str = line.decode('utf-8').strip()
if line_str.startswith('data: '):
try:
data_str = line_str[6:] # Remove 'data: ' prefix
data_obj = json.loads(data_str)
if progress_callback:
progress_callback(data_obj)
yield data_obj
except json.JSONDecodeError:
continue
async def connect_websocket(
self,
endpoint: str,
message_handler: Callable = None
):
"""Connect to WebSocket endpoint."""
        # Swap the http(s) scheme for ws(s): https://... -> wss://...
        ws_url = self.base_url.replace("http", "ws", 1) + endpoint
async with self.session.ws_connect(ws_url) as ws:
self.websocket = ws
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
try:
data = json.loads(msg.data)
                        if message_handler:
                            # Support plain functions as well as coroutines
                            result = message_handler(data)
                            if asyncio.iscoroutine(result):
                                await result
yield data
except json.JSONDecodeError:
continue
elif msg.type == aiohttp.WSMsgType.ERROR:
break
async def send_websocket_message(self, message: dict):
"""Send message via WebSocket."""
if hasattr(self, 'websocket'):
await self.websocket.send_str(json.dumps(message))
# Usage example
async def example_usage():
async with ChutesAsyncClient('https://myuser-my-chute.chutes.ai') as client:
# HTTP Streaming
async for token_data in client.stream_http(
'/generate_stream',
method='POST',
data={'prompt': 'Tell me a story'},
progress_callback=lambda x: print(f"Token: {x.get('token', '')}")
):
if token_data.get('is_complete'):
print("Generation complete!")
break
# WebSocket example
async for message in client.connect_websocket(
'/chat',
message_handler=lambda msg: print(f"Received: {msg}")
):
if message.get('type') == 'system':
# Send a message
await client.send_websocket_message({
'type': 'user_message',
'message': 'Hello from Python client!'
})
# Run the example
# asyncio.run(example_usage())
Best Practices and Troubleshooting
Error Handling in Streams
class StreamErrorHandler:
"""Handle errors in streaming applications."""
@staticmethod
async def safe_stream_wrapper(stream_func, error_callback=None):
"""Wrap streaming function with error handling."""
try:
async for item in stream_func():
yield item
except asyncio.CancelledError:
yield {"type": "error", "error": "Stream cancelled"}
except Exception as e:
error_msg = {
"type": "error",
"error": str(e),
"error_type": type(e).__name__
}
if error_callback:
await error_callback(error_msg)
yield error_msg
@staticmethod
async def retry_stream(stream_func, max_retries=3, delay=1.0):
"""Retry streaming function on failure."""
for attempt in range(max_retries):
try:
async for item in stream_func():
yield item
return # Success, exit retry loop
except Exception as e:
if attempt == max_retries - 1:
yield {
"type": "error",
"error": f"Failed after {max_retries} attempts: {str(e)}"
}
return
yield {
"type": "retry",
"attempt": attempt + 1,
"max_retries": max_retries,
"error": str(e)
}
await asyncio.sleep(delay * (2 ** attempt)) # Exponential backoff
# Usage
@chute.cord(public_api_path="/safe_stream", method="POST", stream=True)
async def safe_streaming_endpoint(self, prompt: str):
"""Streaming endpoint with error handling."""
async def stream_with_errors():
error_handler = StreamErrorHandler()
async for item in error_handler.safe_stream_wrapper(
lambda: self.stream_llm_generation(prompt),
error_callback=lambda err: self.log_error(err)
):
yield f"data: {json.dumps(item)}\n\n"
return StreamingResponse(
stream_with_errors(),
media_type="text/event-stream"
)
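retry_stream composes the same way. A hedged usage sketch that retries the generation stream with exponential backoff (the /retry_stream path is hypothetical):

@chute.cord(public_api_path="/retry_stream", method="POST", stream=True)
async def retry_streaming_endpoint(self, prompt: str):
    """Streaming endpoint that retries transient failures."""
    async def stream_with_retries():
        async for item in StreamErrorHandler.retry_stream(
            lambda: self.stream_llm_generation(prompt),
            max_retries=3,
            delay=1.0,
        ):
            yield f"data: {json.dumps(item)}\n\n"
    return StreamingResponse(
        stream_with_retries(),
        media_type="text/event-stream"
    )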
Performance Monitoring
class StreamingMetrics:
"""Monitor streaming performance."""
    def __init__(self):
        self.active_streams = 0
        self.total_streams = 0
        self.completed_streams = 0
        self.avg_stream_duration = 0.0
        self.stream_start_times = {}
def start_stream(self, stream_id: str):
"""Record stream start."""
self.active_streams += 1
self.total_streams += 1
self.stream_start_times[stream_id] = time.time()
    def end_stream(self, stream_id: str):
        """Record stream end."""
        self.active_streams = max(0, self.active_streams - 1)
        if stream_id in self.stream_start_times:
            duration = time.time() - self.stream_start_times[stream_id]
            self.completed_streams += 1
            # Incremental mean over streams that actually finished; dividing by
            # total_streams would undercount while streams are still active
            self.avg_stream_duration += (
                (duration - self.avg_stream_duration) / self.completed_streams
            )
            del self.stream_start_times[stream_id]
def get_metrics(self) -> dict:
"""Get current metrics."""
return {
"active_streams": self.active_streams,
"total_streams": self.total_streams,
"avg_duration": self.avg_stream_duration,
"current_streams": list(self.stream_start_times.keys())
}
@chute.on_startup()
async def initialize_metrics(self):
"""Initialize streaming metrics."""
self.streaming_metrics = StreamingMetrics()
@chute.cord(public_api_path="/metrics", method="GET")
async def get_streaming_metrics(self):
"""Get streaming performance metrics."""
return self.streaming_metrics.get_metrics()
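StreamingMetrics only reports what endpoints record, so each streaming cord needs the start/end calls around its generator. A hedged sketch reusing the imports above (the /measured_stream path is hypothetical):

@chute.cord(public_api_path="/measured_stream", method="POST", stream=True)
async def measured_stream(self, prompt: str):
    """Streaming endpoint instrumented with StreamingMetrics."""
    stream_id = str(uuid.uuid4())
    self.streaming_metrics.start_stream(stream_id)
    async def stream_response():
        try:
            async for token_data in self.stream_llm_generation(prompt):
                yield f"data: {json.dumps(token_data)}\n\n"
        finally:
            # Runs on normal completion and on client disconnect
            self.streaming_metrics.end_stream(stream_id)
    return StreamingResponse(
        stream_response(),
        media_type="text/event-stream"
    )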
Next Steps
- Advanced Protocols: Implement WebRTC for peer-to-peer streaming
- Scale Optimization: Handle thousands of concurrent streams
- Security: Implement authentication and rate limiting for streams
- Integration: Connect with real-time databases and message queues
For more advanced topics, see: