Source code for scitex_session._lifecycle._archive._bulk

#!/usr/bin/env python3
# Timestamp: "2026-05-24 (ywatanabe)"
# File: src/scitex_session/_lifecycle/_archive/_bulk.py
"""Bulk archive / restore over a directory of sessions."""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Optional, Union

from ._core import (
    dir_size,
    format_to_suffix,
    is_session_dir_name,
    iter_session_candidates,
    validate_root,
)
from ._single import archive_session_dir, restore_session_archive

logger = logging.getLogger(__name__)


[docs] def archive_existing( root: Union[str, Path], older_than_days: Optional[float] = None, format: str = "tar.gz", pattern: Optional[str] = None, dry_run: bool = True, max_dirs: Optional[int] = None, track_bytes: bool = False, use_fd: bool = True, ) -> dict: """Bulk compress every session-shaped child of ``root``. Parameters ---------- track_bytes : bool, default False When True, run a per-candidate ``dir_size()`` (os.walk + getsize per file) so that ``bytes_in`` / ``bytes_out`` in the returned summary are populated. Off by default because the size accounting adds a measurable per-session overhead on top of the mandatory tar IO (observed on neurovista 2026-05-25: 80,408 sessions / 47 min wall-clock — the byte stats are a meaningful contributor). use_fd : bool, default True Forwarded to ``iter_session_candidates``. When True and ``fd`` is installed, candidate enumeration uses the parallel subprocess path; otherwise the Python ``iterdir+stat`` path. Returns ------- dict Summary {scanned, candidates, archived, skipped, failed, bytes_in, bytes_out}. ``bytes_in`` and ``bytes_out`` stay at 0 when ``track_bytes`` is False. """ root_path = Path(root) if not root_path.exists(): raise FileNotFoundError(f"Root does not exist: {root_path}") if not root_path.is_dir(): raise NotADirectoryError(f"Root is not a directory: {root_path}") validate_root(root_path) suffix = format_to_suffix(format) summary = { "scanned": 0, "candidates": 0, "archived": 0, "skipped": 0, "failed": 0, "bytes_in": 0, "bytes_out": 0, } for _entry in root_path.iterdir(): summary["scanned"] += 1 for candidate in iter_session_candidates( root_path, older_than_days, pattern, use_fd=use_fd ): archive_path = candidate.parent / (candidate.name + suffix) if archive_path.exists(): summary["skipped"] += 1 logger.info("Skipping (archive exists): %s", archive_path) continue summary["candidates"] += 1 if max_dirs is not None and summary["candidates"] > max_dirs: summary["candidates"] -= 1 logger.info("Reached max_dirs=%d cap; stopping.", max_dirs) break if track_bytes: try: src_bytes = dir_size(candidate) except OSError: src_bytes = 0 summary["bytes_in"] += src_bytes else: src_bytes = 0 if dry_run: if track_bytes: logger.info( "[dry-run] would archive: %s (%d bytes)", candidate, src_bytes, ) else: logger.info("[dry-run] would archive: %s", candidate) continue try: archive_path = archive_session_dir( candidate, format=format, remove_src=True ) summary["archived"] += 1 if track_bytes: try: out_bytes = archive_path.stat().st_size except OSError: out_bytes = 0 summary["bytes_out"] += out_bytes logger.info( "Archived: %s -> %s (%d -> %d bytes)", candidate, archive_path, src_bytes, out_bytes, ) else: logger.info("Archived: %s -> %s", candidate, archive_path) except Exception as e: summary["failed"] += 1 logger.warning("Failed to archive %s: %s", candidate, e) return summary
[docs] def restore_existing( root: Union[str, Path], pattern: str = "*.tar.gz", dest_root: Optional[Union[str, Path]] = None, remove_archive: bool = False, dry_run: bool = True, max_files: Optional[int] = None, track_bytes: bool = False, ) -> dict: """Bulk restore every archive matching ``pattern`` directly under ``root``. Returns ------- dict Summary {scanned, candidates, restored, skipped, failed}. """ root_path = Path(root) if not root_path.exists(): raise FileNotFoundError(f"Root does not exist: {root_path}") if not root_path.is_dir(): raise NotADirectoryError(f"Root is not a directory: {root_path}") validate_root(root_path) dest_root_path = Path(dest_root) if dest_root is not None else root_path if dest_root is not None: validate_root(dest_root_path) dest_root_path.mkdir(parents=True, exist_ok=True) summary = { "scanned": 0, "candidates": 0, "restored": 0, "skipped": 0, "failed": 0, } archives = sorted(p for p in root_path.glob(pattern) if p.is_file()) summary["scanned"] = len(archives) for arc in archives: name = arc.name stem = name for suf in (".tar.gz", ".tar.xz", ".tar"): if stem.endswith(suf): stem = stem[: -len(suf)] break if not is_session_dir_name(stem): summary["skipped"] += 1 logger.info("Skipping (non-session-name): %s", arc) continue dest = dest_root_path / stem if dest.exists(): summary["skipped"] += 1 logger.info("Skipping (dest exists): %s", dest) continue summary["candidates"] += 1 if max_files is not None and summary["candidates"] > max_files: summary["candidates"] -= 1 logger.info("Reached max_files=%d cap; stopping.", max_files) break if dry_run: logger.info("[dry-run] would restore: %s -> %s", arc, dest) continue try: restore_session_archive( arc, dest_dir=dest, remove_archive=remove_archive ) summary["restored"] += 1 logger.info("Restored: %s -> %s", arc, dest) except Exception as e: summary["failed"] += 1 logger.warning("Failed to restore %s: %s", arc, e) return summary
# EOF