CNCTools/ReferenceSurfaceGenerator/backend/app/worker.py
import asyncio
import json
import os
import sys
import uuid
from typing import Optional

# Add the app directory to the Python path to allow imports from .models and .processing
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from app.models import Job, JobStatus
from app.processing import create_layered_curves_dxf

# Define paths - ensure these match main.py
UPLOAD_DIR = "/app/data/uploads"
OUTPUT_DIR = "/app/data/outputs"
JOBS_METADATA_DIR = "/app/data/jobs_metadata"
JOB_QUEUE_DIR = "/app/data/job_queue"

# Helper to save job metadata (duplicated from main.py so the worker is self-contained)
def _save_job_metadata(job: Job):
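    """Atomically persist job state to JSON.

    Writes to a temp file and renames it over the target so a concurrent
    reader never sees a half-written file (os.rename is atomic on POSIX
    when source and destination are on the same filesystem).
    """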
    path = os.path.join(JOBS_METADATA_DIR, f"{job.id}.json")
    temp_path = f"{path}.tmp"
    with open(temp_path, "w") as f:
        # Use model_dump(mode='json') for Pydantic v2 to ensure correct serialization of types like UUID and enums
        json.dump(job.model_dump(mode='json'), f, indent=4)
    os.rename(temp_path, path)

# Helper to load job metadata (duplicated from main.py so the worker is self-contained)
def _load_job_metadata(job_id: uuid.UUID) -> Optional[Job]:
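    """Load a Job from its JSON metadata file.

    Returns None if the file is missing or unreadable; corrupt files are
    deleted so they are not retried on every poll.
    """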
    path = os.path.join(JOBS_METADATA_DIR, f"{job_id}.json")
    if os.path.exists(path):
        try:
            with open(path, "r") as f:
                data = json.load(f)
                return Job(**data)
        except ValueError:
            # Covers json.JSONDecodeError and Pydantic's ValidationError,
            # both of which subclass ValueError.
            print(f"[WORKER] Error: Corrupt job metadata file: {path}")
            os.remove(path)
            return None
    return None

async def process_job(job_id: uuid.UUID):
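    """Run a single job end-to-end.

    Marks the job PROCESSING, streams progress updates from the DXF
    generator into the metadata file, records COMPLETE or FAILED, and
    finally removes the queue trigger and the uploaded input file.
    """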
    job = _load_job_metadata(job_id)
    if not job:
        print(f"[WORKER] Job {job_id} not found in metadata, skipping.")
        return

    print(f"[WORKER] Starting processing for job {job.id} (File: {job.filename})...")

    # Update job status to PROCESSING
    job.status = JobStatus.PROCESSING
    job.message = "Processing started by worker."
    _save_job_metadata(job)

    try:
        # Execute the processing function (which is a generator)
        for progress_update in create_layered_curves_dxf(
            job.input_path,
            job.output_path,
            num_layers=job.num_layers,
            num_points_per_layer=job.num_points_per_layer,
        ):
            job.status = JobStatus(progress_update["status"].upper())
            job.progress = progress_update["progress"]
            job.message = progress_update["message"]
            _save_job_metadata(job)
            # In a real system, you might also push this update to a message queue for websockets
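            # A hedged sketch of such a push, assuming a hypothetical async
            # Redis client bound to `redis` (not part of this codebase):
            #   await redis.publish(f"job:{job.id}", json.dumps(progress_update))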

        # Final update on completion
        job.status = JobStatus.COMPLETE
        job.progress = 100
        job.message = "Processing complete! DXF generated."
        job.download_url = f"/api/download/{os.path.basename(job.output_path)}"
        _save_job_metadata(job)
        print(f"[WORKER] Job {job.id} completed successfully.")

    except Exception as e:
        error_message = f"An error occurred during job {job.id} processing: {str(e)}"
        print(f"[WORKER] ERROR: {error_message}")
        job.status = JobStatus.FAILED
        job.message = error_message
        _save_job_metadata(job)

    finally:
        # Clean up the trigger file from the queue
        trigger_file_path = os.path.join(JOB_QUEUE_DIR, f"{job.id}.trigger")
        if os.path.exists(trigger_file_path):
            os.remove(trigger_file_path)
            print(f"[WORKER] Cleaned up trigger file for job {job.id}.")
        
        # Clean up the input file (uploaded mesh)
        if os.path.exists(job.input_path):
            os.remove(job.input_path)
            print(f"[WORKER] Cleaned up input file for job {job.id}.")

async def main():
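    """Poll JOB_QUEUE_DIR for *.trigger files and process each job in turn.

    Jobs are handled sequentially; each trigger file is removed once its
    job finishes (see process_job).
    """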
    print(f"[WORKER] Worker started. Monitoring {JOB_QUEUE_DIR} for new jobs...")
    while True:
        for filename in os.listdir(JOB_QUEUE_DIR):
            if filename.endswith(".trigger"):
                job_id_str = filename.removesuffix(".trigger")
                try:
                    job_id = uuid.UUID(job_id_str)
                except ValueError:
                    print(f"[WORKER] Invalid trigger filename: {filename}, skipping.")
                    continue
                print(f"[WORKER] Found new job trigger: {job_id}")
                await process_job(job_id)

        await asyncio.sleep(1)  # Poll every second without blocking the event loop

if __name__ == "__main__":
    # Ensure directories exist (they should be created by main.py on startup)
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    os.makedirs(JOBS_METADATA_DIR, exist_ok=True)
    os.makedirs(JOB_QUEUE_DIR, exist_ok=True)

    # Run the worker's async polling loop
    asyncio.run(main())