return self.backend.db.fetch_one(stmt)
def __hash__(self):
    """
    Hashes the object by its ID.

    Raises TypeError as long as the ID is None.
    """
    ident = self.id

    # Objects that already have an ID are hashed by exactly that ID
    if ident is not None:
        return hash(ident)

    # Without an ID there is nothing to derive the hash from
    raise TypeError("Cannot hash Source objects before they are persisted and have an ID")
+
# ID
id : int = sqlmodel.Field(primary_key=True)
)
self.backend.db.execute(stmt)
def duplicates(self):
    """
    Counts the domains this source has in common with every other
    source of the same list.

    Returns a dict that is keyed by the other source objects. The values
    are whatever self.backend.db.fetch_one() returns for the COUNT query
    (presumably an integer - confirm against the database layer).
    """
    # The aliases do not depend on the source we compare against,
    # so they are created only once for all queries
    domains_self = sqlalchemy.orm.aliased(SourceDomain)
    domains_other = sqlalchemy.orm.aliased(SourceDomain)

    sources = {}

    for source in self.list.sources:
        # Don't compare against ourselves
        if source == self:
            continue

        stmt = (
            sqlmodel
            .select(
                sqlmodel.func.count(),
            )
            .select_from(
                domains_self,
            )
            # Pair up the rows of both sources that carry the same name
            .join(
                domains_other,
                domains_other.name == domains_self.name,
            )
            .where(
                # Select the right sources
                domains_self.source == self,
                domains_other.source == source,

                # Domains cannot have been removed
                domains_self.removed_at.is_(None),
                domains_other.removed_at.is_(None),
            )
        )

        # Run the query
        sources[source] = self.backend.db.fetch_one(stmt)

    return sources
+
class SourceDomain(sqlmodel.SQLModel, database.BackendMixin, table=True):
__tablename__ = "source_domains"
search.add_argument("domain", help=_("The domain name"))
search.set_defaults(func=self.__search)
+ # analyze
+ analyze = subparsers.add_parser("analyze", help=_("Analyzes a list"))
+ analyze.add_argument("list", help=_("The name of the list"))
+ analyze.set_defaults(func=self.__analyze)
+
# Parse all arguments
args = parser.parse_args()
# Print the table
self.console.print(table)
def __analyze(self, backend, args):
    """
    Handles the "analyze" command: looks up the list that was named
    on the command line and prints its duplication table
    """
    # Look up the list by the slug that was passed as argument
    selected = backend.lists.get_by_slug(args.list)

    # Print how much the sources of the list overlap
    self.__analyze_duplicates(selected)
+
def __analyze_duplicates(self, list):
    """
    Prints a table that shows how much the sources of the given list
    overlap with each other.

    There is one row and one column per source. A cell holds the number
    of duplicates between the row's source and the column's source as
    a percentage of len() of the row's source. Cells for which
    source.duplicates() has no entry stay empty, and rows of sources
    with a length of zero show "-" instead of a percentage.
    """
    table = rich.table.Table(title=_("Duplication"))

    table.add_column(_("List"))

    # Add all columns
    for source in list.sources:
        table.add_column(source.name, justify="right")

    # Check duplicates
    for source in list.sources:
        # Determine all duplicates against other sources
        duplicates = source.duplicates()

        # The length is the same for the entire row, so fetch it only once
        total = len(source)

        columns = []

        # Format the values for the table
        for other in list.sources:
            try:
                value = duplicates[other]
            except KeyError:
                columns.append("")
                continue

            # There is no meaningful percentage for an empty source
            # and dividing by zero would abort the whole command
            if not total:
                columns.append("-")
                continue

            columns.append(
                "%.2f%%" % (value / total * 100),
            )

        # Add a row to the table
        table.add_row(source.name, *columns)

    # Print the table
    self.console.print(table)
+
def main():
c = CLI()