Source code for socialia.twitter
"""Twitter/X API v2 poster."""
__all__ = ["Twitter"]
from typing import Any, Callable, Optional
from requests_oauthlib import OAuth1Session
from ._branding import get_env
from ._base import _Base
from ._twitter_growth import TwitterGrowthMixin
from ._twitter_read_backend import XquikReadBackend
[docs]
class Twitter(TwitterGrowthMixin, _Base):
"""Twitter/X API v2 client using OAuth 1.0a.

Requests are issued through the object returned by ``_get_session()``: an
``OAuth1Session`` built from the four stored credentials, or whatever the
injected ``session_factory`` returns.

``post``, ``delete``, ``upload_media`` and ``me`` return a
``"Missing credentials"`` error dict unless all four OAuth credentials are
set. ``feed``, ``mentions`` and ``replies`` first consult the optional read
backend (when one is configured together with a read username) and only
then fall back to the OAuth-authenticated API.

The methods defined here report outcomes as dicts carrying a ``"success"``
flag; non-success HTTP statuses become ``"<status>: <body>"`` error strings.
"""
# Platform identifier; presumably consumed by _Base or callers elsewhere -- verify.
platform_name = "twitter"
# Character limit enforced locally by post() and post_thread() before any request.
MAX_TWEET_LENGTH = 280
# Endpoint URLs. Names in braces are placeholders filled in with str.format().
POST_ENDPOINT = "https://api.x.com/2/tweets"
DELETE_ENDPOINT = "https://api.x.com/2/tweets/{tweet_id}"
ME_ENDPOINT = "https://api.x.com/2/users/me"
USER_TWEETS_ENDPOINT = "https://api.x.com/2/users/{user_id}/tweets"
USER_MENTIONS_ENDPOINT = "https://api.x.com/2/users/{user_id}/mentions"
SEARCH_ENDPOINT = "https://api.x.com/2/tweets/search/recent"
# NOTE(review): not referenced by the methods visible in this class; presumably
# used by the media upload helper module -- confirm.
MEDIA_UPLOAD_ENDPOINT = "https://upload.twitter.com/1.1/media/upload.json"
# NOTE(review): the three endpoints below are not referenced by the methods
# visible in this class; presumably used by TwitterGrowthMixin -- confirm.
USER_BY_USERNAME_ENDPOINT = "https://api.x.com/2/users/by/username/{username}"
FOLLOW_ENDPOINT = "https://api.x.com/2/users/{source_user_id}/following"
UNFOLLOW_ENDPOINT = (
"https://api.x.com/2/users/{source_user_id}/following/{target_user_id}"
)
def __init__(
self,
consumer_key: Optional[str] = None,
consumer_secret: Optional[str] = None,
access_token: Optional[str] = None,
access_token_secret: Optional[str] = None,
*,
session_factory: Optional[Callable[[], object]] = None,
read_backend: Optional[Any] = None,
read_username: Optional[str] = None,
):
# ``session_factory`` is a zero-arg callable that returns the HTTP
# session object used to talk to the Twitter API. Production code
# leaves it ``None`` (and we build a real ``OAuth1Session``). Tests
# pass a hand-rolled fake to exercise behaviour without hitting the
# network — see ``tests/test_twitter.py``.
self.consumer_key = consumer_key or get_env("X_CONSUMER_KEY")
self.consumer_secret = consumer_secret or get_env("X_CONSUMER_KEY_SECRET")
self.access_token = access_token or get_env("X_ACCESSTOKEN")
self.access_token_secret = access_token_secret or get_env(
"X_ACCESSTOKEN_SECRET"
)
self._session_factory = session_factory
self.read_username = (read_username or get_env("X_READ_USERNAME") or "").lstrip(
"@"
)
self._read_backend = read_backend or self._configured_read_backend()
def _configured_read_backend(self) -> Optional[XquikReadBackend]:
    """Return the read backend selected by the ``X_READ_BACKEND`` setting.

    The configured value is lower-cased and underscores are turned into
    hyphens before comparison. Only ``"xquik"`` is recognised; any other
    value (including an unset one) yields ``None``.
    """
    raw_choice = get_env("X_READ_BACKEND") or ""
    normalized = raw_choice.lower().replace("_", "-")
    return XquikReadBackend() if normalized == "xquik" else None
def _has_read_backend(self) -> bool:
if self._read_backend is None:
return False
available = getattr(self._read_backend, "available", None)
if callable(available):
return bool(available())
return True
def _read_backend_call(self, method_name: str, *args, **kwargs) -> Optional[dict]:
if not self._has_read_backend():
return None
method = getattr(self._read_backend, method_name, None)
if callable(method):
return method(*args, **kwargs)
return None
def _read_backend_user_call(self, method_name: str, **kwargs) -> Optional[dict]:
if not self.read_username:
return None
return self._read_backend_call(method_name, self.read_username, **kwargs)
def _get_session(self):
"""Create OAuth1 session (or the injected fake session)."""
if self._session_factory is not None:
return self._session_factory()
return OAuth1Session(
self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.access_token,
resource_owner_secret=self.access_token_secret,
)
[docs]
def validate_credentials(self) -> bool:
"""Check if all credentials are set."""
return all(
[
self.consumer_key,
self.consumer_secret,
self.access_token,
self.access_token_secret,
]
)
[docs]
def validate_read_credentials(self) -> bool:
"""Check if read operations can use OAuth or the optional read backend."""
return self.validate_credentials() or (
self._has_read_backend() and bool(self.read_username)
)
[docs]
def upload_media(self, file_path: str) -> dict:
    """
    Upload a media file so it can be attached to a tweet.

    The work is delegated to ``_twitter_media.upload_media``; per the
    original documentation of this method, that helper detects the file
    type and uses a simple upload for images (jpg, png, gif, webp) and a
    chunked upload for videos (mp4, mov).

    Args:
        file_path: Path to media file

    Returns:
        dict with 'success', 'media_id' or 'error'
    """
    # Imported lazily, at call time, exactly as the module is only needed here.
    from . import _twitter_media

    if self.validate_credentials():
        session = self._get_session()
        return _twitter_media.upload_media(session, file_path)
    return {"success": False, "error": "Missing credentials"}
[docs]
def post(
self,
text: str,
reply_to: Optional[str] = None,
quote_tweet_id: Optional[str] = None,
media_ids: Optional[list] = None,
) -> dict:
"""
Post a tweet.
Args:
text: Tweet content (max 280 chars standard, 25000 premium)
reply_to: Tweet ID to reply to
quote_tweet_id: Tweet ID to quote
media_ids: List of media IDs from upload_media()
Returns:
dict with 'success', 'id', 'url' or 'error'
"""
if not self.validate_credentials():
return {"success": False, "error": "Missing credentials"}
# Validate tweet length
if len(text) > self.MAX_TWEET_LENGTH:
return {
"success": False,
"error": f"Tweet too long: {len(text)} chars (max {self.MAX_TWEET_LENGTH})",
}
oauth = self._get_session()
payload = {"text": text}
if reply_to:
payload["reply"] = {"in_reply_to_tweet_id": reply_to}
if quote_tweet_id:
payload["quote_tweet_id"] = quote_tweet_id
if media_ids:
payload["media"] = {"media_ids": media_ids}
response = oauth.post(self.POST_ENDPOINT, json=payload)
if response.status_code == 201:
data = response.json()
tweet_id = data["data"]["id"]
return {
"success": True,
"id": tweet_id,
"url": f"https://x.com/i/web/status/{tweet_id}",
}
else:
return {
"success": False,
"error": f"{response.status_code}: {response.text}",
}
[docs]
def delete(self, post_id: str) -> dict:
"""
Delete a tweet.
Args:
post_id: Tweet ID to delete
Returns:
dict with 'success' and 'deleted' or 'error'
"""
if not self.validate_credentials():
return {"success": False, "error": "Missing credentials"}
oauth = self._get_session()
url = self.DELETE_ENDPOINT.format(tweet_id=post_id)
response = oauth.delete(url)
if response.status_code == 200:
return {"success": True, "deleted": True}
else:
return {
"success": False,
"error": f"{response.status_code}: {response.text}",
}
[docs]
def post_thread(self, tweets: list[str]) -> dict:
"""
Post a thread of tweets.
Args:
tweets: List of tweet texts
Returns:
dict with 'success', 'ids', 'urls' or 'error'
"""
# Validate all tweets before posting
for i, text in enumerate(tweets):
if len(text) > self.MAX_TWEET_LENGTH:
return {
"success": False,
"error": f"Tweet {i + 1} too long: {len(text)} chars (max {self.MAX_TWEET_LENGTH})",
}
ids = []
urls = []
reply_to = None
for i, text in enumerate(tweets):
result = self.post(text, reply_to=reply_to)
if result["success"]:
ids.append(result["id"])
urls.append(result["url"])
reply_to = result["id"]
else:
return {
"success": False,
"error": f"Failed at tweet {i + 1}: {result['error']}",
"partial_ids": ids,
}
return {"success": True, "ids": ids, "urls": urls}
[docs]
def me(self) -> dict:
"""
Get authenticated user information.
Returns:
dict with 'success', 'id', 'username', 'name' or 'error'
"""
if not self.validate_credentials():
return {"success": False, "error": "Missing credentials"}
oauth = self._get_session()
response = oauth.get(
self.ME_ENDPOINT,
params={"user.fields": "id,name,username,public_metrics,profile_image_url"},
)
if response.status_code == 200:
data = response.json()["data"]
return {
"success": True,
"id": data["id"],
"username": data["username"],
"name": data["name"],
"followers": data.get("public_metrics", {}).get("followers_count", 0),
"following": data.get("public_metrics", {}).get("following_count", 0),
"tweets": data.get("public_metrics", {}).get("tweet_count", 0),
"url": f"https://x.com/{data['username']}",
}
return {"success": False, "error": f"{response.status_code}: {response.text}"}
[docs]
def feed(self, limit: int = 10) -> dict:
"""
Get user's recent tweets.
Args:
limit: Maximum number of tweets to return (max 100)
Returns:
dict with 'success', 'tweets' list or 'error'
"""
backend_result = self._read_backend_user_call("user_tweets", limit=limit)
if backend_result is not None:
return backend_result
if not self.validate_credentials():
return {"success": False, "error": "Missing credentials"}
# First get user ID
user_info = self.me()
if not user_info.get("success"):
return user_info
oauth = self._get_session()
url = self.USER_TWEETS_ENDPOINT.format(user_id=user_info["id"])
response = oauth.get(
url,
params={
"max_results": max(5, min(limit, 100)), # Twitter API requires 5-100
"tweet.fields": "created_at,public_metrics,text",
},
)
if response.status_code == 200:
data = response.json()
tweets = []
for tweet in data.get("data", []):
metrics = tweet.get("public_metrics", {})
tweets.append(
{
"id": tweet["id"],
"text": tweet["text"],
"created_at": tweet.get("created_at"),
"likes": metrics.get("like_count", 0),
"retweets": metrics.get("retweet_count", 0),
"replies": metrics.get("reply_count", 0),
"url": f"https://x.com/i/web/status/{tweet['id']}",
}
)
return {"success": True, "tweets": tweets, "count": len(tweets)}
return {"success": False, "error": f"{response.status_code}: {response.text}"}
[docs]
def mentions(self, limit: int = 10) -> dict:
"""
Get recent mentions of the user.
Args:
limit: Maximum number of mentions to return (max 100)
Returns:
dict with 'success', 'mentions' list or 'error'
"""
backend_result = self._read_backend_user_call("mentions", limit=limit)
if backend_result is not None:
return backend_result
if not self.validate_credentials():
return {"success": False, "error": "Missing credentials"}
# First get user ID
user_info = self.me()
if not user_info.get("success"):
return user_info
oauth = self._get_session()
url = self.USER_MENTIONS_ENDPOINT.format(user_id=user_info["id"])
response = oauth.get(
url,
params={
"max_results": max(5, min(limit, 100)), # Twitter API requires 5-100
"tweet.fields": "created_at,public_metrics,text,author_id",
"expansions": "author_id",
"user.fields": "username,name",
},
)
if response.status_code == 200:
data = response.json()
# Build user lookup
users = {}
for user in data.get("includes", {}).get("users", []):
users[user["id"]] = user
mentions = []
for tweet in data.get("data", []):
author = users.get(tweet.get("author_id"), {})
mentions.append(
{
"id": tweet["id"],
"text": tweet["text"],
"created_at": tweet.get("created_at"),
"author_id": tweet.get("author_id"),
"author_username": author.get("username"),
"author_name": author.get("name"),
"url": f"https://x.com/i/web/status/{tweet['id']}",
}
)
return {"success": True, "mentions": mentions, "count": len(mentions)}
return {"success": False, "error": f"{response.status_code}: {response.text}"}
[docs]
def replies(self, limit: int = 10) -> dict:
"""
Get recent replies to the user's tweets.
Args:
limit: Maximum number of replies to return (max 100)
Returns:
dict with 'success', 'replies' list or 'error'
"""
backend_result = self._read_backend_user_call("replies", limit=limit)
if backend_result is not None:
return backend_result
if not self.validate_credentials():
return {"success": False, "error": "Missing credentials"}
# First get user info
user_info = self.me()
if not user_info.get("success"):
return user_info
oauth = self._get_session()
# Search for replies to this user (excluding own tweets)
query = f"to:{user_info['username']} -from:{user_info['username']}"
response = oauth.get(
self.SEARCH_ENDPOINT,
params={
"query": query,
"max_results": max(10, min(limit, 100)), # Search requires 10-100
"tweet.fields": "created_at,public_metrics,text,author_id,in_reply_to_user_id,conversation_id",
"expansions": "author_id",
"user.fields": "username,name",
},
)
if response.status_code == 200:
data = response.json()
# Build user lookup
users = {}
for user in data.get("includes", {}).get("users", []):
users[user["id"]] = user
replies = []
for tweet in data.get("data", []):
author = users.get(tweet.get("author_id"), {})
metrics = tweet.get("public_metrics", {})
replies.append(
{
"id": tweet["id"],
"text": tweet["text"],
"created_at": tweet.get("created_at"),
"author_id": tweet.get("author_id"),
"author_username": author.get("username"),
"author_name": author.get("name"),
"conversation_id": tweet.get("conversation_id"),
"likes": metrics.get("like_count", 0),
"retweets": metrics.get("retweet_count", 0),
"url": f"https://x.com/i/web/status/{tweet['id']}",
}
)
return {"success": True, "replies": replies, "count": len(replies)}
return {"success": False, "error": f"{response.status_code}: {response.text}"}