import os
import time
import uuid
import json
from typing import Optional
import sys
import datetime
# Add the app directory to the Python path to allow imports from .models and .processing
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from app.models import Job, JobStatus
from app.processing import create_layered_curves_dxf
# Define paths - ensure these match main.py
UPLOAD_DIR = "/app/data/uploads"
OUTPUT_DIR = "/app/data/outputs"
JOBS_METADATA_DIR = "/app/data/jobs_metadata"
JOB_QUEUE_DIR = "/app/data/job_queue"
# Helper to save job metadata (duplicate from main.py for worker self-sufficiency)
def _save_job_metadata(job: Job):
path = os.path.join(JOBS_METADATA_DIR, f"{job.id}.json")
with open(path, "w") as f:
# Use model_dump(mode='json') for Pydantic v2 to ensure correct serialization of types like UUID and enums
json.dump(job.model_dump(mode='json'), f, indent=4)
# Helper to load job metadata (duplicate from main.py for worker self-sufficiency)
def _load_job_metadata(job_id: uuid.UUID) -> Optional[Job]:
path = os.path.join(JOBS_METADATA_DIR, f"{job_id}.json")
if os.path.exists(path):
try:
with open(path, "r") as f:
data = json.load(f)
return Job(**data)
except json.JSONDecodeError:
print(f"[WORKER] Error: Corrupt job metadata file: {path}")
os.remove(path)
return None
return None
async def process_job(job_id: uuid.UUID):
job = _load_job_metadata(job_id)
if not job:
print(f"[WORKER] Job {job_id} not found in metadata, skipping.")
return
print(f"[WORKER] Starting processing for job {job.id} (File: {job.filename})...")
# Update job status to PROCESSING
job.status = JobStatus.PROCESSING
job.message = "Processing started by worker."
_save_job_metadata(job)
try:
# Execute the processing function (which is a generator)
for progress_update in create_layered_curves_dxf(
job.input_path,
job.output_path,
num_layers=job.num_layers,
num_points_per_layer=job.num_points_per_layer
):
job.status = JobStatus(progress_update["status"].upper())
job.progress = progress_update["progress"]
job.message = progress_update["message"]
_save_job_metadata(job)
# In a real system, you might also push this update to a message queue for websockets
# Final update on completion
job.status = JobStatus.COMPLETE
job.progress = 100
job.message = "Processing complete! DXF generated."
job.download_url = f"/download/{os.path.basename(job.output_path)}"
_save_job_metadata(job)
print(f"[WORKER] Job {job.id} completed successfully.")
except Exception as e:
error_message = f"An error occurred during job {job.id} processing: {str(e)}"
print(f"[WORKER] ERROR: {error_message}")
job.status = JobStatus.FAILED
job.message = error_message
_save_job_metadata(job)
finally:
# Clean up the trigger file from the queue
trigger_file_path = os.path.join(JOB_QUEUE_DIR, f"{job.id}.trigger")
if os.path.exists(trigger_file_path):
os.remove(trigger_file_path)
print(f"[WORKER] Cleaned up trigger file for job {job.id}.")
# Clean up the input file (uploaded mesh)
if os.path.exists(job.input_path):
os.remove(job.input_path)
print(f"[WORKER] Cleaned up input file for job {job.id}.")
async def main():
print(f"[WORKER] Worker started. Monitoring {JOB_QUEUE_DIR} for new jobs...")
while True:
for filename in os.listdir(JOB_QUEUE_DIR):
if filename.endswith(".trigger"):
job_id_str = filename.replace(".trigger", "")
try:
job_id = uuid.UUID(job_id_str)
print(f"[WORKER] Found new job trigger: {job_id}")
await process_job(job_id)
except ValueError:
print(f"[WORKER] Invalid trigger filename: {filename}, skipping.")
continue
time.sleep(1) # Check for new jobs every second
if __name__ == "__main__":
# Ensure directories exist (they should be created by main.py on startup)
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(JOBS_METADATA_DIR, exist_ok=True)
os.makedirs(JOB_QUEUE_DIR, exist_ok=True)
# Run the worker's main loop
# Need to use asyncio.run to run an async main function
import asyncio
asyncio.run(main())