This guide demonstrates building a comprehensive text analysis service that combines multiple AI models for sentiment analysis, entity recognition, text classification, and content moderation.
Overview
This complete example showcases:
Multi-model Architecture: Combining different AI models in a single service
Sentiment Analysis: Understanding the emotional tone of text
Named Entity Recognition: Extracting people, places, and organizations
Text Classification: Categorizing content by topic or intent
Content Moderation: Detecting inappropriate or harmful content
Create a comprehensive service that orchestrates multiple AI models:
import asyncio
import json
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import logging
import torch
import spacy
import redis
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    pipeline,
)
from prometheus_client import Counter, Histogram, start_http_server
import numpy as np
# Metrics
REQUEST_COUNT = Counter('analysis_requests_total', 'Total analysis requests', ['type'])
REQUEST_DURATION = Histogram('analysis_duration_seconds', 'Request duration', ['type'])
ERROR_COUNT = Counter('analysis_errors_total', 'Total errors', ['type', 'error'])
@dataclass
class AnalysisResult:
    text_id: Optional[str]
    sentiment: Optional[Dict[str, Any]] = None
    entities: Optional[List[Dict[str, Any]]] = None
    classification: Optional[Dict[str, Any]] = None
    moderation: Optional[Dict[str, Any]] = None
    processing_time_ms: Optional[float] = None
    metadata: Optional[Dict[str, Any]] = None
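
# NOTE: The original listing references TextInput, AnalysisType, and InputArgs
# without defining them. The definitions below are a minimal sketch inferred
# from how they are used later in this file; the names and fields are
# assumptions, not the official chutes schema.
from enum import Enum


class AnalysisType(str, Enum):
    SENTIMENT = "sentiment"
    ENTITIES = "entities"
    CLASSIFICATION = "classification"
    MODERATION = "moderation"
    ALL = "all"


@dataclass
class TextInput:
    text: str
    id: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None


@dataclass
class InputArgs:
    texts: List[TextInput]
    analysis_types: List[AnalysisType]
    include_confidence: bool = True
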
class TextAnalysisService:
    def __init__(self, cache_enabled: bool = True):
        self.logger = logging.getLogger(__name__)
        self.cache_enabled = cache_enabled

        # Initialize Redis cache
        if cache_enabled:
            try:
                self.cache = redis.Redis(host='localhost', port=6379, db=0)
                self.cache.ping()
                self.logger.info("Cache connection established")
            except Exception as e:
                self.logger.warning(f"Cache disabled: {e}")
                self.cache_enabled = False

        # Load models
        self._load_models()

        # Start metrics server
        start_http_server(8001)
        self.logger.info("Metrics server started on port 8001")
    def _load_models(self):
        """Load all AI models with proper error handling"""
        self.logger.info("Loading AI models...")
        try:
            # Sentiment Analysis Model
            self.sentiment_tokenizer = AutoTokenizer.from_pretrained(
                "cardiffnlp/twitter-roberta-base-sentiment-latest"
            )
            self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(
                "cardiffnlp/twitter-roberta-base-sentiment-latest"
            )
            self.logger.info("✓ Sentiment model loaded")

            # Text Classification Model (BART MNLI is used via the
            # zero-shot-classification task, matching how classify_text calls it)
            self.classifier = pipeline(
                "zero-shot-classification",
                model="facebook/bart-large-mnli",
                device=0 if torch.cuda.is_available() else -1
            )
            self.logger.info("✓ Classification model loaded")

            # Content Moderation Model
            self.moderation_pipeline = pipeline(
                "text-classification",
                model="unitary/toxic-bert",
                device=0 if torch.cuda.is_available() else -1
            )
            self.logger.info("✓ Moderation model loaded")

            # Named Entity Recognition
            self.nlp = spacy.load("en_core_web_lg")
            self.logger.info("✓ NER model loaded")

        except Exception as e:
            self.logger.error(f"Failed to load models: {e}")
            raise

    def _get_cache_key(self, text: str, analysis_type: str) -> str:
"""Generate cache key for text and analysis type"""import hashlib
text_hash = hashlib.md5(text.encode()).hexdigest()
returnf"analysis:{analysis_type}:{text_hash}"def_get_cached_result(self, cache_key: str) -> Optional[Dict]:
"""Retrieve cached analysis result"""ifnotself.cache_enabled:
returnNonetry:
cached = self.cache.get(cache_key)
if cached:
return json.loads(cached)
except Exception as e:
self.logger.warning(f"Cache read error: {e}")
returnNonedef_cache_result(self, cache_key: str, result: Dict, ttl: int = 3600):
"""Cache analysis result with TTL"""ifnotself.cache_enabled:
returntry:
self.cache.setex(
cache_key,
ttl,
json.dumps(result, default=str)
)
except Exception as e:
self.logger.warning(f"Cache write error: {e}")
    async def analyze_sentiment(self, text: str) -> Dict[str, Any]:
        """Perform sentiment analysis with caching"""
        cache_key = self._get_cache_key(text, "sentiment")
        cached = self._get_cached_result(cache_key)
        if cached:
            return cached

        with REQUEST_DURATION.labels(type='sentiment').time():
            try:
                inputs = self.sentiment_tokenizer(
                    text,
                    return_tensors="pt",
                    truncation=True,
                    max_length=512
                )
                with torch.no_grad():
                    outputs = self.sentiment_model(**inputs)
                    predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)

                labels = ['negative', 'neutral', 'positive']
                scores = predictions[0].tolist()

                result = {
                    'label': labels[int(np.argmax(scores))],
                    'confidence': float(max(scores)),
                    'scores': {label: float(score) for label, score in zip(labels, scores)}
                }

                self._cache_result(cache_key, result)
                REQUEST_COUNT.labels(type='sentiment').inc()
                return result
            except Exception as e:
                ERROR_COUNT.labels(type='sentiment', error=type(e).__name__).inc()
                raise Exception(f"Sentiment analysis failed: {e}")
    async def extract_entities(self, text: str) -> List[Dict[str, Any]]:
        """Extract named entities with caching"""
        cache_key = self._get_cache_key(text, "entities")
        cached = self._get_cached_result(cache_key)
        if cached:
            return cached

        with REQUEST_DURATION.labels(type='entities').time():
            try:
                doc = self.nlp(text)
                entities = []
                for ent in doc.ents:
                    entities.append({
                        'text': ent.text,
                        'label': ent.label_,
                        'description': spacy.explain(ent.label_),
                        'start': ent.start_char,
                        'end': ent.end_char
                        # spaCy's statistical NER does not expose a per-entity
                        # confidence score out of the box, so none is emitted here.
                    })

                self._cache_result(cache_key, entities)
                REQUEST_COUNT.labels(type='entities').inc()
                return entities
            except Exception as e:
                ERROR_COUNT.labels(type='entities', error=type(e).__name__).inc()
                raise Exception(f"Entity extraction failed: {e}")
    async def classify_text(self, text: str, categories: List[str] = None) -> Dict[str, Any]:
        """Classify text into categories"""
        if categories is None:
            categories = [
                "technology", "business", "health", "sports",
                "entertainment", "politics", "science", "education"
            ]

        cache_key = self._get_cache_key(f"{text}:{','.join(categories)}", "classification")
        cached = self._get_cached_result(cache_key)
        if cached:
            return cached

        with REQUEST_DURATION.labels(type='classification').time():
            try:
                # Use zero-shot classification
                result = self.classifier(text, candidate_labels=categories)

                classification_result = {
                    'predicted_category': result['labels'][0],
                    'confidence': float(result['scores'][0]),
                    'all_scores': {
                        label: float(score)
                        for label, score in zip(result['labels'], result['scores'])
                    }
                }

                self._cache_result(cache_key, classification_result)
                REQUEST_COUNT.labels(type='classification').inc()
                return classification_result
            except Exception as e:
                ERROR_COUNT.labels(type='classification', error=type(e).__name__).inc()
                raise Exception(f"Text classification failed: {e}")
    async def moderate_content(self, text: str) -> Dict[str, Any]:
        """Detect inappropriate content"""
        cache_key = self._get_cache_key(text, "moderation")
        cached = self._get_cached_result(cache_key)
        if cached:
            return cached

        with REQUEST_DURATION.labels(type='moderation').time():
            try:
                # top_k=None returns a score for every label instead of only the best one
                result = self.moderation_pipeline(text, top_k=None)

                # Process toxicity detection result (toxic-bert labels are lowercase)
                is_toxic = any(
                    item['label'].lower() == 'toxic' and item['score'] > 0.7
                    for item in result
                )
                max_toxicity_score = max(
                    (item['score'] for item in result if item['label'].lower() == 'toxic'),
                    default=0.0
                )

                moderation_result = {
                    'is_inappropriate': is_toxic,
                    'toxicity_score': float(max_toxicity_score),
                    'categories': result,
                    'action_required': is_toxic
                }

                self._cache_result(cache_key, moderation_result)
                REQUEST_COUNT.labels(type='moderation').inc()
                return moderation_result
            except Exception as e:
                ERROR_COUNT.labels(type='moderation', error=type(e).__name__).inc()
                raise Exception(f"Content moderation failed: {e}")
    async def analyze_single_text(
        self,
        text_input: TextInput,
        analysis_types: List[AnalysisType],
        include_confidence: bool = True
    ) -> AnalysisResult:
        """Analyze a single text with specified analysis types"""
        start_time = time.time()
        result = AnalysisResult(text_id=text_input.id, metadata=text_input.metadata)

        try:
            # Determine which analyses to run
            run_all = AnalysisType.ALL in analysis_types

            tasks = []
            if run_all or AnalysisType.SENTIMENT in analysis_types:
                tasks.append(("sentiment", self.analyze_sentiment(text_input.text)))
            if run_all or AnalysisType.ENTITIES in analysis_types:
                tasks.append(("entities", self.extract_entities(text_input.text)))
            if run_all or AnalysisType.CLASSIFICATION in analysis_types:
                tasks.append(("classification", self.classify_text(text_input.text)))
            if run_all or AnalysisType.MODERATION in analysis_types:
                tasks.append(("moderation", self.moderate_content(text_input.text)))

            # Run analyses concurrently
            if tasks:
                task_names, task_coroutines = zip(*tasks)
                results = await asyncio.gather(*task_coroutines, return_exceptions=True)

                for name, task_result in zip(task_names, results):
                    if isinstance(task_result, Exception):
                        self.logger.error(f"Analysis {name} failed: {task_result}")
                    else:
                        setattr(result, name, task_result)

            result.processing_time_ms = (time.time() - start_time) * 1000
            return result
        except Exception as e:
            self.logger.error(f"Text analysis failed: {e}")
            result.processing_time_ms = (time.time() - start_time) * 1000
            raise Exception(f"Analysis failed: {e}")
    async def analyze_batch(self, inputs: InputArgs) -> List[AnalysisResult]:
        """Analyze multiple texts concurrently"""
        self.logger.info(f"Processing batch of {len(inputs.texts)} texts")

        # Process texts concurrently with controlled concurrency
        semaphore = asyncio.Semaphore(10)  # Limit concurrent analyses

        async def analyze_with_semaphore(text_input):
            async with semaphore:
                return await self.analyze_single_text(
                    text_input,
                    inputs.analysis_types,
                    inputs.include_confidence
                )

        tasks = [analyze_with_semaphore(text_input) for text_input in inputs.texts]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Convert exceptions to error results
        final_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                error_result = AnalysisResult(
                    text_id=inputs.texts[i].id,
                    metadata={"error": str(result)}
                )
                final_results.append(error_result)
            else:
                final_results.append(result)

        return final_results
# Global service instance
service = None


def get_service() -> TextAnalysisService:
    """Get or create the global service instance"""
    global service
    if service is None:
        service = TextAnalysisService()
    return service
async def run(inputs: InputArgs) -> List[Dict[str, Any]]:
    """Main entry point for the chute"""
    analysis_service = get_service()

    try:
        results = await analysis_service.analyze_batch(inputs)

        # Convert results to serializable format
        output = []
        for result in results:
            result_dict = {
                'text_id': result.text_id,
                'processing_time_ms': result.processing_time_ms,
                'metadata': result.metadata
            }
            if result.sentiment:
                result_dict['sentiment'] = result.sentiment
            if result.entities:
                result_dict['entities'] = result.entities
            if result.classification:
                result_dict['classification'] = result.classification
            if result.moderation:
                result_dict['moderation'] = result.moderation
            output.append(result_dict)

        return output
    except Exception as e:
        logging.error(f"Batch processing failed: {e}")
        raise Exception(f"Analysis service error: {e}")
Creating the Complete Chute
Deploy the comprehensive text analysis service:
from chutes.chute import Chute, NodeSelector
# Create the complete text analysis chute
chute = Chute(
    username="myuser",
    name="text-analysis-complete",
    image=image,
    entry_file="analysis_service.py",
    entry_point="run",
    node_selector=NodeSelector(
        gpu_count=1,
        min_vram_gb_per_gpu=16
    ),
    timeout_seconds=300,
    concurrency=5
)

# Deploy the service
print("Deploying comprehensive text analysis service...")
result = chute.deploy()
print(f"✅ Service deployed: {result}")
Usage Examples
Basic Text Analysis
# Analyze a single text with all models
response = chute.run({
    "texts": [
        {
            "text": "I absolutely love this new AI technology! It's revolutionary and will change everything.",
            "id": "text_1"
        }
    ],
    "analysis_types": ["all"],
    "include_confidence": True
})

# Response includes all analysis types
result = response[0]
print(f"Sentiment: {result['sentiment']['label']} ({result['sentiment']['confidence']:.2f})")
print(f"Category: {result['classification']['predicted_category']}")
print(f"Entities: {[ent['text'] for ent in result['entities']]}")
print(f"Content safe: {not result['moderation']['is_inappropriate']}")
Batch Processing
# Analyze multiple texts efficiently
texts = [
    {"text": "This product is amazing!", "id": "review_1"},
    {"text": "The service was terrible and slow.", "id": "review_2"},
    {"text": "Apple Inc. reported strong quarterly earnings.", "id": "news_1"},
    {"text": "The new iPhone features advanced AI capabilities.", "id": "tech_1"}
]

response = chute.run({
    "texts": texts,
    "analysis_types": ["sentiment", "entities", "classification"],
    "include_confidence": True
})

# Process results
for result in response:
    print(f"\nText ID: {result['text_id']}")
    print(f"Processing time: {result['processing_time_ms']:.2f}ms")
    if 'sentiment' in result:
        print(f"Sentiment: {result['sentiment']['label']}")
    if 'entities' in result:
        print(f"Entities: {[ent['text'] for ent in result['entities']]}")
Selective Analysis
# Run only specific analysis types
response = chute.run({
    "texts": [
        {"text": "Breaking: Tech giant announces major acquisition", "id": "headline_1"}
    ],
    "analysis_types": ["entities", "classification"],  # Only NER and classification
    "include_confidence": True
})
Content Moderation Focus
# Focus on content safety
user_comments = [
    {"text": "This is a great discussion!", "id": "comment_1"},
    {"text": "I disagree but respect your opinion.", "id": "comment_2"},
    {"text": "This platform needs better moderation.", "id": "comment_3"}
]

response = chute.run({
    "texts": user_comments,
    "analysis_types": ["moderation", "sentiment"],
    "include_confidence": True
})

# Filter inappropriate content
safe_comments = [
    result for result in response
    if not result['moderation']['is_inappropriate']
]
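Because each moderation result also carries a toxicity_score (see the response schema above), you can go beyond filtering and triage flagged comments for human review. A small sketch:
# Rank flagged comments so the most toxic are reviewed first
flagged = sorted(
    (r for r in response if r['moderation']['is_inappropriate']),
    key=lambda r: r['moderation']['toxicity_score'],
    reverse=True
)
for r in flagged:
    print(f"{r['text_id']}: toxicity={r['moderation']['toxicity_score']:.2f}")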
Performance Optimization
Caching Strategy
The service implements intelligent caching:
Redis-based caching for repeated text analyses
1-hour TTL for cached results
Cache keys based on text content and analysis type (illustrated below)
Error isolation prevents single failures from affecting the batch
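The cache key derivation can be reproduced standalone; it mirrors the service's _get_cache_key method, so two requests with identical text and analysis type always map to the same Redis key:
import hashlib

def cache_key(text: str, analysis_type: str) -> str:
    # Mirrors TextAnalysisService._get_cache_key
    text_hash = hashlib.md5(text.encode()).hexdigest()
    return f"analysis:{analysis_type}:{text_hash}"

print(cache_key("This product is amazing!", "sentiment"))
# analysis:sentiment:<32-character md5 hex digest>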
Resource Management
# Optimized node selection for production
chute = Chute(
    username="myuser",
    name="text-analysis-production",
    image=image,
    entry_file="analysis_service.py",
    entry_point="run",
    node_selector=NodeSelector(
        gpu_count=1,
        min_vram_gb_per_gpu=24,  # Larger VRAM for the combined models; extra RAM helps caching
        preferred_provider="runpod"  # Specify provider if needed
    ),
    timeout_seconds=600,  # Longer timeout for large batches
    concurrency=10  # Higher concurrency for production
)
Monitoring and Observability
Built-in Metrics
The service exposes Prometheus metrics on port 8001:
analysis_requests_total - Total requests by analysis type
analysis_duration_seconds - Request latency histogram by analysis type
analysis_errors_total - Total errors by analysis type and error class
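Since start_http_server(8001) serves the standard Prometheus text format, you can spot-check these counters with a few lines of Python (the /metrics path is an assumption; prometheus_client serves the metrics regardless of path):
import urllib.request

# Fetch the raw Prometheus exposition text from the running service
with urllib.request.urlopen("http://localhost:8001/metrics") as resp:
    body = resp.read().decode()

# Print only the series defined by the analysis service
for line in body.splitlines():
    if line.startswith("analysis_"):
        print(line)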
Multi-language Support
Extend the service with automatic language detection so each text is routed to a language-specific spaCy pipeline. The helper below is meant to be added as a method of TextAnalysisService:
# Language detection and processing
from langdetect import detect

async def analyze_multilingual_text(self, text: str, language: str = None) -> Dict:
    """Analyze text with language-specific models"""
    # Auto-detect language if not provided
    if language is None:
        language = detect(text)

    # Load language-specific models
    if language == "es":
        nlp = spacy.load("es_core_news_sm")
    elif language == "fr":
        nlp = spacy.load("fr_core_news_sm")
    else:
        nlp = self.nlp  # Default English model

    # Process with appropriate model
    doc = nlp(text)
    return self._extract_entities_from_doc(doc)
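Note that langdetect is a third-party package (pip install langdetect), and the non-English spaCy pipelines must be present in the image. One way to fetch them at build time, sketched here using spaCy's downloader API:
# Install the non-English spaCy pipelines, e.g. during image build
from spacy.cli import download

for model in ("es_core_news_sm", "fr_core_news_sm"):
    download(model)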
Cost Optimization
For development, a smaller and cheaper configuration keeps costs down:
# Cost-optimized configuration for development
dev_chute = Chute(
    username="myuser",
    name="text-analysis-dev",
    image=image,
    entry_file="analysis_service.py",
    entry_point="run",
    node_selector=NodeSelector(
        gpu_count=1,
        min_vram_gb_per_gpu=8
    ),
    timeout_seconds=300,
    concurrency=3,
    auto_scale=False
)
This comprehensive example demonstrates how to build a production-ready text analysis service that combines multiple AI models, implements proper error handling, includes monitoring and caching, and provides a robust API for various text analysis tasks.