name: str | None = None,
limit: int | None = None
) -> typing.List[reports.Report]:
- return [report async for report in list.get_reports(open=open, name=name, limit=limit)]
+ reports = backend.reports.get(list=list, open=open, name=name, limit=limit)
+
+ return [report async for report in reports]
@router.get("/{list}/domains/{name}")
async def get_list_domains(
return report
@router.get("")
-async def get_reports() -> typing.List[reports.Report]:
- return [l async for l in backend.reports]
+async def get_reports(
+ list: str | None = None,
+ open: bool | None = None,
+ name: str | None = None,
+ limit: int | None = None,
+) -> typing.List[reports.Report]:
+ # Fetch the list
+ list = await backend.lists.get_by_slug(list)
+ if not list:
+ raise fastapi.HTTPException(400, "Could not find list '%s'" % list)
+
+ # Fetch all reports
+ reports = backend.reports.get(
+ list = list,
+ open = open,
+ name = name,
+ limit = limit,
+ )
+
+ # Return all matched reports as list
+ return [report async for report in reports]
@router.post("")
async def create_report(
# Reports
reports : typing.List["Report"] = sqlmodel.Relationship(back_populates="list")
- def get_reports(self, open=None, name=None, reported_by=None, limit=None):
- """
- Fetches the most recent reports
- """
- # Only use the UID of any users
- if isinstance(reported_by, users.User):
- reported_by = reported_by.uid
-
- stmt = (
- sqlmodel
- .select(
- reports.Report,
- )
- .where(
- reports.Report.list == self,
- )
- .order_by(
- reports.Report.reported_at.desc(),
- )
- )
-
- # Filter for open/closed reports only
- if open is True:
- stmt = stmt.where(
- reports.Report.closed_at == None,
- )
- elif open is False:
- stmt = stmt.where(
- not reports.Report.closed_at == None,
- )
-
- # Optionally filter by name
- if name:
- stmt = stmt.where(
- reports.Report.name == name,
- )
-
- # Optionally filter by reporter
- if reported_by:
- stmt = stmt.where(
- reports.Report.reported_by == reported_by,
- )
-
- # Optionally apply the limit
- if limit:
- stmt = stmt.limit(limit)
-
- return self.backend.db.fetch(stmt)
-
# Report!
async def report(self, **kwargs):
return await self.backend.db.fetch_one(stmt)
+ def get(self, list=None, open=None, name=None, reported_by=None, limit=None):
+ """
+ Fetches the most recent reports
+ """
+ # Only use the UID of any users
+ if isinstance(reported_by, users.User):
+ reported_by = reported_by.uid
+
+ # Start the statement
+ stmt = (
+ sqlmodel
+ .select(
+ Report,
+ )
+ .order_by(
+ Report.reported_at.desc(),
+ )
+ )
+
+ # Filter by list
+ if list:
+ stmt = stmt.where(
+ Report.list == list,
+ )
+
+ # Filter for open/closed reports only
+ if open is True:
+ stmt = stmt.where(
+ Report.closed_at == None,
+ )
+ elif open is False:
+ stmt = stmt.where(
+ not Report.closed_at == None,
+ )
+
+ # Optionally filter by name
+ if name:
+ stmt = stmt.where(
+ Report.name == name,
+ )
+
+ # Optionally filter by reporter
+ if reported_by:
+ stmt = stmt.where(
+ Report.reported_by == reported_by,
+ )
+
+ # Optionally apply the limit
+ if limit:
+ stmt = stmt.limit(limit)
+
+ return self.backend.db.fetch(stmt)
+
async def create(self, reported_by, comment=None, **kwargs):
"""
Creates a new report