if update_stats:
await self.update_stats()
+ async def get_false_positives(self):
+ """
+ Returns a list of all false-positives
+ """
+ return { source : await source.get_false_positives() for source in self.sources }
+
class ListStats(sqlmodel.SQLModel, table=True):
__tablename__ = "list_stats"
# Store the total number of dead domains
self.dead_domains = await self.backend.db.fetch_one(stmt)
+ # Store the number of false positives
+ self.false_positives = len(
+ await self.get_false_positives(),
+ )
+
+ # Store the stats history
+ await self.backend.db.insert(
+ SourceStats,
+ source = self,
+ total_domains = self.total_domains,
+ dead_domains = self.dead_domains,
+ false_positives = self.false_positives,
+ )
+
+ async def get_false_positives(self):
whitelisted_domains = (
sqlmodel
.select(
.cte("whitelisted_domains")
)
- stmt = (
+ return await self.backend.db.fetch_as_set(
sqlmodel
.select(
- sqlmodel.func.count(),
- )
- .select_from(
- domains.Domain,
+ domains.Domain.name,
)
+ .distinct()
.where(
# Domains must be from this source
domains.Domain.source == self,
)
)
- # Store the number of false positives
- self.false_positives = await self.backend.db.fetch_one(stmt)
-
- # Store the stats history
- await self.backend.db.insert(
- SourceStats,
- source = self,
- total_domains = self.total_domains,
- dead_domains = self.dead_domains,
- false_positives = self.false_positives,
- )
-
class SourceStats(sqlmodel.SQLModel, table=True):
__tablename__ = "source_stats"
help=_("Notifies moderators about any pending reports"))
notify.set_defaults(func=self.__notify)
+ # False Positives
+ false_positives = subparsers.add_parser("false-positives",
+ help=_("Shows a list of false-positives for the given list"),
+ )
+ false_positives.add_argument("list", help=_("The name of the list"))
+ false_positives.set_defaults(func=self.__false_positives)
+
# Parse all arguments
args = parser.parse_args()
"""
await backend.reports.notify()
+ # False Positives
+
+ async def __false_positives(self, backend, args):
+ # Fetch the list
+ list = await self.__get_list(backend, args.list)
+
+ # Fetch all false-positives
+ sources = await list.get_false_positives()
+
+ for source, fps in sorted(sources.items()):
+ if not fps:
+ continue
+
+ print(source)
+
+ for name in fps:
+ print (" * %s" % name)
+
def main():
c = CLI()