Newer
Older
CNCTools / ReferenceSurfaceGenerator / backend / app / main.py
import asyncio
import os
import uuid
import json
import datetime
from typing import Dict, List, Optional

from fastapi import FastAPI, File, UploadFile, WebSocket, WebSocketDisconnect, HTTPException, status
from fastapi.responses import FileResponse
from starlette.websockets import WebSocketState

from fastapi.middleware.cors import CORSMiddleware

# Import the core processing logic and data models
from .processing import create_layered_curves_dxf
from .models import Job, JobStatus

# ASGI application object; the route decorators further down register their
# handlers on this instance.
app = FastAPI()

# Allow all origins for simplicity, can be locked down in production.
# NOTE(review): FastAPI's CORS documentation says allow_origins, allow_methods
# and allow_headers should not be ["*"] when allow_credentials=True. Nothing in
# this module reads cookies or auth headers, so credentials may not be needed
# at all -- confirm with the frontend before tightening either setting.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Storage layout used by this module:
#   UPLOAD_DIR        - uploaded input files
#   OUTPUT_DIR        - generated DXF files served by /download
#   JOBS_METADATA_DIR - one JSON metadata file per job
#   JOB_QUEUE_DIR     - "<job_id>.trigger" files written for the worker
UPLOAD_DIR = "/app/data/uploads"
OUTPUT_DIR = "/app/data/outputs"
JOBS_METADATA_DIR = "/app/data/jobs_metadata"
JOB_QUEUE_DIR = "/app/data/job_queue"

# Create each directory at import time; already-existing ones are left as-is.
for _required_dir in (UPLOAD_DIR, OUTPUT_DIR, JOBS_METADATA_DIR, JOB_QUEUE_DIR):
    os.makedirs(_required_dir, exist_ok=True)

# --- Helper Functions for Job Metadata Persistence ---

def _get_job_metadata_path(job_id: uuid.UUID) -> str:
    """
    Build the path of the JSON metadata file belonging to a job.

    Args:
        job_id: Identifier of the job.

    Returns:
        "<JOBS_METADATA_DIR>/<job_id>.json". The file is not required to exist.
    """
    metadata_filename = f"{job_id}.json"
    return os.path.join(JOBS_METADATA_DIR, metadata_filename)

def _save_job_metadata(job: Job):
    """
    Persist a Job's metadata as JSON, replacing the previous file atomically.

    The metadata file is polled by track_job_progress while it may be
    rewritten, so the JSON is first written to a sibling temporary file and
    then moved over the final path with os.replace. A reader therefore sees
    either the old file or the complete new one, never a half-written one.

    Args:
        job: The job to persist; its ``id`` determines the file name.

    Raises:
        OSError: If the temporary file cannot be written or moved into place.
    """
    path = _get_job_metadata_path(job.id)
    payload = job.model_dump(mode='json')  # Using model_dump for Pydantic v2

    # The ".tmp" suffix keeps the partial file out of _load_all_job_metadata,
    # which only considers names ending in ".json".
    tmp_path = f"{path}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=4)
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file is gone; this only fires
        # when writing or replacing failed part-way.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

# How long (seconds) an unparseable metadata file must have been left
# unmodified before it is treated as truly corrupt and deleted.
_CORRUPT_METADATA_GRACE_SECONDS = 5.0


def _load_job_metadata(job_id: uuid.UUID) -> Optional[Job]:
    """
    Load a Job from its JSON metadata file.

    Args:
        job_id: Identifier of the job to load.

    Returns:
        The Job, or None when the file does not exist, cannot be parsed as
        JSON, or does not describe a valid Job.

    Notes:
        An unparseable file is deleted only if it has not been modified for
        _CORRUPT_METADATA_GRACE_SECONDS. A recently modified file may simply be
        in the middle of being rewritten, and removing it would destroy a live
        job's metadata.
    """
    path = _get_job_metadata_path(job_id)

    # Open directly instead of checking os.path.exists first: the file can be
    # deleted between the check and the open.
    try:
        with open(path, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        return None
    except (json.JSONDecodeError, UnicodeDecodeError):
        print(f"Error: Corrupt job metadata file: {path}")
        try:
            age = datetime.datetime.now().timestamp() - os.path.getmtime(path)
            if age >= _CORRUPT_METADATA_GRACE_SECONDS:
                os.remove(path)  # Clean up corrupt file
        except OSError:
            # Already removed or not removable; nothing more to do here.
            pass
        return None

    try:
        return Job(**data)
    except (TypeError, ValueError) as exc:
        # TypeError: the JSON document is not an object. ValueError: the Job
        # model rejected the data (pydantic's ValidationError derives from
        # ValueError). The file is kept for inspection.
        print(f"Error: Invalid job metadata in {path}: {exc}")
        return None

def _load_all_job_metadata() -> List[Job]:
    """
    Load every job whose metadata file lives in JOBS_METADATA_DIR.

    Only entries named "<uuid>.json" are considered; anything else in the
    directory is ignored.

    Returns:
        The loaded jobs sorted by ``timestamp``, newest first.
    """
    suffix = ".json"
    jobs = []
    for filename in os.listdir(JOBS_METADATA_DIR):
        if not filename.endswith(suffix):
            continue
        # Strip only the trailing extension; str.replace would also remove
        # ".json" occurring elsewhere in the name.
        stem = filename[:-len(suffix)]
        try:
            job = _load_job_metadata(uuid.UUID(stem))
        except ValueError:
            # Not a UUID-named file (or a ValueError surfaced while loading
            # it): skip this entry rather than failing the whole listing.
            continue
        if job:
            jobs.append(job)
    # Sort by timestamp, newest first
    jobs.sort(key=lambda j: j.timestamp, reverse=True)
    return jobs


@app.post("/upload/")
async def upload_mesh_file(file: UploadFile = File(...), num_layers: int = 20, num_points_per_layer: int = 30):
    """
    Store an uploaded file and queue a processing job for it.

    The upload is written to UPLOAD_DIR, a Job record with status QUEUED is
    saved, and a "<job_id>.trigger" file is created in JOB_QUEUE_DIR for the
    worker.

    Args:
        file: The uploaded file.
        num_layers: Stored on the job as ``num_layers``.
        num_points_per_layer: Stored on the job as ``num_points_per_layer``.

    Returns:
        A dict with the new job's ``job_id``, ``filename`` and ``status``.
    """
    job_id = uuid.uuid4()

    # The client controls the file name, so keep only its final path component
    # (treating both "/" and "\\" as separators). A missing or empty name
    # falls back to a fixed placeholder so a path can still be built.
    raw_name = (file.filename or "").replace("\\", "/")
    safe_filename = os.path.basename(raw_name) or "upload"

    input_path = os.path.join(UPLOAD_DIR, f"{job_id}_{safe_filename}")
    output_path = os.path.join(OUTPUT_DIR, f"{job_id}_curves.dxf")

    # Save the uploaded file in chunks so a large upload is never held in
    # memory in one piece.
    chunk_size = 1024 * 1024
    with open(input_path, "wb") as buffer:
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            buffer.write(chunk)

    # Create and save the initial job metadata
    job = Job(
        id=job_id,
        filename=safe_filename,
        input_path=input_path,
        output_path=output_path,
        num_layers=num_layers,
        num_points_per_layer=num_points_per_layer,
        status=JobStatus.QUEUED, # Initial status is now QUEUED
        message=f"File ''{safe_filename}'' uploaded, job queued."
    )
    _save_job_metadata(job)

    # Create a trigger file for the worker. It is written after the metadata
    # so the job record already exists when the trigger appears.
    with open(os.path.join(JOB_QUEUE_DIR, f"{job_id}.trigger"), "w") as f:
        f.write(str(job_id))

    return {"job_id": str(job.id), "filename": job.filename, "status": job.status.value}


async def track_job_progress(websocket: WebSocket, job_id: uuid.UUID):
    """
    Poll a job's metadata and push every changed snapshot over a WebSocket.

    The loop ends when the job reaches COMPLETE or FAILED, when its metadata
    can no longer be loaded (an error message is sent first), or when the
    client disconnects.

    Args:
        websocket: An already-accepted WebSocket connection.
        job_id: Identifier of the job to follow.
    """
    poll_interval_seconds = 0.5  # Check for updates every 500ms
    last_update_content = None

    while websocket.client_state == WebSocketState.CONNECTED:
        job = _load_job_metadata(job_id)
        if not job:
            await websocket.send_json({"status": "error", "message": "Job disappeared or was deleted."})
            break

        # Only send when something actually changed since the last message.
        update_content = job.model_dump(mode='json')
        if update_content != last_update_content:
            await websocket.send_json(update_content)
            last_update_content = update_content

        # Stop tracking if the job is in a terminal state
        if job.status in (JobStatus.COMPLETE, JobStatus.FAILED):
            break

        # Wait for the next poll by listening on the socket instead of a plain
        # sleep. Starlette only flips client_state to DISCONNECTED when a
        # disconnect message is received, so without a receive() call the loop
        # condition above could never become false and polling would continue
        # after the client had gone. Any message the client sends is ignored
        # and merely triggers an early re-check.
        try:
            await asyncio.wait_for(websocket.receive(), timeout=poll_interval_seconds)
        except asyncio.TimeoutError:
            pass


@app.websocket("/ws/{job_id}")
async def websocket_endpoint(websocket: WebSocket, job_id: uuid.UUID):
    """
    Stream a job's status to a client over a WebSocket.

    Accepts the connection, sends the job's current metadata, then hands over
    to track_job_progress for live updates. If the job is unknown, a single
    error message is sent and the socket is closed.

    Args:
        websocket: The incoming WebSocket connection.
        job_id: Identifier of the job to follow, taken from the URL.
    """
    await websocket.accept()

    current_job = _load_job_metadata(job_id)
    if current_job:
        try:
            # Initial snapshot, then real-time updates. track_job_progress
            # starts with no "last sent" state, so it sends a snapshot of its
            # own on its first iteration.
            snapshot = current_job.model_dump(mode='json')
            await websocket.send_json(snapshot)
            await track_job_progress(websocket, job_id)
        except WebSocketDisconnect:
            print(f"Client disconnected from job {job_id}")
        finally:
            # Deliberately no websocket.close() on this path: the original
            # author's rationale is to avoid racing the client's own close
            # when the handler returns.
            print(f"WebSocket connection handler finished for job {job_id}")
        return

    # Unknown job: report it and close the connection explicitly.
    not_found_payload = {"status": "error", "message": "Job not found."}
    await websocket.send_json(not_found_payload)
    await websocket.close()

# The async_generator_wrapper is no longer needed as processing is fully offloaded
# to the worker process.



@app.get("/download/{filename}")
async def download_file(filename: str):
    """
    Serve a generated DXF file from OUTPUT_DIR for download.

    Args:
        filename: Bare name of a file inside OUTPUT_DIR.

    Returns:
        A FileResponse streaming the file with the DXF media type.

    Raises:
        HTTPException: 404 when the name is not a plain file name or no such
            regular file exists in OUTPUT_DIR.
    """
    # Accept only a bare file name so the request cannot address anything
    # outside OUTPUT_DIR (or OUTPUT_DIR's parent via "..").
    is_bare_name = filename == os.path.basename(filename) and filename not in ("", ".", "..")
    path = os.path.join(OUTPUT_DIR, filename)

    # isfile rather than exists: a directory must not be handed to FileResponse.
    if not is_bare_name or not os.path.isfile(path):
        # A real 404, matching the job endpoints, instead of a 200 response
        # carrying an error body.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")

    return FileResponse(path, media_type='application/vnd.dxf', filename=filename)


# --- New API Endpoints for Job Management ---

@app.get("/api/jobs", response_model=List[Job])
async def get_all_jobs():
    """
    List all known processing jobs.

    Returns:
        Whatever _load_all_job_metadata yields: every job with a readable
        metadata file, newest first.
    """
    all_jobs = _load_all_job_metadata()
    return all_jobs

@app.get("/api/jobs/{job_id}", response_model=Job)
async def get_job_status(job_id: uuid.UUID):
    """
    Return the stored metadata of a single job.

    Args:
        job_id: Identifier of the job, taken from the URL.

    Raises:
        HTTPException: 404 when no metadata can be loaded for the job.
    """
    found = _load_job_metadata(job_id)
    if found:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")

def _remove_file_if_present(path: Optional[str]) -> None:
    """Delete ``path`` if it is set and exists; a missing file is not an error."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        # Already gone (e.g. removed concurrently) -- the desired end state.
        pass


@app.delete("/api/jobs/{job_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_job(job_id: uuid.UUID):
    """
    Delete a job: its queue trigger, input file, output file and metadata.

    Args:
        job_id: Identifier of the job, taken from the URL.

    Raises:
        HTTPException: 404 when no metadata can be loaded for the job.
    """
    job = _load_job_metadata(job_id)
    if not job:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")

    # Drop a still-pending trigger first so the job is not picked up after its
    # files are gone. NOTE(review): assumes the worker treats a missing
    # trigger as "nothing to do" -- confirm against the worker implementation.
    _remove_file_if_present(os.path.join(JOB_QUEUE_DIR, f"{job_id}.trigger"))
    # Delete input file if it exists
    _remove_file_if_present(job.input_path)
    # Delete output file if it exists
    _remove_file_if_present(job.output_path)
    # Delete the metadata file last, so a failure above leaves the job visible
    # and the delete can be retried.
    _remove_file_if_present(_get_job_metadata_path(job_id))
    return # 204 No Content