import asyncio
import os
import uuid
import json
from typing import List, Optional

from fastapi import FastAPI, File, UploadFile, WebSocket, WebSocketDisconnect, HTTPException, status
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from starlette.websockets import WebSocketState

# Import the data models (the DXF processing itself runs in the separate worker process)
from .models import Job, JobStatus

app = FastAPI()

# Allow all origins for simplicity; this can be locked down in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create directories for uploads, outputs, and job metadata
UPLOAD_DIR = "/app/data/uploads"
OUTPUT_DIR = "/app/data/outputs"
JOBS_METADATA_DIR = "/app/data/jobs_metadata"
JOB_QUEUE_DIR = "/app/data/job_queue"
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(JOBS_METADATA_DIR, exist_ok=True)
os.makedirs(JOB_QUEUE_DIR, exist_ok=True)
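
# Job flow (this API process plus a separate worker process, which is assumed but not shown here):
#   1. POST /upload/ saves the mesh to UPLOAD_DIR, writes the job's metadata JSON to
#      JOBS_METADATA_DIR, and drops a "<job_id>.trigger" file into JOB_QUEUE_DIR.
#   2. The worker is expected to pick up the trigger, generate the DXF into OUTPUT_DIR,
#      and update the job's metadata file as it progresses.
#   3. /ws/{job_id} polls the metadata file and streams any changes to the connected client.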


# --- Helper Functions for Job Metadata Persistence ---

def _get_job_metadata_path(job_id: uuid.UUID) -> str:
    """
    Returns the filesystem path for a job's metadata file.
    """
    return os.path.join(JOBS_METADATA_DIR, f"{job_id}.json")


def _save_job_metadata(job: Job):
    """
    Saves a Job object's metadata to a JSON file.
    """
    path = _get_job_metadata_path(job.id)
    with open(path, "w") as f:
        json.dump(job.model_dump(mode='json'), f, indent=4)  # Using model_dump for Pydantic v2
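
# For illustration only: the metadata on disk is the Job model serialized to JSON, so a file
# might look roughly like this (the exact field set is defined in models.py, not here):
#   {
#       "id": "<uuid>", "filename": "part.stl",
#       "input_path": "/app/data/uploads/<uuid>_part.stl",
#       "output_path": "/app/data/outputs/<uuid>_curves.dxf",
#       "num_layers": 20, "num_points_per_layer": 30,
#       "status": "...", "message": "...", "timestamp": "..."
#   }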


def _load_job_metadata(job_id: uuid.UUID) -> Optional[Job]:
    """
    Loads a Job object's metadata from a JSON file.
    """
    path = _get_job_metadata_path(job_id)
    if os.path.exists(path):
        try:
            with open(path, "r") as f:
                data = json.load(f)
                return Job(**data)
        except json.JSONDecodeError:
            print(f"Error: Corrupt job metadata file: {path}")
            os.remove(path)  # Clean up corrupt file
            return None
    return None


def _load_all_job_metadata() -> List[Job]:
    """
    Loads metadata for all jobs from the jobs_metadata directory.
    """
    jobs = []
    for filename in os.listdir(JOBS_METADATA_DIR):
        if filename.endswith(".json"):
            job_id_str = filename.replace(".json", "")
            try:
                job_id = uuid.UUID(job_id_str)
                job = _load_job_metadata(job_id)
                if job:
                    jobs.append(job)
            except ValueError:
                # Skip invalid filenames
                continue
    # Sort by timestamp, newest first
    jobs.sort(key=lambda j: j.timestamp, reverse=True)
    return jobs
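

# --- Core API Endpoints: Upload, Progress (WebSocket), and Download ---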
@app.post("/upload/")
async def upload_mesh_file(file: UploadFile = File(...), num_layers: int = 20, num_points_per_layer: int = 30):
"""
Accepts a file upload and saves it to a temporary location.
Creates a new job and returns its ID.
"""
job_id = uuid.uuid4()
input_path = os.path.join(UPLOAD_DIR, f"{job_id}_{file.filename}")
output_path = os.path.join(OUTPUT_DIR, f"{job_id}_curves.dxf")
# Save the uploaded file
with open(input_path, "wb") as buffer:
buffer.write(await file.read())
# Create and save the initial job metadata
job = Job(
id=job_id,
filename=file.filename,
input_path=input_path,
output_path=output_path,
num_layers=num_layers,
num_points_per_layer=num_points_per_layer,
status=JobStatus.QUEUED, # Initial status is now QUEUED
message=f"File ''{file.filename}'' uploaded, job queued."
)
_save_job_metadata(job)
# Create a trigger file for the worker
with open(os.path.join(JOB_QUEUE_DIR, f"{job_id}.trigger"), "w") as f:
f.write(str(job_id))
return {"job_id": str(job.id), "filename": job.filename, "status": job.status.value}


async def track_job_progress(websocket: WebSocket, job_id: uuid.UUID):
    """
    Monitors a job's metadata file and sends updates over a WebSocket.
    """
    last_update_content = None
    while websocket.client_state == WebSocketState.CONNECTED:
        job = _load_job_metadata(job_id)
        if not job:
            await websocket.send_json({"status": "error", "message": "Job disappeared or was deleted."})
            break

        update_content = job.model_dump(mode='json')
        if update_content != last_update_content:
            await websocket.send_json(update_content)
            last_update_content = update_content

        # Stop tracking if the job is in a terminal state
        if job.status in [JobStatus.COMPLETE, JobStatus.FAILED]:
            break

        await asyncio.sleep(0.5)  # Check for updates every 500ms


@app.websocket("/ws/{job_id}")
async def websocket_endpoint(websocket: WebSocket, job_id: uuid.UUID):
    """
    Handles the WebSocket connection that streams a job's progress to the client.
    """
    await websocket.accept()

    job = _load_job_metadata(job_id)
    if not job:
        await websocket.send_json({"status": "error", "message": "Job not found."})
        await websocket.close()
        return

    try:
        # Initial status send
        await websocket.send_json(job.model_dump(mode='json'))
        # Start tracking and sending real-time updates
        await track_job_progress(websocket, job_id)
    except WebSocketDisconnect:
        print(f"Client disconnected from job {job_id}")
    finally:
        # The connection is automatically closed by Starlette when the endpoint function returns.
        # No need to call websocket.close() manually, as it can lead to race conditions
        # where both client and server try to close the connection simultaneously.
        print(f"WebSocket connection handler finished for job {job_id}")
# The async_generator_wrapper is no longer needed as processing is fully offloaded
# to the worker process.
@app.get("/download/{filename}")
async def download_file(filename: str):
"""
Serves the generated DXF file for download.
"""
path = os.path.join(OUTPUT_DIR, filename)
if os.path.exists(path):
return FileResponse(path, media_type='application/vnd.dxf', filename=filename)
return {"error": "File not found"}


# --- New API Endpoints for Job Management ---

@app.get("/api/jobs", response_model=List[Job])
async def get_all_jobs():
    """
    Retrieves a list of all processing jobs.
    """
    return _load_all_job_metadata()


@app.get("/api/jobs/{job_id}", response_model=Job)
async def get_job_status(job_id: uuid.UUID):
    """
    Retrieves the status and details for a specific job.
    """
    job = _load_job_metadata(job_id)
    if not job:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
    return job


@app.delete("/api/jobs/{job_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_job(job_id: uuid.UUID):
    """
    Deletes a specific job's metadata and its associated input and output files.
    """
    job = _load_job_metadata(job_id)
    if not job:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")

    # Delete input file if it exists
    if job.input_path and os.path.exists(job.input_path):
        os.remove(job.input_path)

    # Delete output file if it exists
    if job.output_path and os.path.exists(job.output_path):
        os.remove(job.output_path)

    # Delete metadata file
    os.remove(_get_job_metadata_path(job_id))
    return  # 204 No Content
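
# Example job-management calls (a sketch; assumes the API is served at http://localhost:8000):
#   curl http://localhost:8000/api/jobs                      # list all jobs, newest first
#   curl http://localhost:8000/api/jobs/<job_id>             # status/details for one job
#   curl -X DELETE http://localhost:8000/api/jobs/<job_id>   # delete a job and its files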