Files
ToDoList/api/app/routers/tasks.py
祀梦 2979197b1c release: Elysia ToDo v1.0.0
全栈个人信息管理应用,集成待办任务、习惯打卡、纪念日提醒、资产总览功能。

Made-with: Cursor
2026-03-14 22:21:26 +08:00

186 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from typing import Optional, List
from app.database import get_db
from app.models import Task, Tag
from app.schemas import TaskCreate, TaskUpdate, TaskResponse
from app.schemas.common import DeleteResponse
from app.utils.crud import get_or_404
from app.utils.datetime import utcnow
from app.utils.logger import logger
# Router that groups every task endpoint in this module under /api/tasks.
router = APIRouter(
    prefix="/api/tasks",
    tags=["任务"],
)
@router.get("", response_model=List[TaskResponse])
def get_tasks(
    status: Optional[str] = Query(None, description="筛选状态: all/in_progress/completed"),
    category_id: Optional[int] = Query(None, description="分类ID"),
    priority: Optional[str] = Query(None, description="优先级: q1/q2/q3/q4"),
    sort_by: Optional[str] = Query("created_at", description="排序字段: created_at/priority/due_date"),
    sort_order: Optional[str] = Query("desc", description="排序方向: asc/desc"),
    db: Session = Depends(get_db)
) -> List[TaskResponse]:
    """List tasks, with optional filtering and sorting.

    Args:
        status: "in_progress" keeps tasks whose ``is_completed`` is false and
            "completed" keeps those where it is true. Any other value
            (including "all" and ``None``) applies no status filter.
        category_id: When not ``None``, keep only tasks in this category.
        priority: When non-empty, keep only tasks with exactly this priority.
        sort_by: "priority" or "due_date" select that column; every other
            value sorts by ``created_at``.
        sort_order: "asc" sorts ascending; every other value sorts descending.
            NULL sort keys are placed last in both directions.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        All matching ``Task`` rows in the requested order.

    Raises:
        HTTPException: 500 when any step of building or running the query fails.
    """
    try:
        query = db.query(Task)

        # Status filter: only the two recognised values narrow the result.
        completion_flag = {"in_progress": False, "completed": True}
        if status in completion_flag:
            query = query.filter(Task.is_completed == completion_flag[status])

        # Category filter (an explicit None check, so 0 is still a valid ID).
        if category_id is not None:
            query = query.filter(Task.category_id == category_id)

        # Priority filter.
        if priority:
            query = query.filter(Task.priority == priority)

        # Sorting: unknown sort fields fall back to the creation timestamp.
        sortable_columns = {"priority": Task.priority, "due_date": Task.due_date}
        order_col = sortable_columns.get(sort_by, Task.created_at)
        ordering = order_col.asc() if sort_order == "asc" else order_col.desc()
        query = query.order_by(ordering.nullslast())

        tasks = query.all()
        logger.info(f"获取任务列表成功,总数: {len(tasks)}")
        return tasks
    except Exception as e:
        logger.error(f"获取任务列表失败: {str(e)}")
        raise HTTPException(status_code=500, detail="获取任务列表失败")
@router.post("", response_model=TaskResponse, status_code=201)
def create_task(task_data: TaskCreate, db: Session = Depends(get_db)):
    """Create a task from the request payload and persist it.

    Args:
        task_data: Payload providing title, description, priority, due_date,
            category_id and an optional list of tag IDs.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The newly stored ``Task``, refreshed from the database after commit.

    Raises:
        HTTPException: 500 when anything fails; the session is rolled back first.
    """
    try:
        # Copy the plain scalar fields straight from the payload.
        copied_fields = ("title", "description", "priority", "due_date", "category_id")
        db_task = Task(**{name: getattr(task_data, name) for name in copied_fields})

        # Attach tags. IDs that match no Tag row are simply absent from the
        # query result, so they are dropped without an error.
        if task_data.tag_ids:
            db_task.tags = db.query(Tag).filter(Tag.id.in_(task_data.tag_ids)).all()

        db.add(db_task)
        db.commit()
        db.refresh(db_task)
        logger.info(f"创建任务成功: id={db_task.id}, title={db_task.title}")
        return db_task
    except Exception as e:
        db.rollback()
        logger.error(f"创建任务失败: {str(e)}")
        raise HTTPException(status_code=500, detail="创建任务失败")
@router.get("/{task_id}", response_model=TaskResponse)
def get_task(task_id: int, db: Session = Depends(get_db)):
    """Fetch a single task by its ID.

    Args:
        task_id: Primary key of the task, taken from the URL path.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        Whatever ``get_or_404`` returns for this ID (presumably the ``Task`` row).

    Raises:
        HTTPException: Any HTTPException raised by ``get_or_404`` is passed
            through unchanged (presumably a 404 for a missing task); any other
            failure becomes a 500.
    """
    try:
        return get_or_404(db, Task, task_id, "任务")
    except HTTPException:
        # Deliberate HTTP errors must not be rewritten into a 500.
        raise
    except Exception as e:
        logger.error(f"获取任务失败: {str(e)}")
        raise HTTPException(status_code=500, detail="获取任务失败")
@router.put("/{task_id}", response_model=TaskResponse)
def update_task(task_id: int, task_data: TaskUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to an existing task.

    Only fields the client actually sent are considered. A field sent as
    ``None`` is applied only if it is listed in ``task_data.clearable_fields``;
    otherwise it is ignored. Tags are replaced only when ``tag_ids`` is sent
    with a non-null value (an empty list clears them).

    Args:
        task_id: Primary key of the task, taken from the URL path.
        task_data: Update payload.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The updated ``Task``, refreshed from the database after commit.

    Raises:
        HTTPException: Any HTTPException raised inside (e.g. by ``get_or_404``)
            is passed through unchanged; any other failure rolls the session
            back and becomes a 500.
    """
    try:
        task = get_or_404(db, Task, task_id, "任务")

        # exclude_unset=True leaves out fields the client did not send, so they
        # cannot be cleared by accident; explicitly sent nulls are kept.
        changes = task_data.model_dump(exclude_unset=True)
        tag_ids = changes.pop("tag_ids", None)

        for field, value in changes.items():
            # A null for a non-clearable field is ignored rather than written.
            if value is None and field not in task_data.clearable_fields:
                continue
            setattr(task, field, value)

        # Replace the tag set only when tag IDs were provided.
        if tag_ids is not None:
            task.tags = db.query(Tag).filter(Tag.id.in_(tag_ids)).all()

        task.updated_at = utcnow()
        db.commit()
        db.refresh(task)
        logger.info(f"更新任务成功: id={task_id}")
        return task
    except HTTPException:
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"更新任务失败: {str(e)}")
        raise HTTPException(status_code=500, detail="更新任务失败")
@router.delete("/{task_id}", response_model=DeleteResponse)
def delete_task(task_id: int, db: Session = Depends(get_db)):
    """Delete a task by its ID.

    ``response_model`` is declared so that the response is validated and
    appears in the OpenAPI schema, matching the other task routes.

    Args:
        task_id: Primary key of the task, taken from the URL path.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        A ``DeleteResponse`` carrying a success message.

    Raises:
        HTTPException: Any HTTPException raised inside (e.g. by ``get_or_404``)
            is passed through unchanged; any other failure rolls the session
            back and becomes a 500.
    """
    try:
        task = get_or_404(db, Task, task_id, "任务")
        db.delete(task)
        db.commit()
        logger.info(f"删除任务成功: id={task_id}")
        return DeleteResponse(message="任务删除成功")
    except HTTPException:
        # Deliberate HTTP errors must not be rewritten into a 500.
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"删除任务失败: {str(e)}")
        raise HTTPException(status_code=500, detail="删除任务失败")
@router.patch("/{task_id}/toggle", response_model=TaskResponse)
def toggle_task(task_id: int, db: Session = Depends(get_db)):
    """Flip a task's completion flag.

    Args:
        task_id: Primary key of the task, taken from the URL path.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The ``Task`` with ``is_completed`` inverted and ``updated_at`` set to
        the value of ``utcnow()``, refreshed from the database after commit.

    Raises:
        HTTPException: Any HTTPException raised inside (e.g. by ``get_or_404``)
            is passed through unchanged; any other failure rolls the session
            back and becomes a 500.
    """
    try:
        task = get_or_404(db, Task, task_id, "任务")

        flipped = not task.is_completed
        task.is_completed = flipped
        task.updated_at = utcnow()

        db.commit()
        db.refresh(task)
        logger.info(f"切换任务状态成功: id={task_id}, is_completed={task.is_completed}")
        return task
    except HTTPException:
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"切换任务状态失败: {str(e)}")
        raise HTTPException(status_code=500, detail="切换任务状态失败")