# Source code for scitex_dev._sync._local

#!/usr/bin/env python3
# Timestamp: 2026-02-24
# File: scitex_dev/_sync/_local.py

"""Ecosystem package sync across local and remote hosts.

Safety model (like bulk_rename):
  - All operations default to dry_run=True (preview only).
  - Pass confirm=True to actually execute.
  - CLI requires --confirm flag.
  - MCP tool requires confirm=True parameter.
"""

from __future__ import annotations

import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any

from .._core.config import DevConfig, HostConfig, get_enabled_hosts, load_config

# ---------------------------------------------------------------------------
# SSH helpers
# ---------------------------------------------------------------------------


def _build_ssh_args(host: HostConfig) -> list[str]:
    """Return the ``ssh`` argv prefix that targets *host*.

    The returned list ends with ``user@hostname``; callers append the
    remote command string as the final element. Options included:

    - ``-i <key>`` only when ``host.ssh_key`` is set (truthy),
    - ``-p <port>`` only when ``host.port`` is not 22,
    - ``BatchMode=yes`` so ssh never stops to prompt interactively,
    - ``StrictHostKeyChecking=accept-new`` to accept unknown host keys
      on first contact,
    - ``ConnectTimeout=10`` to bound connection setup.
    """
    identity = ["-i", host.ssh_key] if host.ssh_key else []
    port = [] if host.port == 22 else ["-p", str(host.port)]
    options = [
        "-o",
        "BatchMode=yes",
        "-o",
        "StrictHostKeyChecking=accept-new",
        "-o",
        "ConnectTimeout=10",
    ]
    target = f"{host.user}@{host.hostname}"
    return ["ssh", *identity, *port, *options, target]


def _get_host_packages(host: HostConfig, config: DevConfig) -> list[tuple[str, str]]:
    """Resolve which packages to sync on *host*.

    Selection rule:

    - a non-empty ``host.packages`` is used as-is (legacy allow-list; its
      order is kept);
    - otherwise every package in ``config.packages`` is used, minus any
      name listed in ``host.exclude``.

    Names unknown to ``config.packages``, or whose package has no
    ``local_path``, are dropped.

    Returns
    -------
    list[tuple[str, str]]
        ``(package_name, dir_name)`` pairs. ``dir_name`` is the last path
        component of the package's expanded ``local_path`` and is used as
        the checkout directory name under ``host.remote_base``.
    """
    by_name = {pkg.name: pkg for pkg in config.packages}
    if host.packages:
        wanted = list(host.packages)
    else:
        skipped = set(host.exclude or [])
        wanted = [pkg.name for pkg in config.packages if pkg.name not in skipped]

    resolved = ((name, by_name.get(name)) for name in wanted)
    return [
        (name, Path(pkg.local_path).expanduser().name)
        for name, pkg in resolved
        if pkg and pkg.local_path
    ]


def _build_sync_commands(
    host: HostConfig, dir_name: str, stash: bool, install: bool
) -> list[str]:
    """Build the shell commands run on the remote host for one package.

    The commands are meant to be joined with ``" && "`` into a single
    POSIX-shell command line (as ``_sync_one_package`` does), so a failing
    step stops the rest of the chain. Each element is therefore a single
    (possibly compound) command: a top-level ``;`` would escape the
    ``&&`` chain and keep executing after, e.g., a failed ``cd``.

    Parameters
    ----------
    host : HostConfig
        Supplies ``remote_base`` (parent directory of the checkout) and
        ``pip_bin`` (pip executable used for the editable install).
    dir_name : str
        Checkout directory name under ``host.remote_base``.
    stash : bool
        Stash tracked local changes before ``git pull`` and restore them
        afterwards.
    install : bool
        Run ``<pip_bin> install -e . -q`` after pulling.

    Returns
    -------
    list[str]
        Shell command strings, in execution order.

    Notes
    -----
    The stash is conditional. A bare ``git stash`` on a clean tree exits 0
    without creating an entry, so an unconditional ``git stash pop`` would
    apply an older, unrelated stash. A shell variable records whether this
    run actually stashed, and the pop only runs in that case. If a step
    between stash and pop fails, the ``&&`` chain stops and the stash entry
    is left in place for manual recovery.
    """
    base = f"{host.remote_base}/{dir_name}"
    cmds = [f"cd {base}"]
    if install:
        # Ensure .venv symlink exists so pip resolves to the right env
        cmds.append("test -e .venv || ln -s ~/.venv .venv 2>/dev/null || true")
    if stash:
        # Stash only when tracked files are dirty; remember whether we did.
        cmds.append(
            'if [ -n "$(git status --porcelain --untracked-files=no)" ]; '
            "then git stash && sacdev_stashed=1; else sacdev_stashed=0; fi"
        )
    cmds.append("git pull")
    if install:
        cmds.append(f"{host.pip_bin} install -e . -q")
    if stash:
        # Pop only the entry this run created; never touch older stashes.
        cmds.append(
            'if [ "$sacdev_stashed" = 1 ]; '
            "then git stash pop 2>/dev/null || true; fi"
        )
    return cmds


def _check_ahead_state(
    host: HostConfig,
    dir_name: str,
    *,
    subprocess_run=None,
) -> dict[str, Any]:
    """Inspect the remote working copy's position relative to its upstream.

    Runs a short POSIX-shell script over SSH that does a best-effort
    ``git fetch`` and then counts commits on each side of ``@{u}``.

    Parameters
    ----------
    host : HostConfig
        Target host; ``remote_base`` locates the checkout.
    dir_name : str
        Checkout directory name under ``host.remote_base``.
    subprocess_run : callable | None
        Stand-in for :func:`subprocess.run` (test seam). Defaults to the
        real one.

    Returns
    -------
    dict
        ``status`` is one of:

        - ``clean``: up-to-date or only behind upstream (safe to pull).
        - ``ahead``: the checkout has unpushed commits; a fast-forward
          pull would be blocked, so callers skip it to avoid clobbering
          local work.
        - ``diverged``: both ahead and behind — needs a human decision.
        - ``missing``: directory absent or not a git checkout.
        - ``error``: SSH failure, timeout, or output that could not be
          parsed; see the ``error`` field.

        ``clean``/``ahead``/``diverged`` also carry ``local_ahead``
        (commits in the checkout's HEAD not in ``@{u}``) and
        ``remote_ahead`` (commits in ``@{u}`` not in HEAD). "Local" here
        means the *remote host's* working copy, not the machine running
        this code. When the checkout has no upstream, both counts are 0.
    """
    base = f"{host.remote_base}/{dir_name}"
    # Single-line remote script — uses unique sentinels so output parsing
    # survives line-wrap/colorization from git or shell integrations.
    remote_cmd = (
        f"cd {base} 2>/dev/null || {{ echo SACDEV_MISSING; exit 0; }}; "
        # ``-e`` rather than ``-d``: in worktrees and submodules ``.git``
        # is a file, not a directory.
        "if [ ! -e .git ]; then echo SACDEV_MISSING; exit 0; fi; "
        "git fetch -q 2>/dev/null || true; "
        'la=$(git rev-list --count "@{u}..HEAD" 2>/dev/null || echo 0); '
        'ra=$(git rev-list --count "HEAD..@{u}" 2>/dev/null || echo 0); '
        "echo SACDEV_STATE la=$la ra=$ra"
    )
    ssh_args = _build_ssh_args(host)
    ssh_args.append(remote_cmd)
    runner = subprocess_run if subprocess_run is not None else subprocess.run
    try:
        result = runner(ssh_args, capture_output=True, text=True, timeout=60)
        stdout = result.stdout.strip()
        if result.returncode != 0:
            return {
                "status": "error",
                "error": (result.stderr.strip() or f"exit {result.returncode}"),
            }
        if "SACDEV_MISSING" in stdout:
            return {"status": "missing"}
        # Parse "SACDEV_STATE la=X ra=Y". When the sentinel is present, only
        # the text after it is scanned, so stray "la=/ra=" text printed
        # earlier (e.g. by login scripts) cannot leak into the counts.
        _, sentinel, tail = stdout.partition("SACDEV_STATE")
        counts: dict[str, int] = {}
        for tok in (tail if sentinel else stdout).split():
            key, eq, value = tok.partition("=")
            if eq and key in ("la", "ra"):
                counts[key] = int(value or 0)
        if not counts:
            # Never report "clean" (which lets a pull proceed) when the
            # state could not actually be read.
            return {
                "status": "error",
                "error": f"unexpected ahead-check output: {stdout[:200]!r}",
            }
        la = counts.get("la", 0)
        ra = counts.get("ra", 0)
        if la > 0 and ra > 0:
            status = "diverged"
        elif la > 0:
            status = "ahead"
        else:
            status = "clean"
        return {"status": status, "local_ahead": la, "remote_ahead": ra}
    except subprocess.TimeoutExpired:
        return {"status": "error", "error": "ahead-check SSH timed out (60s)"}
    except Exception as e:
        return {"status": "error", "error": str(e)}


def _sync_one_package(
    host: HostConfig,
    dir_name: str,
    stash: bool,
    install: bool,
    safe: bool = True,
    *,
    subprocess_run=None,
    check_ahead_state_fn=None,
) -> dict[str, Any]:
    """Sync a single package checkout on a remote host over SSH.

    When ``safe`` is True (default), first check whether the remote
    working copy has unpushed commits; if it is ahead of or diverged from
    its upstream, skip it rather than risk clobbering local work. A
    missing checkout or a failed check also returns early without syncing.

    Parameters
    ----------
    host : HostConfig
        Target host.
    dir_name : str
        Checkout directory name under ``host.remote_base``.
    stash : bool
        Stash local changes around ``git pull``.
    install : bool
        Run the editable pip install after pulling.
    safe : bool
        Run the ahead/diverged pre-check first (default True).
    subprocess_run : callable | None
        Stand-in for :func:`subprocess.run`. It is used for the sync itself
        and, when ``check_ahead_state_fn`` is not given, for the default
        ahead-check too, so both steps go through the same runner.
    check_ahead_state_fn : callable | None
        Replacement for ``_check_ahead_state``, called as
        ``fn(host, dir_name)`` and expected to return a dict with a
        ``status`` key.

    Returns
    -------
    dict
        ``status`` is ``ok``, ``skipped_ahead``, ``skipped_diverged``,
        ``missing``, ``error`` or ``timeout``, plus ``output``, ``error``,
        ``reason`` and/or ahead counts depending on the outcome.
    """
    runner = subprocess_run if subprocess_run is not None else subprocess.run
    if safe:
        if check_ahead_state_fn is not None:
            ahead = check_ahead_state_fn(host, dir_name)
        else:
            # Forward the injected runner so the pre-check uses the same
            # transport as the sync below.
            ahead = _check_ahead_state(host, dir_name, subprocess_run=subprocess_run)
        state = ahead["status"]
        if state in {"ahead", "diverged"}:
            return {
                "status": f"skipped_{state}",
                "local_ahead": ahead.get("local_ahead", 0),
                "remote_ahead": ahead.get("remote_ahead", 0),
                "reason": (
                    "remote working copy has unpushed commits; resolve manually "
                    "(push / rebase / reset) then re-run sync"
                ),
            }
        if state == "missing":
            return {"status": "missing", "reason": "package dir not on remote"}
        if state == "error":
            return {
                "status": "error",
                "error": ahead.get("error", "ahead-check failed"),
            }

    cmds = _build_sync_commands(host, dir_name, stash, install)
    remote_cmd = " && ".join(cmds)

    ssh_args = _build_ssh_args(host)
    ssh_args.append(remote_cmd)

    try:
        result = runner(ssh_args, capture_output=True, text=True, timeout=120)
        stdout = result.stdout.strip()
        stderr = result.stderr.strip()

        if result.returncode == 0:
            return {"status": "ok", "output": stdout}
        return {
            "status": "error",
            "output": stdout,
            "error": stderr or f"exit code {result.returncode}",
        }
    except subprocess.TimeoutExpired:
        return {"status": "timeout", "error": "SSH command timed out (120s)"}
    except Exception as e:
        return {"status": "error", "error": str(e)}


# ---------------------------------------------------------------------------
# Public API — all default to dry_run=True (safe preview)
# ---------------------------------------------------------------------------


def sync_host(
    host: HostConfig,
    packages: list[str] | None = None,
    stash: bool = True,
    install: bool = True,
    safe: bool = True,
    confirm: bool = False,
    config: DevConfig | None = None,
    *,
    host_packages_fn=None,
) -> dict[str, Any]:
    """Sync packages to a remote host via SSH.

    Safety: defaults to preview only. Pass confirm=True to execute.

    Steps per package: ahead-check (if safe), git stash, git pull,
    pip install -e ., git stash pop. Up to 4 packages are synced
    concurrently on the host.

    Parameters
    ----------
    host : HostConfig
        Target host configuration.
    packages : list[str] | None
        Package names to sync. None (or empty) = use host's configured
        packages.
    stash : bool
        Git stash before pull (default True).
    install : bool
        Pip install after pull (default True).
    safe : bool
        If True (default), pre-check each remote working copy and skip
        packages whose HEAD is ahead of / diverged from upstream so we
        never clobber unpushed commits.
    confirm : bool
        If False (default), preview only (dry run).
        If True, execute the sync operation.
    config : DevConfig | None
        Configuration. Loaded from default if None.
    host_packages_fn : callable | None
        Replacement for ``_get_host_packages``, called as
        ``fn(host, config)`` and returning ``(name, dir_name)`` pairs.

    Returns
    -------
    dict
        Per-package results: {package: {status, commands|output, error}},
        in the host's package order for both dry runs and real runs.
        ``status`` includes ``ok``, ``skipped_ahead``, ``skipped_diverged``,
        ``missing``, ``error``, ``timeout``, or ``dry_run``.
    """
    if config is None:
        config = load_config()
    get_host_pkgs = (
        host_packages_fn if host_packages_fn is not None else _get_host_packages
    )
    host_pkgs = get_host_pkgs(host, config)
    if packages:
        host_pkgs = [(n, d) for n, d in host_pkgs if n in packages]

    if not confirm:
        return {
            name: {
                "status": "dry_run",
                "commands": _build_sync_commands(host, dir_name, stash, install),
                "safe_check": safe,
            }
            for name, dir_name in host_pkgs
        }

    # Parallel package sync within a single host
    by_name: dict[str, Any] = {}
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(
                _sync_one_package, host, dir_name, stash, install, safe
            ): name
            for name, dir_name in host_pkgs
        }
        for future in as_completed(futures):
            name = futures[future]
            try:
                by_name[name] = future.result()
            except Exception as e:
                by_name[name] = {"status": "error", "error": str(e)}
    # Report in the host's package order (matching the dry-run preview)
    # rather than in whatever order the SSH sessions happened to finish.
    return {name: by_name[name] for name, _ in host_pkgs}


def sync_all(
    hosts: list[str] | None = None,
    packages: list[str] | None = None,
    stash: bool = True,
    install: bool = True,
    safe: bool = True,
    confirm: bool = False,
    config: DevConfig | None = None,
    *,
    sync_host_fn=None,
    enabled_hosts_fn=None,
) -> dict[str, Any]:
    """Sync packages across all enabled hosts.

    Safety: defaults to preview only. Pass confirm=True to execute.
    Parallel: when executing, hosts are synced concurrently (one worker
    per host). Dry runs are computed sequentially.

    Parameters
    ----------
    hosts : list[str] | None
        Host names to sync. None (or empty) = all enabled hosts. Names
        that are not enabled are ignored.
    packages : list[str] | None
        Package names. None = host-specific defaults.
    stash : bool
        Git stash before pull.
    install : bool
        Pip install after pull.
    safe : bool
        If True (default), skip packages whose remote working copy is
        ahead of / diverged from upstream (never clobber unpushed work).
    confirm : bool
        If False (default), preview only (dry run).
        If True, execute the sync operation.
    config : DevConfig | None
        Configuration. Loaded from default if None.
    sync_host_fn : callable | None
        Replacement for ``sync_host`` (same keyword arguments).
    enabled_hosts_fn : callable | None
        Replacement for ``get_enabled_hosts``, called as ``fn(config)``.

    Returns
    -------
    dict
        {host_name: {package: result}}, in enabled-host order. A host whose
        sync raises is reported as ``{"error": message}``.
    """
    if config is None:
        config = load_config()
    enabled_fn = enabled_hosts_fn if enabled_hosts_fn is not None else get_enabled_hosts
    do_sync_host = sync_host_fn if sync_host_fn is not None else sync_host
    enabled = enabled_fn(config)
    if hosts:
        enabled = [h for h in enabled if h.name in hosts]

    if not confirm:
        # Dry-run: no SSH needed, compute locally
        return {
            host.name: do_sync_host(
                host,
                packages=packages,
                stash=stash,
                install=install,
                safe=safe,
                confirm=False,
                config=config,
            )
            for host in enabled
        }

    # Execute: parallel across hosts
    by_host: dict[str, Any] = {}
    with ThreadPoolExecutor(max_workers=len(enabled) or 1) as executor:
        futures = {
            executor.submit(
                do_sync_host,
                host,
                packages=packages,
                stash=stash,
                install=install,
                safe=safe,
                confirm=True,
                config=config,
            ): host.name
            for host in enabled
        }
        for future in as_completed(futures):
            host_name = futures[future]
            try:
                by_host[host_name] = future.result()
            except Exception as e:
                by_host[host_name] = {"error": str(e)}
    # Report hosts in configured order (same as the dry run), not in
    # completion order.
    return {host.name: by_host[host.name] for host in enabled}


def _install_one(pkg, path: Path) -> tuple[str, dict[str, Any]]:
    """Run ``python -m pip install -e <path> -q`` for one package.

    Uses the current interpreter (``sys.executable``) with a 300 s timeout.

    Parameters
    ----------
    pkg
        Package config entry; only ``pkg.name`` is read.
    path : Path
        Directory to install in editable mode.

    Returns
    -------
    tuple[str, dict]
        ``(pkg.name, result)``. ``result`` is ``{"status": "ok", "output": ...}``
        on success, otherwise ``{"status": "error", "error": ...}``. Failures
        never raise, so one bad package does not abort a batch.
    """
    # sys.executable -m pip — bare `pip` finds the first one on PATH,
    # which can be a system Python with stale metadata (spartan's
    # /usr/bin/pip is Python 3.9 and fails on >=3.10 wheels).
    cmd = [sys.executable, "-m", "pip", "install", "-e", str(path), "-q"]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    except Exception as e:  # timeout, missing interpreter, ... — report, don't raise
        return pkg.name, {"status": "error", "error": str(e)}
    if result.returncode == 0:
        return pkg.name, {"status": "ok", "output": result.stdout.strip()}
    # pip may fail with empty stderr; keep the error informative.
    return pkg.name, {
        "status": "error",
        "error": result.stderr.strip() or f"exit code {result.returncode}",
    }


def sync_local(
    packages: list[str] | None = None,
    confirm: bool = False,
    config: DevConfig | None = None,
    jobs: int = 1,
    on_progress=None,
) -> dict[str, Any]:
    """Install all local editable packages.

    Safety: defaults to preview only. Pass confirm=True to execute.

    Parameters
    ----------
    packages : list[str] | None
        Package names. None = all configured packages.
    confirm : bool
        If False (default), preview only. If True, execute pip install -e.
    config : DevConfig | None
        Configuration. Loaded from default if None.
    jobs : int
        Parallel installs. 1 = serial (default). 0 or negative = all CPUs.
        Capped at the number of packages to install.
    on_progress : callable | None
        Optional callback ``f(idx, total, name, status, elapsed)`` invoked
        as each package finishes. ``idx`` counts finished installs from 1;
        ``elapsed`` is the seconds spent installing that package alone
        (time spent waiting behind other packages is not included).

    Returns
    -------
    dict
        {package: {status, output|commands|error}}. Packages without a
        ``local_path`` are omitted; paths that do not exist are reported
        as ``skipped``.
    """
    import os
    import time

    if config is None:
        config = load_config()
    targets = config.packages
    if packages:
        targets = [p for p in targets if p.name in packages]

    # Resolve installable targets (skip missing-path early)
    work: list[tuple[Any, Path]] = []
    results: dict[str, Any] = {}
    for pkg in targets:
        if not pkg.local_path:
            continue
        path = Path(pkg.local_path).expanduser()
        if not path.exists():
            results[pkg.name] = {"status": "skipped", "error": f"{path} not found"}
            continue
        if not confirm:
            results[pkg.name] = {
                "status": "dry_run",
                "commands": [
                    sys.executable,
                    "-m",
                    "pip",
                    "install",
                    "-e",
                    str(path),
                    "-q",
                ],
            }
            continue
        work.append((pkg, path))

    if not work:
        return results

    # Resolve --jobs
    if jobs <= 0:
        jobs = os.cpu_count() or 1
    jobs = max(1, min(jobs, len(work)))
    total = len(work)

    def _timed_install(pkg, path):
        # Start the clock when this install actually begins, so serial or
        # queued packages don't report time spent waiting for earlier ones.
        start = time.monotonic()
        name, res = _install_one(pkg, path)
        return name, res, time.monotonic() - start

    if jobs == 1:
        # Serial path — preserve deterministic ordering
        for idx, (pkg, path) in enumerate(work, 1):
            name, res, elapsed = _timed_install(pkg, path)
            results[name] = res
            if on_progress is not None:
                on_progress(idx, total, name, res["status"], elapsed)
    else:
        # Parallel path — pip install -e is mostly I/O (git, PyPI metadata)
        with ThreadPoolExecutor(max_workers=jobs) as ex:
            futures = [ex.submit(_timed_install, pkg, path) for pkg, path in work]
            for idx, fut in enumerate(as_completed(futures), 1):
                name, res, elapsed = fut.result()
                results[name] = res
                if on_progress is not None:
                    on_progress(idx, total, name, res["status"], elapsed)
    return results

# EOF