import asyncio
import os
import uuid
import json
from typing import List, Optional
from fastapi import FastAPI, File, UploadFile, WebSocket, WebSocketDisconnect, HTTPException, status, Form
from fastapi.responses import FileResponse
from starlette.websockets import WebSocketState
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
# Import the core processing logic and data models
from .dxf_parser import parse_dxf_for_viewing
from .models import Job, JobStatus
app = FastAPI()
# Allow all origins for simplicity; this should be locked down in production.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Create directories for uploads, outputs, job metadata, and the file-based job queue
UPLOAD_DIR = "/app/data/uploads"
OUTPUT_DIR = "/app/data/outputs"
JOBS_METADATA_DIR = "/app/data/jobs_metadata"
JOB_QUEUE_DIR = "/app/data/job_queue"
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(JOBS_METADATA_DIR, exist_ok=True)
os.makedirs(JOB_QUEUE_DIR, exist_ok=True)
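# Jobs are handed off to a separate worker process via the filesystem: job state
# is persisted as JSON in JOBS_METADATA_DIR, and a trigger file in JOB_QUEUE_DIR
# signals the worker that a new job has been queued.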
# --- Helper Functions for Job Metadata Persistence ---
def _get_job_metadata_path(job_id: uuid.UUID) -> str:
"""
Returns the filesystem path for a job's metadata file.
"""
return os.path.join(JOBS_METADATA_DIR, f"{job_id}.json")
def _save_job_metadata(job: Job):
"""
Saves a Job object's metadata to a JSON file atomically.
"""
path = _get_job_metadata_path(job.id)
temp_path = f"{path}.tmp"
with open(temp_path, "w") as f:
# Using model_dump for Pydantic v2
json.dump(job.model_dump(mode='json'), f, indent=4)
    # os.replace is atomic on POSIX and, unlike os.rename, also overwrites an
    # existing destination on Windows.
    os.replace(temp_path, path)
def _load_job_metadata(job_id: uuid.UUID) -> Optional[Job]:
"""
Loads a Job object's metadata from a JSON file.
"""
path = _get_job_metadata_path(job_id)
if os.path.exists(path):
try:
with open(path, "r") as f:
data = json.load(f)
return Job(**data)
except json.JSONDecodeError:
print(f"Error: Corrupt job metadata file: {path}")
os.remove(path) # Clean up corrupt file
return None
return None
def _load_all_job_metadata() -> List[Job]:
"""
Loads metadata for all jobs from the jobs_metadata directory.
"""
jobs = []
for filename in os.listdir(JOBS_METADATA_DIR):
if filename.endswith(".json"):
job_id_str = filename.replace(".json", "")
try:
job_id = uuid.UUID(job_id_str)
job = _load_job_metadata(job_id)
if job:
jobs.append(job)
except ValueError:
# Skip invalid filenames
continue
# Sort by timestamp, newest first
jobs.sort(key=lambda j: j.timestamp, reverse=True)
return jobs
@app.post("/upload/")
async def upload_mesh_file(
file: UploadFile = File(...),
num_layers: int = Form(20),
num_points_per_layer: int = Form(30)
):
"""
Accepts a file upload and saves it to a temporary location.
Creates a new job and returns its ID.
"""
job_id = uuid.uuid4()
input_path = os.path.join(UPLOAD_DIR, f"{job_id}_{file.filename}")
output_path = os.path.join(OUTPUT_DIR, f"{job_id}_curves.dxf")
# Save the uploaded file
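    # Note: await file.read() buffers the entire upload in memory; for very large
    # meshes, a chunked copy (e.g. shutil.copyfileobj(file.file, buffer)) would scale better.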
with open(input_path, "wb") as buffer:
buffer.write(await file.read())
# Create and save the initial job metadata
job = Job(
id=job_id,
filename=file.filename,
input_path=input_path,
output_path=output_path,
num_layers=num_layers,
num_points_per_layer=num_points_per_layer,
status=JobStatus.QUEUED, # Initial status is now QUEUED
message=f"File ''{file.filename}'' uploaded, job queued."
)
_save_job_metadata(job)
    # Create a trigger file so the worker process (which watches JOB_QUEUE_DIR) picks up the job
with open(os.path.join(JOB_QUEUE_DIR, f"{job_id}.trigger"), "w") as f:
f.write(str(job_id))
return {"job_id": str(job.id), "filename": job.filename, "status": job.status.value}
async def track_job_progress(websocket: WebSocket, initial_job: Job):
"""
Monitors a job's metadata file and sends updates over a WebSocket.
Uses the initially provided job object for the first status check.
"""
last_update_content = initial_job.model_dump(mode='json')
job_id = initial_job.id
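    # Poll the on-disk metadata rather than waiting on an in-process event: the
    # job is updated by a separate worker process, so the metadata file is the
    # only shared source of truth.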
while websocket.client_state == WebSocketState.CONNECTED:
job = _load_job_metadata(job_id)
if not job:
await websocket.send_json({"status": "error", "message": "Job disappeared or was deleted."})
break
update_content = job.model_dump(mode='json')
if update_content != last_update_content:
await websocket.send_json(update_content)
last_update_content = update_content
# Stop tracking if the job is in a terminal state
if job.status in [JobStatus.COMPLETE, JobStatus.FAILED]:
break
await asyncio.sleep(0.5) # Check for updates every 500ms
@app.websocket("/ws/{job_id}")
async def websocket_endpoint(websocket: WebSocket, job_id: uuid.UUID):
"""
Handles the WebSocket connection for processing the file and sending progress.
"""
await websocket.accept()
job = _load_job_metadata(job_id)
if not job:
await websocket.send_json({"status": "error", "message": "Job not found."})
await websocket.close()
return
try:
# Send the initial status and then start tracking for updates
await websocket.send_json(job.model_dump(mode='json'))
await track_job_progress(websocket, job)
except WebSocketDisconnect:
print(f"Client disconnected from job {job_id}")
finally:
# The connection is automatically closed by Starlette when the endpoint function returns.
# No need to call websocket.close() manually, as it can lead to race conditions
# where both client and server try to close the connection simultaneously.
print(f"WebSocket connection handler finished for job {job_id}")
# The async_generator_wrapper is no longer needed as processing is fully offloaded
# to the worker process.
@app.get("/api/download/{filename}")
async def download_file(filename: str):
"""
Serves the generated DXF file for download.
"""
    # Guard against path traversal in the client-supplied filename.
    safe_name = os.path.basename(filename)
    path = os.path.join(OUTPUT_DIR, safe_name)
    if os.path.exists(path):
        return FileResponse(path, media_type='application/vnd.dxf', filename=safe_name)
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
# --- New API Endpoints for Job Management ---
@app.get("/api/jobs", response_model=List[Job])
async def get_all_jobs():
"""
Retrieves a list of all processing jobs.
"""
return _load_all_job_metadata()
@app.get("/api/jobs/{job_id}", response_model=Job)
async def get_job_status(job_id: uuid.UUID):
"""
Retrieves the status and details for a specific job.
"""
job = _load_job_metadata(job_id)
if not job:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
return job
@app.delete("/api/jobs/{job_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_job(job_id: uuid.UUID):
"""
Deletes a specific job's metadata and associated output file.
"""
job = _load_job_metadata(job_id)
if not job:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
# Delete input file if it exists
if job.input_path and os.path.exists(job.input_path):
os.remove(job.input_path)
# Delete output file if it exists
if job.output_path and os.path.exists(job.output_path):
os.remove(job.output_path)
# Delete metadata file
os.remove(_get_job_metadata_path(job_id))
    return  # 204 No Content
@app.get("/api/jobs/{job_id}/view")
async def get_job_output_for_viewing(job_id: uuid.UUID):
"""
Retrieves the geometric data from a job's output DXF file in a web-friendly JSON format.
"""
job = _load_job_metadata(job_id)
if not job:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
if not job.output_path or not os.path.exists(job.output_path):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Output file not found for this job")
try:
data = parse_dxf_for_viewing(job.output_path)
return data
except IOError:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Could not read the DXF file.")
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"An unexpected error occurred while parsing the DXF file: {e}")
# Mount the static directory to serve the frontend if it exists
# This is necessary because in local dev, the frontend is served by `npm start`
# and the `/app/static` directory (from the Docker build) won't exist.
if os.path.isdir("/app/static"):
app.mount("/", StaticFiles(directory="/app/static", html=True), name="static")