Published on: December 10, 2023 · Python · X API v2 · TextBlob · Pandas
Built a sentiment analysis pipeline on live Twitter/X data using the v2 API, with custom text preprocessing, sentiment scoring, and trend detection across keyword-defined topics.
The X (formerly Twitter) API v2 provides access to recent and filtered tweet streams. This project uses the filtered stream endpoint to collect tweets matching specific keywords in real time, runs preprocessing to clean the raw text, and applies TextBlob for sentiment scoring along with custom rule-based enhancements for domain-specific language.
Authentication uses an OAuth 2.0 Bearer token sent in the Authorization header. The filtered stream endpoint lets you define rules (keyword filters) that the API applies server-side before streaming results.
import os

import requests

BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN")

def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {BEARER_TOKEN}"
    r.headers["User-Agent"] = "v2FilteredStreamPython"
    return r

def add_rules(rules):
    payload = {"add": rules}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload,
    )
    return response.json()
# Define keyword filters. Space-separated terms are ANDed by the API;
# alternatives must be grouped explicitly with OR.
rules = [
    {"value": "fintech payments -is:retweet lang:en", "tag": "fintech"},
    {"value": "(UPI OR NEFT OR IMPS) -is:retweet lang:en", "tag": "payments"},
]
add_rules(rules)
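Once rules are registered, the stream itself is read from the GET https://api.twitter.com/2/tweets/search/stream endpoint, which emits one JSON object per line (blank lines are keep-alive signals). A minimal sketch of flattening each line into a record follows; parse_stream_line and the process callback are illustrative names introduced here, not part of the API:

```python
import json
from typing import Optional

def parse_stream_line(line: bytes) -> Optional[dict]:
    """Flatten one line of the filtered stream; returns None for keep-alive blanks."""
    if not line.strip():
        return None
    payload = json.loads(line)
    tweet = payload["data"]
    return {
        "id": tweet["id"],
        "text": tweet["text"],
        "created_at": tweet.get("created_at"),
        "tags": [rule["tag"] for rule in payload.get("matching_rules", [])],
    }

# Streaming loop (requires network access and a valid bearer token):
# response = requests.get(
#     "https://api.twitter.com/2/tweets/search/stream",
#     auth=bearer_oauth,
#     params={"tweet.fields": "created_at"},
#     stream=True,
# )
# for line in response.iter_lines():
#     record = parse_stream_line(line)
#     if record:
#         process(record)  # hypothetical downstream handler
```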
Raw tweets are noisy — URLs, mentions, hashtags, and special characters all reduce sentiment accuracy. A preprocessing pipeline cleans each tweet before scoring:
import re

def clean_tweet(text: str) -> str:
    text = re.sub(r"http\S+", "", text)    # Remove URLs
    text = re.sub(r"@\w+", "", text)       # Remove mentions
    text = re.sub(r"#(\w+)", r"\1", text)  # Hashtag → word
    text = re.sub(r"[^\w\s]", "", text)    # Remove punctuation
    text = re.sub(r"\s+", " ", text)       # Normalise whitespace
    return text.strip().lower()
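As a quick sanity check, here is the cleaner applied to a made-up tweet (the function is repeated so the snippet runs standalone):

```python
import re

def clean_tweet(text: str) -> str:
    text = re.sub(r"http\S+", "", text)    # Remove URLs
    text = re.sub(r"@\w+", "", text)       # Remove mentions
    text = re.sub(r"#(\w+)", r"\1", text)  # Hashtag → word
    text = re.sub(r"[^\w\s]", "", text)    # Remove punctuation
    text = re.sub(r"\s+", " ", text)       # Normalise whitespace
    return text.strip().lower()

print(clean_tweet("Loving the new @XPay update! #fintech https://t.co/abc123"))
# → loving the new update fintech
```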
TextBlob's polarity score (−1 to +1) is used as the primary signal, supplemented by a domain keyword dictionary for finance-specific terms (e.g., "payment failed" is strongly negative even if TextBlob scores it neutrally).
from textblob import TextBlob

DOMAIN_OVERRIDES = {
    "payment failed": -0.8,
    "transaction declined": -0.7,
    "instant transfer": +0.5,
    "zero fee": +0.6,
}

def score_sentiment(raw_text: str) -> dict:
    clean = clean_tweet(raw_text)
    # Check domain overrides first
    for phrase, score in DOMAIN_OVERRIDES.items():
        if phrase in clean:
            return {"score": score, "label": "positive" if score > 0 else "negative"}
    polarity = TextBlob(clean).sentiment.polarity
    label = "positive" if polarity > 0.05 else ("negative" if polarity < -0.05 else "neutral")
    return {"score": round(polarity, 3), "label": label}
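To illustrate the precedence in isolation, the override lookup can be exercised without invoking TextBlob at all; override_score is a stripped-down helper introduced here purely for demonstration:

```python
DOMAIN_OVERRIDES = {
    "payment failed": -0.8,
    "transaction declined": -0.7,
    "instant transfer": +0.5,
    "zero fee": +0.6,
}

def override_score(clean_text: str):
    """Return the first matching domain override, or None to fall back to TextBlob."""
    for phrase, score in DOMAIN_OVERRIDES.items():
        if phrase in clean_text:
            return {"score": score, "label": "positive" if score > 0 else "negative"}
    return None

print(override_score("my payment failed twice today"))
# → {'score': -0.8, 'label': 'negative'}
print(override_score("great app"))
# → None
```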
Scores are aggregated into 15-minute buckets to detect sentiment spikes, which is useful for catching negative viral moments before they escalate. A simple z-score alert fires when a bucket's negative sentiment rate rises more than 2 standard deviations above the baseline (a 7-day rolling baseline in production; the snippet below uses the mean of the collected sample for simplicity).
import pandas as pd

def detect_sentiment_spike(df: pd.DataFrame, window: str = "15min") -> pd.DataFrame:
    df = df.copy()  # avoid mutating the caller's frame
    df["ts"] = pd.to_datetime(df["created_at"])
    # Share of each sentiment label within every time bucket
    agg = (
        df.groupby(pd.Grouper(key="ts", freq=window))["label"]
        .value_counts(normalize=True)
        .unstack(fill_value=0)
    )
    # Guard against samples where no tweet was scored negative
    neg = agg["negative"] if "negative" in agg.columns else pd.Series(0.0, index=agg.index)
    agg["neg_zscore"] = (neg - neg.mean()) / (neg.std() + 1e-9)
    agg["alert"] = agg["neg_zscore"] > 2.0
    return agg
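On synthetic data with fabricated timestamps and labels (nine calm buckets followed by one fully negative bucket), the detector flags only the spike; detect_sentiment_spike is repeated so the snippet runs standalone:

```python
import pandas as pd

def detect_sentiment_spike(df: pd.DataFrame, window: str = "15min") -> pd.DataFrame:
    df = df.copy()
    df["ts"] = pd.to_datetime(df["created_at"])
    agg = (
        df.groupby(pd.Grouper(key="ts", freq=window))["label"]
        .value_counts(normalize=True)
        .unstack(fill_value=0)
    )
    neg = agg["negative"] if "negative" in agg.columns else pd.Series(0.0, index=agg.index)
    agg["neg_zscore"] = (neg - neg.mean()) / (neg.std() + 1e-9)
    agg["alert"] = agg["neg_zscore"] > 2.0
    return agg

# Nine calm 15-minute buckets, then one bucket that is entirely negative
ts = pd.date_range("2023-12-10 09:00", periods=10, freq="15min")
rows = []
for i, t in enumerate(ts):
    label = "negative" if i == 9 else "neutral"
    rows += [{"created_at": t, "label": label}] * 2
df = pd.DataFrame(rows)

out = detect_sentiment_spike(df)
print(out["alert"].tolist())
# → [False, False, False, False, False, False, False, False, False, True]
```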