-- PostgreSQL database dump
--
-\restrict Xf18Fi8Ow9mJBKm7A0YHydhg05f2dQMA1HMDnTkGw1D1d51sv6ya9FmQD2QjJyr
+\restrict pljGpSA3LcghRNp7l8VhB9577BShEv8iwmjIVnsBJjUDBiyXWHsvzLrsdBLd4g5
-- Dumped from database version 17.6 (Debian 17.6-0+deb13u1)
-- Dumped by pg_dump version 17.6 (Debian 17.6-0+deb13u1)
list_id integer,
last_modified_at timestamp with time zone,
etag text,
- updated_at timestamp with time zone
+ updated_at timestamp with time zone,
+ total_domains integer,
+ dead_domains integer
);
-- PostgreSQL database dump complete
--
-\unrestrict Xf18Fi8Ow9mJBKm7A0YHydhg05f2dQMA1HMDnTkGw1D1d51sv6ya9FmQD2QjJyr
+\unrestrict pljGpSA3LcghRNp7l8VhB9577BShEv8iwmjIVnsBJjUDBiyXWHsvzLrsdBLd4g5
import sqlalchemy.dialects.postgresql
import sqlmodel
+from . import checker
from . import database
from . import util
from .i18n import _
def __init__(self, backend):
self.backend = backend
+ def __iter__(self):
+ stmt = (
+ sqlmodel
+ .select(
+ Source,
+ )
+ .where(
+ Source.removed_at == None,
+ )
+ .order_by(
+ Source.name,
+ Source.slug,
+ )
+ )
+
+ return self.backend.db.fetch(stmt)
+
def get_by_id(self, id):
stmt = (
sqlmodel
return self.name
def __len__(self):
- stmt = (
- sqlmodel
- .select(
- sqlmodel.func.count(),
- )
- .select_from(
- SourceDomain,
- )
- .where(
- SourceDomain.source == self,
- SourceDomain.removed_at == None,
- )
- )
-
- return self.backend.db.fetch_one(stmt)
+ return self.total_domains
def __hash__(self):
# Only hashable once the object has an ID
# Mark all domains that have not been updated as removed
self.__prune()
+ # Update the stats
+ self.update_stats()
+
# Signal that we have actually fetched new data
return True
return sources
+ # Stats
+
+ total_domains : int | None
+
+ dead_domains : int | None
+
+ def update_stats(self):
+ """
+ Updates the stats of this source
+ """
+ stmt = (
+ sqlmodel
+ .select(
+ sqlmodel.func.count(),
+ )
+ .select_from(
+ SourceDomain,
+ )
+ .where(
+ SourceDomain.source == self,
+ SourceDomain.removed_at == None,
+ )
+ )
+
+ # Store the total number of domains
+ self.total_domains = self.backend.db.fetch_one(stmt)
+
+ stmt = (
+ sqlmodel
+ .select(
+ sqlmodel.func.count(),
+ )
+ .select_from(
+ SourceDomain,
+ )
+ .join(
+ checker.CheckerDomain,
+ checker.CheckerDomain.name == SourceDomain.name,
+ )
+ .where(
+ SourceDomain.source == self,
+ SourceDomain.removed_at == None,
+
+ # Only check dead domains
+ checker.CheckerDomain.status == False,
+ )
+ )
+
+ # Store the total number of dead domains
+ self.dead_domains = self.backend.db.fetch_one(stmt)
+
class SourceDomain(sqlmodel.SQLModel, database.BackendMixin, table=True):
__tablename__ = "source_domains"
table.add_column(_("Created At"))
table.add_column(_("Created_By"))
table.add_column(_("Listed Domains"), justify="right")
+ table.add_column(_("Dead Domains"), justify="right")
for source in list.sources:
table.add_row(
source.url,
source.created_at.isoformat(),
source.created_by,
- babel.numbers.format_decimal(len(source)),
+ babel.numbers.format_decimal(source.total_domains)
+ if source.total_domains else _("N/A"),
+ babel.numbers.format_percent(
+ source.dead_domains / source.total_domains,
+ ) if source.total_domains else _("N/A"),
)
# Print the sources