import os
import time
import uuid
import json
import sys
import importlib
import asyncio
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from app.models import Job, JobStatus
from app.job_manager import save_job_metadata, load_job_metadata
# Directory watched for "<job_id>.trigger" files that signal new work (see main()).
JOB_QUEUE_DIR = "/app/data/job_queue"
# Created at startup; presumably where app.job_manager persists job metadata — confirm.
JOBS_METADATA_DIR = "/app/data/jobs_metadata"
def get_processing_function(feature_id: str):
    """Resolve the `process` callable for a feature by dynamic import.

    Looks up ``app.features.<feature_id>.processing.process``. Returns the
    callable on success; logs the failure and returns None when either the
    module cannot be imported or it lacks a ``process`` attribute.
    """
    module_path = f"app.features.{feature_id}.processing"
    try:
        feature_module = importlib.import_module(module_path)
    except ImportError as exc:
        print(f"[WORKER] Error importing processing function for feature '{feature_id}': {exc}")
        return None
    try:
        return feature_module.process
    except AttributeError as exc:
        print(f"[WORKER] Error importing processing function for feature '{feature_id}': {exc}")
        return None
async def process_job(job_id: uuid.UUID):
    """Run one queued job end to end and persist every state transition.

    Loads the job's metadata, marks it PROCESSING, drives the feature's async
    ``process`` generator while saving each progress update, then marks the
    job COMPLETE (or FAILED on any exception). Always removes the job's
    trigger file from JOB_QUEUE_DIR on the way out.
    """
    job = load_job_metadata(job_id)
    if not job:
        # No metadata means there is nothing to update or run.
        print(f"[WORKER] Job {job_id} not found in metadata, skipping.")
        return
    print(f"[WORKER] Starting processing for job {job.id} (Feature: {job.feature_id}, File: {job.filename})...")
    # Persist the PROCESSING transition immediately so observers see it
    # before any (possibly slow) feature work begins.
    job.status = JobStatus.PROCESSING
    job.message = "Processing started by worker."
    save_job_metadata(job)
    processing_function = get_processing_function(job.feature_id)
    if not processing_function:
        # Import/lookup failure was already logged by get_processing_function.
        job.status = JobStatus.FAILED
        job.message = f"Could not find processing function for feature '{job.feature_id}'."
        save_job_metadata(job)
        return
    try:
        # `process` is consumed as an async generator yielding dicts with
        # optional "status"/"progress"/"message" keys; absent keys keep the
        # job's current values.
        async for progress_update in processing_function(job):
            # NOTE(review): `.upper()` assumes JobStatus enum values are
            # uppercase strings — confirm against app.models.
            job.status = JobStatus(progress_update.get("status", job.status.value).upper())
            job.progress = progress_update.get("progress", job.progress)
            job.message = progress_update.get("message", job.message)
            # Each yielded update is persisted so progress is observable live.
            save_job_metadata(job)
        # Generator exhausted without raising: force the terminal COMPLETE
        # state regardless of the last yielded status.
        job.status = JobStatus.COMPLETE
        job.progress = 100
        job.message = "Processing complete!"
        job.download_url = f"/api/download/{job.id}"
        save_job_metadata(job)
        print(f"[WORKER] Job {job.id} completed successfully.")
    except Exception as e:
        # Any failure inside the feature code marks the job FAILED with the
        # error text; the worker itself keeps running.
        error_message = f"An error occurred during job {job.id} processing: {str(e)}"
        print(f"[WORKER] ERROR: {error_message}")
        job.status = JobStatus.FAILED
        job.message = error_message
        save_job_metadata(job)
    finally:
        # Remove the trigger file in all cases so main() does not re-enqueue
        # the same job on the next poll.
        trigger_file_path = os.path.join(JOB_QUEUE_DIR, f"{job.id}.trigger")
        if os.path.exists(trigger_file_path):
            os.remove(trigger_file_path)
            print(f"[WORKER] Cleaned up trigger file for job {job.id}.")
async def main():
    """Poll JOB_QUEUE_DIR once per second and process each trigger file.

    A trigger file named "<uuid>.trigger" enqueues the job with that id;
    files whose stem is not a valid UUID are logged and skipped. Runs
    forever; process_job() is responsible for removing handled triggers.
    """
    print(f"[WORKER] Worker started. Monitoring {JOB_QUEUE_DIR} for new jobs...")
    while True:
        for filename in os.listdir(JOB_QUEUE_DIR):
            if not filename.endswith(".trigger"):
                continue
            # Strip only the trailing extension; str.replace would also
            # remove an interior ".trigger" occurrence.
            job_id_str = filename[: -len(".trigger")]
            # Keep the try minimal: only the UUID parse can legitimately
            # raise ValueError here. (The original also swallowed ValueError
            # escaping process_job and misreported it as a bad filename.)
            try:
                job_id = uuid.UUID(job_id_str)
            except ValueError:
                # Bug fix: the original printed the literal "(unknown)"
                # instead of the offending filename.
                print(f"[WORKER] Invalid trigger filename: {filename}, skipping.")
                continue
            print(f"[WORKER] Found new job trigger: {job_id}")
            await process_job(job_id)
        await asyncio.sleep(1)
if __name__ == "__main__":
    # Make sure both working directories exist before the poll loop starts.
    for directory in (JOB_QUEUE_DIR, JOBS_METADATA_DIR):
        os.makedirs(directory, exist_ok=True)
    asyncio.run(main())