Newer
Older
CNCTools / ReferenceSurfaceGenerator / backend / app / job_manager.py
import os
import uuid
import json
from typing import List, Optional
from .models import Job

# Directory that holds one "<job_id>.json" metadata file per job.
JOBS_METADATA_DIR = "/app/data/jobs_metadata"
# Created at import time; exist_ok=True makes this a no-op when the directory
# is already present.
os.makedirs(JOBS_METADATA_DIR, exist_ok=True)

def _get_job_metadata_path(job_id: uuid.UUID) -> str:
    """
    Builds the location of the JSON metadata file that belongs to a job.

    The file sits directly inside JOBS_METADATA_DIR and is named after the
    string form of the job's UUID followed by a ".json" extension.
    """
    metadata_filename = str(job_id) + ".json"
    return os.path.join(JOBS_METADATA_DIR, metadata_filename)

def save_job_metadata(job: Job):
    """
    Saves a Job object's metadata to a JSON file atomically.

    The data is first written to a sibling "<path>.tmp" file, flushed to disk,
    and then moved over the final path in one step, so a reader sees either
    the previous complete file or the new complete file, never a partial one.
    If anything fails along the way the temporary file is removed again.

    Args:
        job: The job to persist; its ``id`` determines the file name.

    Raises:
        OSError: If the temporary file cannot be written or moved into place.
    """
    path = _get_job_metadata_path(job.id)
    temp_path = f"{path}.tmp"
    try:
        with open(temp_path, "w") as f:
            json.dump(job.model_dump(mode='json'), f, indent=4)
            # Push the bytes to disk before the swap so a crash right after
            # the rename cannot leave an empty or truncated final file.
            f.flush()
            os.fsync(f.fileno())
        # os.replace overwrites an existing destination on every platform;
        # os.rename raises on Windows when the target already exists.
        os.replace(temp_path, path)
    except BaseException:
        # Do not leave a stale temp file behind; the original error is what
        # the caller needs to see, so cleanup failures are ignored.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise

def load_job_metadata(job_id: uuid.UUID) -> Optional[Job]:
    """
    Loads a Job object's metadata from a JSON file.

    Args:
        job_id: Identifier of the job whose metadata file should be read.

    Returns:
        The reconstructed Job, or None when no metadata file exists or the
        file is corrupt. A file counts as corrupt when it cannot be decoded
        or parsed as JSON, or when its top-level value is not a JSON object;
        corrupt files are reported and deleted.

    Raises:
        Whatever ``Job(**data)`` raises when the stored fields do not form a
        valid Job; such files are left on disk untouched.
    """
    path = _get_job_metadata_path(job_id)
    try:
        # Open directly instead of checking os.path.exists first: the file may
        # be deleted between the check and the open.
        with open(path, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        return None
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Not a dict, so the corruption branch below handles it.
        data = None

    if not isinstance(data, dict):
        print(f"Error: Corrupt job metadata file: {path}")
        try:
            os.remove(path)  # Clean up corrupt file
        except FileNotFoundError:
            # Already removed by someone else; nothing left to clean up.
            pass
        return None

    return Job(**data)

def load_all_job_metadata() -> List[Job]:
    """
    Loads metadata for all jobs from the jobs_metadata directory.

    Only entries named "<uuid>.json" are considered. Entries whose name is not
    a valid UUID are skipped silently; entries that fail to load with a
    ValueError are reported and skipped so that one bad file does not prevent
    listing the rest.

    Returns:
        All successfully loaded jobs, sorted by ``timestamp``, newest first.
    """
    jobs = []
    for filename in os.listdir(JOBS_METADATA_DIR):
        # Strip only the trailing extension. Removing every ".json" occurrence
        # would map names like "<uuid>.json.json" onto "<uuid>" and list that
        # job twice.
        stem, extension = os.path.splitext(filename)
        if extension != ".json":
            continue

        try:
            job_id = uuid.UUID(stem)
        except ValueError:
            # Skip invalid filenames
            continue

        try:
            job = load_job_metadata(job_id)
        except ValueError as exc:
            # Keep the best-effort behaviour of skipping the entry, but say so
            # instead of hiding the failure.
            print(f"Error: Could not load job metadata file: {filename} ({exc})")
            continue

        if job:
            jobs.append(job)

    # Sort by timestamp, newest first
    jobs.sort(key=lambda j: j.timestamp, reverse=True)
    return jobs