Source code for socialia.youtube

"""YouTube API integration for video uploads and community posts."""

__all__ = ["YouTube"]

import os
from typing import Optional

from ._base import _Base
from ._branding import get_env
from ._paths import get_youtube_token_file as _get_youtube_token_file

from scitex_dev import try_import_optional

Credentials = try_import_optional(
    "google.oauth2.credentials", attr="Credentials", extra="youtube", pkg="google-auth"
)
InstalledAppFlow = try_import_optional(
    "google_auth_oauthlib.flow",
    attr="InstalledAppFlow",
    extra="youtube",
    pkg="google-auth-oauthlib",
)
Request = try_import_optional(
    "google.auth.transport.requests",
    attr="Request",
    extra="youtube",
    pkg="google-auth",
)
build = try_import_optional(
    "googleapiclient.discovery",
    attr="build",
    extra="youtube",
    pkg="google-api-python-client",
)
MediaFileUpload = try_import_optional(
    "googleapiclient.http",
    attr="MediaFileUpload",
    extra="youtube",
    pkg="google-api-python-client",
)
HAS_YOUTUBE = all(
    x is not None
    for x in (Credentials, InstalledAppFlow, Request, build, MediaFileUpload)
)

SCOPES = [
    "https://www.googleapis.com/auth/youtube.upload",
    "https://www.googleapis.com/auth/youtube",
    "https://www.googleapis.com/auth/youtube.force-ssl",
]


[docs] class YouTube(_Base): """YouTube API client for video uploads and management. Environment Variables: YOUTUBE_CLIENT_SECRETS_FILE: Path to OAuth client secrets JSON YOUTUBE_TOKEN_FILE: Path to store OAuth tokens (default: runtime/youtube_token.json) """ platform_name = "youtube"
[docs] def __init__( self, client_secrets_file: Optional[str] = None, token_file: Optional[str] = None, ): """ Initialize YouTube client. Args: client_secrets_file: Path to OAuth client secrets JSON from Google Console token_file: Path to store/load OAuth tokens """ self.client_secrets_file = client_secrets_file or get_env( "YOUTUBE_CLIENT_SECRETS_FILE" ) default_token = str(_get_youtube_token_file()) self.token_file = token_file or (get_env("YOUTUBE_TOKEN_FILE") or default_token) self._youtube = None self._credentials = None
[docs] def validate_credentials(self) -> bool: """Check if credentials are available.""" if not HAS_YOUTUBE: return False if os.path.exists(self.token_file): return True if self.client_secrets_file and os.path.exists(self.client_secrets_file): return True return False
def _get_credentials(self) -> Optional["Credentials"]: """Get or refresh OAuth credentials.""" if not HAS_YOUTUBE: return None creds = None if os.path.exists(self.token_file): creds = Credentials.from_authorized_user_file(self.token_file, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) elif self.client_secrets_file and os.path.exists(self.client_secrets_file): flow = InstalledAppFlow.from_client_secrets_file( self.client_secrets_file, SCOPES ) creds = flow.run_local_server(port=0) else: return None with open(self.token_file, "w") as f: f.write(creds.to_json()) return creds def _get_client(self): """Get authenticated YouTube client.""" if self._youtube: return self._youtube creds = self._get_credentials() if not creds: return None self._youtube = build("youtube", "v3", credentials=creds) return self._youtube
[docs] def post( self, text: str, video_path: Optional[str] = None, title: Optional[str] = None, description: Optional[str] = None, tags: Optional[list] = None, category_id: str = "22", privacy_status: str = "public", thumbnail_path: Optional[str] = None, ) -> dict: """Upload a video or create a community post. Args: text: Description text (or community post text if no video) video_path: Path to video file (required for video upload) title: Video title (uses first line of text if not provided) description: Video description (uses text if not provided) tags: List of tags for the video category_id: YouTube category ID (default: 22 = People & Blogs) privacy_status: 'public', 'private', or 'unlisted' thumbnail_path: Path to custom thumbnail image Returns: dict with 'success', 'id', 'url' or 'error' """ if not HAS_YOUTUBE: return { "success": False, "error": "YouTube libraries not installed. Run: pip install google-api-python-client google-auth-oauthlib", } if not self.validate_credentials(): return {"success": False, "error": "Missing YouTube credentials"} youtube = self._get_client() if not youtube: return {"success": False, "error": "Could not create YouTube client"} if video_path: return self._upload_video( youtube, video_path=video_path, title=title or text.split("\n")[0][:100], description=description or text, tags=tags or [], category_id=category_id, privacy_status=privacy_status, thumbnail_path=thumbnail_path, ) return self._create_community_post(youtube, text)
def _upload_video( self, youtube, video_path: str, title: str, description: str, tags: list, category_id: str, privacy_status: str, thumbnail_path: Optional[str] = None, ) -> dict: """Upload a video to YouTube.""" if not os.path.exists(video_path): return {"success": False, "error": f"Video file not found: {video_path}"} body = { "snippet": { "title": title, "description": description, "tags": tags, "categoryId": category_id, }, "status": { "privacyStatus": privacy_status, "selfDeclaredMadeForKids": False, }, } try: media = MediaFileUpload( video_path, chunksize=1024 * 1024, resumable=True, ) request = youtube.videos().insert( part=",".join(body.keys()), body=body, media_body=media, ) response = None while response is None: status, response = request.next_chunk() video_id = response["id"] if thumbnail_path and os.path.exists(thumbnail_path): try: youtube.thumbnails().set( videoId=video_id, media_body=MediaFileUpload(thumbnail_path), ).execute() except Exception: pass return { "success": True, "id": video_id, "url": f"https://www.youtube.com/watch?v={video_id}", "title": title, } except Exception as e: return {"success": False, "error": str(e)} def _create_community_post(self, youtube, text: str) -> dict: """Create a community post (placeholder - requires channel eligibility).""" return { "success": False, "error": "Community posts require channel eligibility (500+ subscribers) and manual posting via YouTube Studio", }
[docs] def delete(self, video_id: str) -> dict: """Delete a video by ID. Args: video_id: YouTube video ID Returns: dict with 'success' or 'error' """ if not HAS_YOUTUBE: return {"success": False, "error": "YouTube libraries not installed"} if not self.validate_credentials(): return {"success": False, "error": "Missing credentials"} youtube = self._get_client() if not youtube: return {"success": False, "error": "Could not create YouTube client"} try: youtube.videos().delete(id=video_id).execute() return {"success": True, "deleted": True} except Exception as e: return {"success": False, "error": str(e)}
[docs] def update( self, video_id: str, title: Optional[str] = None, description: Optional[str] = None, tags: Optional[list] = None, privacy_status: Optional[str] = None, ) -> dict: """Update video metadata. Args: video_id: YouTube video ID title: New title (optional) description: New description (optional) tags: New tags (optional) privacy_status: New privacy status (optional) Returns: dict with 'success' or 'error' """ if not HAS_YOUTUBE: return {"success": False, "error": "YouTube libraries not installed"} youtube = self._get_client() if not youtube: return {"success": False, "error": "Could not create YouTube client"} try: current = ( youtube.videos().list(part="snippet,status", id=video_id).execute() ) if not current.get("items"): return {"success": False, "error": f"Video not found: {video_id}"} video = current["items"][0] snippet = video["snippet"] status = video["status"] if title: snippet["title"] = title if description: snippet["description"] = description if tags: snippet["tags"] = tags if privacy_status: status["privacyStatus"] = privacy_status youtube.videos().update( part="snippet,status", body={"id": video_id, "snippet": snippet, "status": status}, ).execute() return { "success": True, "id": video_id, "url": f"https://www.youtube.com/watch?v={video_id}", } except Exception as e: return {"success": False, "error": str(e)}
[docs] def get_channel_info(self) -> dict: """Get authenticated user's channel information.""" if not HAS_YOUTUBE: return {"success": False, "error": "YouTube libraries not installed"} youtube = self._get_client() if not youtube: return {"success": False, "error": "Could not create YouTube client"} try: response = ( youtube.channels().list(part="snippet,statistics", mine=True).execute() ) if not response.get("items"): return {"success": False, "error": "No channel found"} channel = response["items"][0] return { "success": True, "id": channel["id"], "title": channel["snippet"]["title"], "description": channel["snippet"].get("description", "")[:100], "subscribers": channel["statistics"].get("subscriberCount", "hidden"), "videos": channel["statistics"].get("videoCount", 0), "views": channel["statistics"].get("viewCount", 0), "url": f"https://www.youtube.com/channel/{channel['id']}", } except Exception as e: return {"success": False, "error": str(e)}
[docs] def list_videos(self, max_results: int = 10) -> dict: """List user's uploaded videos. Args: max_results: Maximum number of videos to return Returns: dict with videos list or error """ if not HAS_YOUTUBE: return {"success": False, "error": "YouTube libraries not installed"} youtube = self._get_client() if not youtube: return {"success": False, "error": "Could not create YouTube client"} try: channels = ( youtube.channels().list(part="contentDetails", mine=True).execute() ) if not channels.get("items"): return {"success": False, "error": "No channel found"} uploads_id = channels["items"][0]["contentDetails"]["relatedPlaylists"][ "uploads" ] videos_response = ( youtube.playlistItems() .list( part="snippet", playlistId=uploads_id, maxResults=max_results, ) .execute() ) videos = [] for item in videos_response.get("items", []): snippet = item["snippet"] video_id = snippet["resourceId"]["videoId"] videos.append( { "id": video_id, "title": snippet["title"], "description": snippet.get("description", "")[:100], "published_at": snippet["publishedAt"], "url": f"https://www.youtube.com/watch?v={video_id}", } ) return {"success": True, "videos": videos, "count": len(videos)} except Exception as e: return {"success": False, "error": str(e)}
[docs] def me(self) -> dict: """Get authenticated user's channel information.""" return self.get_channel_info()
[docs] def feed(self, limit: int = 10) -> dict: """Get user's recent videos. Args: limit: Maximum number of videos to return Returns: dict with 'success', 'posts' list or 'error' """ result = self.list_videos(max_results=limit) if result.get("success"): return { "success": True, "posts": result.get("videos", []), "count": result.get("count", 0), } return result