← Back to Projects

Twitter/X API Sentiment Pipeline

Published on: December 10, 2023  ·  Python · X API v2 · TextBlob · Pandas


Built a sentiment analysis pipeline on live Twitter/X data using the v2 API, with custom text preprocessing, sentiment scoring, and trend detection across keyword-defined topics.


Overview

The X (formerly Twitter) API v2 provides access to recent and filtered tweet streams. This project uses the filtered stream endpoint to collect tweets matching specific keywords in real time, runs preprocessing to clean the raw text, and applies TextBlob for sentiment scoring along with custom rule-based enhancements for domain-specific language.


1. Connecting to the X API v2

Authentication uses OAuth 2.0 Bearer Token. The filtered stream endpoint lets you define rules (keyword filters) that the API applies server-side before streaming results.

import requests
import os

BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN")

def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {BEARER_TOKEN}"
    r.headers["User-Agent"]    = "v2FilteredStreamPython"
    return r

def add_rules(rules):
    payload = {"add": rules}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload,
    )
    return response.json()

# Define keyword filters
rules = [
    {"value": "fintech payments -is:retweet lang:en", "tag": "fintech"},
    {"value": "UPI NEFT IMPS -is:retweet lang:en",    "tag": "payments"},
]
add_rules(rules)

2. Text Preprocessing

Raw tweets are noisy — URLs, mentions, hashtags, and special characters all reduce sentiment accuracy. A preprocessing pipeline cleans each tweet before scoring:

import re

def clean_tweet(text: str) -> str:
    text = re.sub(r"http\S+",  "",  text)   # Remove URLs
    text = re.sub(r"@\w+",     "",  text)   # Remove mentions
    text = re.sub(r"#(\w+)",   r"\1", text) # Hashtag → word
    text = re.sub(r"[^\w\s]",  "",  text)   # Remove punctuation
    text = re.sub(r"\s+",      " ", text)   # Normalise whitespace
    return text.strip().lower()

3. Sentiment Scoring

TextBlob's polarity score (−1 to +1) is used as the primary signal, supplemented by a domain keyword dictionary for finance-specific terms (e.g., "payment failed" is strongly negative even if TextBlob scores it neutrally).

from textblob import TextBlob

DOMAIN_OVERRIDES = {
    "payment failed": -0.8,
    "transaction declined": -0.7,
    "instant transfer": +0.5,
    "zero fee": +0.6,
}

def score_sentiment(raw_text: str) -> dict:
    clean = clean_tweet(raw_text)

    # Check domain overrides first
    for phrase, score in DOMAIN_OVERRIDES.items():
        if phrase in clean:
            return {"score": score, "label": "positive" if score > 0 else "negative"}

    polarity = TextBlob(clean).sentiment.polarity
    label = "positive" if polarity > 0.05 else ("negative" if polarity < -0.05 else "neutral")
    return {"score": round(polarity, 3), "label": label}

4. Trend Detection

Scores are aggregated into 15-minute buckets to detect sentiment spikes — useful for catching negative viral moments before they escalate. A simple z-score alert fires when the rolling negative sentiment rate crosses 2 standard deviations above the 7-day baseline.

import pandas as pd

def detect_sentiment_spike(df: pd.DataFrame, window: str = "15min") -> pd.DataFrame:
    df['ts'] = pd.to_datetime(df['created_at'])
    agg = (
        df.set_index('ts')
        .resample(window)['label']
        .value_counts(normalize=True)
        .unstack(fill_value=0)
    )
    agg['neg_zscore'] = (
        (agg.get('negative', 0) - agg.get('negative', 0).mean()) /
        (agg.get('negative', 0).std() + 1e-9)
    )
    agg['alert'] = agg['neg_zscore'] > 2.0
    return agg

Results & Use Cases