#!/usr/bin/env -S uv run # /// script # dependencies = [ # "boto3", # "httpx", # "click", # "rich", # "lxml", # ] # requires-python = ">=3.13" # /// """AI Copyright Cases Manager - Unified tool for managing AI legal case archives. This tool provides a complete pipeline for: 1. Downloading AI legal cases from Court Listener 2. Creating and uploading zip archives to S3 3. Generating HTML indexes for browsing Usage Examples: # Download all cases uv run ai_cases_manager.py download --aws-profile personal-site # Create and upload archives uv run ai_cases_manager.py archive --aws-profile personal-site # Generate HTML index uv run ai_cases_manager.py index --aws-profile personal-site # Run complete pipeline uv run ai_cases_manager.py pipeline --aws-profile personal-site # Show current status uv run ai_cases_manager.py status # Test with limited cases uv run ai_cases_manager.py download --max-items 5 --dry-run """ from __future__ import annotations import json import logging import re import sys import zipfile from dataclasses import dataclass from datetime import datetime from enum import Enum from pathlib import Path from typing import Any, Optional import boto3 import click import httpx from botocore.exceptions import ClientError from lxml import html from rich.console import Console from rich.logging import RichHandler from rich.progress import ( Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, ) from rich.table import Table # Configure rich console console = Console() # Configure logging with rich logging.basicConfig( level=logging.INFO, format="%(message)s", handlers=[RichHandler(console=console, rich_tracebacks=True)], ) logger = logging.getLogger(__name__) # ============================================================================ # Data Models # ============================================================================ class ProcessingStatus(Enum): """Status of processing operations.""" SUCCESS = "success" PARTIAL = "partial" FAILED = "failed" SKIPPED = "skipped" PENDING = "pending" @dataclass class DocketInfo: """Complete information about a legal docket.""" docket_id: int case_name: str = "" court: str = "" pacer_case_id: str = "" date_filed: Optional[str] = None date_terminated: Optional[str] = None # S3 paths cl_s3_prefix: str = "" # Court Listener S3 prefix cl_s3_path: str = "" # Full Court Listener S3 path archive_s3_key: str = "" # Our archive S3 key # URLs cl_url: str = "" # Court Listener URL archive_url: str = "" # Our archive URL # File tracking files_found: int = 0 files_downloaded: int = 0 local_path: Optional[Path] = None archive_path: Optional[Path] = None archive_size_mb: float = 0.0 # Status tracking download_status: ProcessingStatus = ProcessingStatus.PENDING archive_status: ProcessingStatus = ProcessingStatus.PENDING error_message: Optional[str] = None def is_valid_for_s3(self) -> bool: """Check if docket has required info for S3 access.""" return bool(self.court and self.pacer_case_id) def generate_slug(self) -> str: """Generate URL-safe slug from case name.""" slug = re.sub(r"[^\w\s-]", "", self.case_name.lower()) slug = re.sub(r"[-\s]+", "-", slug) slug = slug.strip("-") if len(slug) > 100: slug = slug[:100].rsplit("-", 1)[0] return slug or f"case-{self.docket_id}" def to_dict(self) -> dict[str, Any]: """Convert to dictionary for JSON serialization.""" return { "docket_id": self.docket_id, "case_name": self.case_name, "court": self.court, "pacer_case_id": self.pacer_case_id, "date_filed": self.date_filed, "date_terminated": self.date_terminated, "cl_s3_path": self.cl_s3_path, "archive_s3_key": self.archive_s3_key, "archive_url": self.archive_url, "files_found": self.files_found, "files_downloaded": self.files_downloaded, "local_path": str(self.local_path) if self.local_path else None, "archive_path": str(self.archive_path) if self.archive_path else None, "archive_size_mb": self.archive_size_mb, "download_status": self.download_status.value, "archive_status": self.archive_status.value, "error_message": self.error_message, } @classmethod def from_dict(cls, data: dict[str, Any]) -> DocketInfo: """Create from dictionary.""" info = cls( docket_id=data["docket_id"], case_name=data.get("case_name", ""), court=data.get("court", ""), pacer_case_id=data.get("pacer_case_id", ""), date_filed=data.get("date_filed"), date_terminated=data.get("date_terminated"), cl_s3_prefix=data.get("cl_s3_prefix", ""), cl_s3_path=data.get("cl_s3_path", ""), archive_s3_key=data.get("archive_s3_key", ""), cl_url=data.get("cl_url", ""), archive_url=data.get("archive_url", ""), files_found=data.get("files_found", 0), files_downloaded=data.get("files_downloaded", 0), archive_size_mb=data.get("archive_size_mb", 0.0), ) if data.get("local_path"): info.local_path = Path(data["local_path"]) if data.get("archive_path"): info.archive_path = Path(data["archive_path"]) if data.get("download_status"): info.download_status = ProcessingStatus(data["download_status"]) if data.get("archive_status"): info.archive_status = ProcessingStatus(data["archive_status"]) info.error_message = data.get("error_message") return info @dataclass class Config: """Configuration for the AI cases manager.""" # AWS settings aws_profile: Optional[str] = None # S3 buckets cl_bucket: str = "com-courtlistener-storage" archive_bucket: str = "michaelbommarito.com" archive_prefix: str = "resources/ai-copyright/cases/" index_key: str = "resources/ai-copyright/cases/index.html" # Local paths download_dir: Path = Path("tmp/ai-cases") archive_dir: Path = Path("tmp/ai-cases-zips") state_file: Path = Path("ai_cases_state.json") # API settings user_agent: str = "michaelbommarito.com-bot " bots_law_url: str = "https://bots.law/little-cases/ai-cases-bot/" cl_api_base: str = "https://www.courtlistener.com/api/rest/v4" # Processing options dry_run: bool = False verbose: bool = False max_items: Optional[int] = None # ============================================================================ # Service Layer # ============================================================================ class WebScraper: """Scrapes AI case information from bots.law.""" def __init__(self, config: Config): self.config = config self.headers = {"User-Agent": config.user_agent} self.client = httpx.Client(headers=self.headers, timeout=30.0) def fetch_docket_ids(self) -> set[int]: """Fetch unique docket IDs from bots.law.""" with console.status("[bold green]Fetching cases from bots.law..."): response = self.client.get(self.config.bots_law_url) response.raise_for_status() tree = html.fromstring(response.text) links = tree.xpath( '//a[contains(@href, "courtlistener.com/docket/")]/@href' ) docket_ids: set[int] = set() for link in links: match = re.search(r"/docket/(\d+)/", link) if match: docket_ids.add(int(match.group(1))) logger.info(f"Found {len(docket_ids)} unique cases") return docket_ids def close(self): """Close HTTP client.""" self.client.close() class CourtListenerAPI: """Interface to Court Listener REST API.""" def __init__(self, config: Config): self.config = config self.headers = { "User-Agent": config.user_agent, "Accept": "application/json", } self.client = httpx.Client(headers=self.headers, timeout=30.0) def get_docket_info(self, docket_id: int) -> Optional[DocketInfo]: """Fetch docket information from Court Listener API.""" api_url = f"{self.config.cl_api_base}/dockets/{docket_id}/" try: response = self.client.get(api_url) if response.status_code != 200: logger.warning( f"API returned {response.status_code} for docket {docket_id}" ) return None data = response.json() # Extract court code court = "" court_url = data.get("court", "") if court_url: court_match = re.search(r"/courts/([^/]+)/", court_url) if court_match: court = court_match.group(1) pacer_case_id = data.get("pacer_case_id", "") # Build S3 paths cl_s3_prefix = "" cl_s3_path = "" if court and pacer_case_id: cl_s3_prefix = f"recap/gov.uscourts.{court}.{pacer_case_id}/" cl_s3_path = f"s3://{self.config.cl_bucket}/{cl_s3_prefix}" # Build archive paths slug = re.sub(r"[^\w\s-]", "", data.get("case_name", "").lower()) slug = re.sub(r"[-\s]+", "-", slug)[:100].strip("-") archive_filename = f"{slug}-docket-{docket_id}.zip" archive_s3_key = f"{self.config.archive_prefix}{archive_filename}" archive_url = f"https://s3.us-east-1.amazonaws.com/{self.config.archive_bucket}/{archive_s3_key}" return DocketInfo( docket_id=docket_id, case_name=data.get("case_name", ""), court=court, pacer_case_id=pacer_case_id, date_filed=data.get("date_filed"), date_terminated=data.get("date_terminated"), cl_s3_prefix=cl_s3_prefix, cl_s3_path=cl_s3_path, archive_s3_key=archive_s3_key, cl_url=api_url, archive_url=archive_url, ) except Exception as e: logger.error(f"Error fetching docket {docket_id}: {e}") return None def close(self): """Close HTTP client.""" self.client.close() class S3Service: """Handles all S3 operations.""" def __init__(self, config: Config): self.config = config if config.aws_profile: session = boto3.Session(profile_name=config.aws_profile) self.s3_client = session.client("s3") else: self.s3_client = boto3.client("s3") def download_docket_files( self, docket: DocketInfo, progress: Optional[Progress] = None ) -> DocketInfo: """Download all files for a docket from Court Listener S3.""" if not docket.is_valid_for_s3(): docket.download_status = ProcessingStatus.SKIPPED docket.error_message = "Missing court or PACER case ID" return docket # Create local directory local_dir = self.config.download_dir / str(docket.docket_id) local_dir.mkdir(parents=True, exist_ok=True) docket.local_path = local_dir if self.config.dry_run: logger.info(f"[DRY RUN] Would download from {docket.cl_s3_path}") docket.download_status = ProcessingStatus.SUCCESS return docket try: # List and download files paginator = self.s3_client.get_paginator("list_objects_v2") pages = paginator.paginate( Bucket=self.config.cl_bucket, Prefix=docket.cl_s3_prefix, ) files_found = 0 files_downloaded = 0 task_id = None if progress: task_id = progress.add_task( f"Downloading {docket.case_name[:30]}...", total=None ) for page in pages: if "Contents" not in page: continue for obj in page["Contents"]: key = obj["Key"] filename = key.split("/")[-1] if not filename: continue files_found += 1 local_path = local_dir / filename # Check if file exists with same size if local_path.exists(): local_size = local_path.stat().st_size s3_size = obj.get("Size", 0) if local_size == s3_size: continue try: self.s3_client.download_file( self.config.cl_bucket, key, str(local_path), ) files_downloaded += 1 if progress and task_id is not None: progress.update( task_id, description=f"Downloaded {files_downloaded} files...", ) except Exception as e: logger.warning(f"Failed to download {key}: {e}") if progress and task_id is not None: progress.remove_task(task_id) docket.files_found = files_found docket.files_downloaded = files_downloaded if files_found == 0: docket.download_status = ProcessingStatus.FAILED docket.error_message = "No files found in S3" elif files_downloaded == files_found: docket.download_status = ProcessingStatus.SUCCESS else: docket.download_status = ProcessingStatus.PARTIAL docket.error_message = ( f"Downloaded {files_downloaded} of {files_found} files" ) except ClientError as e: docket.download_status = ProcessingStatus.FAILED docket.error_message = f"S3 error: {e}" logger.error(f"S3 error for docket {docket.docket_id}: {e}") except Exception as e: docket.download_status = ProcessingStatus.FAILED docket.error_message = f"Unexpected error: {e}" logger.error(f"Error downloading docket {docket.docket_id}: {e}") return docket def upload_archive(self, docket: DocketInfo) -> bool: """Upload archive to S3 with public read access.""" if not docket.archive_path or not docket.archive_path.exists(): logger.error(f"Archive not found: {docket.archive_path}") return False if self.config.dry_run: logger.info(f"[DRY RUN] Would upload to {docket.archive_url}") return True try: self.s3_client.upload_file( str(docket.archive_path), self.config.archive_bucket, docket.archive_s3_key, ExtraArgs={"ACL": "public-read"}, ) logger.info(f"Uploaded to {docket.archive_url}") return True except ClientError as e: logger.error(f"Failed to upload archive: {e}") return False def list_archives(self) -> list[dict[str, Any]]: """List all archive files in S3.""" archives = [] try: paginator = self.s3_client.get_paginator("list_objects_v2") pages = paginator.paginate( Bucket=self.config.archive_bucket, Prefix=self.config.archive_prefix, ) for page in pages: if "Contents" not in page: continue for obj in page["Contents"]: if obj["Key"].endswith(".zip"): archives.append( { "key": obj["Key"], "size": obj["Size"], "last_modified": obj["LastModified"], } ) except ClientError as e: logger.error(f"Error listing archives: {e}") return archives def upload_index(self, html_content: str) -> bool: """Upload HTML index to S3.""" if self.config.dry_run: logger.info( f"[DRY RUN] Would upload index to s3://{self.config.archive_bucket}/{self.config.index_key}" ) return True try: self.s3_client.put_object( Bucket=self.config.archive_bucket, Key=self.config.index_key, Body=html_content.encode("utf-8"), ContentType="text/html", ACL="public-read", ) index_url = f"https://s3.us-east-1.amazonaws.com/{self.config.archive_bucket}/{self.config.index_key}" logger.info(f"Index uploaded to: {index_url}") return True except ClientError as e: logger.error(f"Failed to upload index: {e}") return False class ArchiveService: """Creates and manages zip archives.""" def __init__(self, config: Config): self.config = config self.config.archive_dir.mkdir(parents=True, exist_ok=True) def create_archive( self, docket: DocketInfo, progress: Optional[Progress] = None ) -> bool: """Create zip archive for a docket.""" if not docket.local_path or not docket.local_path.exists(): logger.warning(f"Local files not found for docket {docket.docket_id}") docket.archive_status = ProcessingStatus.FAILED docket.error_message = "Local files not found" return False # Generate archive filename slug = docket.generate_slug() archive_filename = f"{slug}-docket-{docket.docket_id}.zip" archive_path = self.config.archive_dir / archive_filename docket.archive_path = archive_path if self.config.dry_run: logger.info(f"[DRY RUN] Would create archive: {archive_filename}") docket.archive_status = ProcessingStatus.SUCCESS docket.archive_size_mb = 100.0 # Fake size for dry run return True try: task_id = None if progress: task_id = progress.add_task( f"Archiving {docket.case_name[:30]}...", total=None ) with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf: files = list(docket.local_path.rglob("*")) for i, file_path in enumerate(files): if file_path.is_file(): arc_name = file_path.relative_to(docket.local_path.parent) zf.write(file_path, arc_name) if progress and task_id is not None and i % 10 == 0: progress.update( task_id, description=f"Archived {i + 1}/{len(files)} files...", ) if progress and task_id is not None: progress.remove_task(task_id) # Update size docket.archive_size_mb = archive_path.stat().st_size / (1024 * 1024) docket.archive_status = ProcessingStatus.SUCCESS logger.info( f"Created archive: {archive_filename} ({docket.archive_size_mb:.1f} MB)" ) return True except Exception as e: logger.error(f"Failed to create archive: {e}") docket.archive_status = ProcessingStatus.FAILED docket.error_message = f"Archive creation failed: {e}" if archive_path.exists(): archive_path.unlink() return False class IndexGenerator: """Generates HTML index of archives.""" def __init__(self, config: Config): self.config = config def generate_html( self, dockets: list[DocketInfo], archives: list[dict[str, Any]] ) -> str: """Generate HTML index page.""" # Match archives with dockets archive_map = {} for archive in archives: match = re.search(r"docket-(\d+)\.zip", archive["key"]) if match: docket_id = int(match.group(1)) archive_map[docket_id] = archive # Calculate stats total_cases = len(archives) total_size_gb = sum(a["size"] for a in archives) / (1024**3) total_files = sum(d.files_found for d in dockets if d.docket_id in archive_map) html = f""" AI Copyright Cases Archive

AI Copyright Cases Archive

Court documents from AI-related copyright litigation

Filtering results...

Total Cases
{total_cases}
Total Files
{total_files:,}
Total Size
{total_size_gb:.1f} GB
Last Updated
{datetime.now().strftime("%b %d, %Y")}
""" # Add case cards for docket in sorted(dockets, key=lambda d: d.case_name): if docket.docket_id in archive_map: archive = archive_map[docket.docket_id] size_mb = archive["size"] / (1024 * 1024) html += f"""

{docket.case_name}

Court: {docket.court.upper()} Docket: {docket.docket_id}
{docket.files_found} files {size_mb:.1f} MB
Updated: {archive["last_modified"].strftime("%b %d, %Y")}
""" html += """
""" return html class StateManager: """Manages persistent state for processing.""" def __init__(self, config: Config): self.config = config self.dockets: dict[int, DocketInfo] = {} def load(self) -> None: """Load state from JSON file.""" if self.config.state_file.exists(): with open(self.config.state_file, "r") as f: data = json.load(f) for item in data: docket = DocketInfo.from_dict(item) self.dockets[docket.docket_id] = docket logger.info(f"Loaded state for {len(self.dockets)} dockets") def save(self) -> None: """Save state to JSON file.""" data = [docket.to_dict() for docket in self.dockets.values()] with open(self.config.state_file, "w") as f: json.dump(data, f, indent=2, default=str) logger.info(f"Saved state for {len(self.dockets)} dockets") def add_docket(self, docket: DocketInfo) -> None: """Add or update a docket.""" self.dockets[docket.docket_id] = docket def get_pending_downloads(self) -> list[DocketInfo]: """Get dockets that need downloading.""" return [ d for d in self.dockets.values() if d.download_status in [ProcessingStatus.PENDING, ProcessingStatus.FAILED] ] def get_pending_archives(self) -> list[DocketInfo]: """Get dockets that need archiving.""" return [ d for d in self.dockets.values() if d.download_status == ProcessingStatus.SUCCESS and d.archive_status in [ProcessingStatus.PENDING, ProcessingStatus.FAILED] ] # ============================================================================ # CLI Commands # ============================================================================ @click.group() @click.option("--aws-profile", help="AWS profile to use") @click.option( "--dry-run", is_flag=True, help="Simulate operations without making changes" ) @click.option("--verbose", is_flag=True, help="Enable verbose logging") @click.option( "--state-file", type=Path, default="ai_cases_state.json", help="State file path" ) @click.pass_context def cli(ctx, aws_profile, dry_run, verbose, state_file): """AI Copyright Cases Manager - Unified tool for managing AI legal case archives.""" config = Config( aws_profile=aws_profile, dry_run=dry_run, verbose=verbose, state_file=state_file, ) if verbose: logging.getLogger().setLevel(logging.DEBUG) ctx.obj = config @cli.command() @click.option("--max-items", type=int, help="Maximum number of items to process") @click.option("--force", is_flag=True, help="Re-download even if files exist") @click.pass_obj def download(config: Config, max_items: Optional[int], force: bool): """Download AI legal cases from Court Listener.""" config.max_items = max_items # Initialize services state = StateManager(config) state.load() scraper = WebScraper(config) cl_api = CourtListenerAPI(config) s3 = S3Service(config) try: # Fetch docket IDs docket_ids = scraper.fetch_docket_ids() if config.max_items: docket_ids = set(list(docket_ids)[: config.max_items]) logger.info(f"Limited to {config.max_items} dockets") # Process each docket with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TaskProgressColumn(), console=console, ) as progress: main_task = progress.add_task( f"Processing {len(docket_ids)} dockets...", total=len(docket_ids) ) for docket_id in sorted(docket_ids): # Check if already processed if docket_id in state.dockets and not force: docket = state.dockets[docket_id] if docket.download_status == ProcessingStatus.SUCCESS: progress.update(main_task, advance=1) continue # Get docket info progress.update( main_task, description=f"Fetching info for docket {docket_id}..." ) docket_info = cl_api.get_docket_info(docket_id) if not docket_info: logger.warning(f"Failed to get info for docket {docket_id}") progress.update(main_task, advance=1) continue # Download files progress.update( main_task, description=f"Downloading {docket_info.case_name[:30]}...", ) docket_info = s3.download_docket_files(docket_info, progress) # Save state state.add_docket(docket_info) state.save() progress.update(main_task, advance=1) # Print summary _print_download_summary(state.dockets.values()) finally: scraper.close() cl_api.close() @cli.command() @click.option("--max-items", type=int, help="Maximum number of items to process") @click.pass_obj def archive(config: Config, max_items: Optional[int]): """Create and upload zip archives to S3.""" config.max_items = max_items # Initialize services state = StateManager(config) state.load() archiver = ArchiveService(config) s3 = S3Service(config) # Get dockets to archive pending = state.get_pending_archives() if not pending: logger.info("No dockets to archive") return if config.max_items: pending = pending[: config.max_items] logger.info(f"Found {len(pending)} dockets to archive") # Process archives with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TaskProgressColumn(), console=console, ) as progress: main_task = progress.add_task( f"Archiving {len(pending)} dockets...", total=len(pending) ) for docket in pending: progress.update( main_task, description=f"Processing {docket.case_name[:30]}..." ) # Create archive if archiver.create_archive(docket, progress): # Upload to S3 if s3.upload_archive(docket): docket.archive_status = ProcessingStatus.SUCCESS else: docket.archive_status = ProcessingStatus.FAILED # Save state state.add_docket(docket) state.save() progress.update(main_task, advance=1) # Print summary _print_archive_summary(pending) @cli.command() @click.pass_obj def index(config: Config): """Generate and upload HTML index.""" # Initialize services state = StateManager(config) state.load() s3 = S3Service(config) generator = IndexGenerator(config) with console.status("[bold green]Generating index..."): # List archives in S3 archives = s3.list_archives() logger.info(f"Found {len(archives)} archives in S3") # Generate HTML html_content = generator.generate_html(list(state.dockets.values()), archives) # Save locally index_path = Path("index.html") with open(index_path, "w") as f: f.write(html_content) logger.info(f"Saved index to {index_path}") # Upload to S3 if s3.upload_index(html_content): logger.info("Index uploaded successfully") else: logger.error("Failed to upload index") @cli.command() @click.option("--max-items", type=int, help="Maximum number of items to process") @click.pass_obj def pipeline(config: Config, max_items: Optional[int]): """Run complete pipeline: download, archive, and index.""" config.max_items = max_items console.print("[bold green]Starting complete pipeline...") # Run download ctx = click.get_current_context() ctx.invoke(download, max_items=max_items, force=False) # Run archive ctx.invoke(archive, max_items=max_items) # Run index ctx.invoke(index) console.print("[bold green]Pipeline complete!") @cli.command() @click.pass_obj def status(config: Config): """Show current processing status.""" state = StateManager(config) state.load() if not state.dockets: console.print("No dockets found. Run 'download' first.") return # Create status table table = Table(title="AI Cases Processing Status") table.add_column("Status", style="cyan") table.add_column("Download", justify="right") table.add_column("Archive", justify="right") # Count statuses download_counts = {} archive_counts = {} for docket in state.dockets.values(): download_counts[docket.download_status] = ( download_counts.get(docket.download_status, 0) + 1 ) archive_counts[docket.archive_status] = ( archive_counts.get(docket.archive_status, 0) + 1 ) # Add rows for status in ProcessingStatus: table.add_row( status.value.title(), str(download_counts.get(status, 0)), str(archive_counts.get(status, 0)), ) table.add_row( "[bold]Total", f"[bold]{len(state.dockets)}", f"[bold]{len(state.dockets)}", ) console.print(table) # Calculate storage total_downloaded = sum( d.files_downloaded for d in state.dockets.values() if d.download_status == ProcessingStatus.SUCCESS ) total_archived_mb = sum( d.archive_size_mb for d in state.dockets.values() if d.archive_status == ProcessingStatus.SUCCESS ) console.print("\n[bold]Storage Summary:") console.print(f" Files downloaded: {total_downloaded:,}") console.print(f" Archives created: {total_archived_mb:.1f} MB") # ============================================================================ # Helper Functions # ============================================================================ def _print_download_summary(dockets): """Print download summary table.""" table = Table(title="Download Summary") table.add_column("Status", style="cyan") table.add_column("Count", justify="right") status_counts = {} for docket in dockets: status_counts[docket.download_status] = ( status_counts.get(docket.download_status, 0) + 1 ) for status, count in status_counts.items(): table.add_row(status.value.title(), str(count)) total_files = sum(d.files_downloaded for d in dockets) table.add_row("[bold]Total Files", f"[bold]{total_files}") console.print(table) def _print_archive_summary(dockets): """Print archive summary table.""" table = Table(title="Archive Summary") table.add_column("Case", style="cyan") table.add_column("Status", style="green") table.add_column("Size (MB)", justify="right") for docket in dockets[:10]: # Show first 10 table.add_row( docket.case_name[:40], docket.archive_status.value, f"{docket.archive_size_mb:.1f}" if docket.archive_size_mb else "-", ) if len(dockets) > 10: table.add_row("...", "...", "...") total_mb = sum( d.archive_size_mb for d in dockets if d.archive_status == ProcessingStatus.SUCCESS ) table.add_row("[bold]Total", "", f"[bold]{total_mb:.1f}") console.print(table) # ============================================================================ # Main Entry Point # ============================================================================ if __name__ == "__main__": try: cli() except KeyboardInterrupt: console.print("\n[yellow]Interrupted by user[/yellow]") sys.exit(1) except Exception as e: console.print(f"[red]Fatal error: {e}[/red]") if logger.level == logging.DEBUG: console.print_exception() sys.exit(1)