CNCTools/ReferenceSurfaceGenerator/backend/app/worker.py
import asyncio
import importlib
import os
import sys
import uuid

# Make the backend/ directory importable so "app.*" imports resolve when this
# file is run directly as a script.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from app.models import Job, JobStatus
from app.job_manager import save_job_metadata, load_job_metadata

# File-based job queue: an empty "<job_id>.trigger" file in JOB_QUEUE_DIR
# signals a pending job; per-job state is persisted in JOBS_METADATA_DIR.
JOB_QUEUE_DIR = "/app/data/job_queue"
JOBS_METADATA_DIR = "/app/data/jobs_metadata"
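
# A job is presumably enqueued by the API side in two steps: persist the job
# metadata, then touch an empty trigger file (sketch only; the actual producer
# code lives outside this file):
#
#     save_job_metadata(job)
#     open(os.path.join(JOB_QUEUE_DIR, f"{job.id}.trigger"), "w").close()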

def get_processing_function(feature_id: str):
    """
    Dynamically imports the processing function for a given feature.
    Returns None if the feature module or its `process` attribute is missing.
    """
    try:
        module = importlib.import_module(f"app.features.{feature_id}.processing")
        return getattr(module, "process")
    except (ImportError, AttributeError) as e:
        print(f"[WORKER] Error importing processing function for feature '{feature_id}': {e}")
        return None
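
# Each feature module is consumed below as an async generator, so a feature's
# process() is assumed to look roughly like this (sketch; the dict keys are
# inferred from the consumer loop in process_job):
#
#     async def process(job):
#         yield {"status": "processing", "progress": 10, "message": "Loading input..."}
#         ...  # do the actual work
#         yield {"status": "processing", "progress": 90, "message": "Writing output..."}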

async def process_job(job_id: uuid.UUID):
    """Run a single job to completion, persisting each status transition."""
    job = load_job_metadata(job_id)
    if not job:
        print(f"[WORKER] Job {job_id} not found in metadata, skipping.")
        return

    print(f"[WORKER] Starting processing for job {job.id} (Feature: {job.feature_id}, File: {job.filename})...")

    job.status = JobStatus.PROCESSING
    job.message = "Processing started by worker."
    save_job_metadata(job)

    processing_function = get_processing_function(job.feature_id)
    if not processing_function:
        job.status = JobStatus.FAILED
        job.message = f"Could not find processing function for feature '{job.feature_id}'."
        save_job_metadata(job)
        return

    try:
        # The feature's process() is an async generator; each yielded dict
        # partially updates the job (missing keys keep their current values).
        async for progress_update in processing_function(job):
            job.status = JobStatus(progress_update.get("status", job.status.value).upper())
            job.progress = progress_update.get("progress", job.progress)
            job.message = progress_update.get("message", job.message)
            save_job_metadata(job)

        # Don't overwrite a FAILED status that a progress update may have set.
        if job.status != JobStatus.FAILED:
            job.status = JobStatus.COMPLETE
            job.progress = 100
            job.message = "Processing complete!"
            job.download_url = f"/api/download/{job.id}"
            save_job_metadata(job)
            print(f"[WORKER] Job {job.id} completed successfully.")

    except Exception as e:
        error_message = f"An error occurred during job {job.id} processing: {str(e)}"
        print(f"[WORKER] ERROR: {error_message}")
        job.status = JobStatus.FAILED
        job.message = error_message
        save_job_metadata(job)

    finally:
        # Always remove the trigger file so the job is not picked up again.
        trigger_file_path = os.path.join(JOB_QUEUE_DIR, f"{job.id}.trigger")
        if os.path.exists(trigger_file_path):
            os.remove(trigger_file_path)
            print(f"[WORKER] Cleaned up trigger file for job {job.id}.")

async def main():
    print(f"[WORKER] Worker started. Monitoring {JOB_QUEUE_DIR} for new jobs...")
    while True:
        # Poll the queue directory roughly once per second; each
        # "<job_id>.trigger" file marks a pending job. Jobs run sequentially.
        for filename in os.listdir(JOB_QUEUE_DIR):
            if not filename.endswith(".trigger"):
                continue
            job_id_str = filename.removesuffix(".trigger")
            try:
                job_id = uuid.UUID(job_id_str)
            except ValueError:
                print(f"[WORKER] Invalid trigger filename: {filename}, skipping.")
                continue
            print(f"[WORKER] Found new job trigger: {job_id}")
            await process_job(job_id)
        await asyncio.sleep(1)

if __name__ == "__main__":
    os.makedirs(JOB_QUEUE_DIR, exist_ok=True)
    os.makedirs(JOBS_METADATA_DIR, exist_ok=True)
    asyncio.run(main())
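
# The worker is intended to run as a standalone script (the sys.path tweak at
# the top makes "app" importable in that case), presumably something like:
#
#     python app/worker.py    # from the backend/ directory
#
# inside whatever environment mounts /app/data.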