# network allocation lists in a machine-readable format...
self._update_overrides_for_aws()
+ # Update overrides for Spamhaus DROP feeds...
+ self._update_overrides_for_spamhaus_drop()
+
for file in ns.files:
log.info("Reading %s..." % file)
)
+ def _update_overrides_for_spamhaus_drop(self):
+ downloader = location.importer.Downloader()
+
+ ip_urls = [
+ "https://www.spamhaus.org/drop/drop.txt",
+ "https://www.spamhaus.org/drop/edrop.txt",
+ "https://www.spamhaus.org/drop/dropv6.txt"
+ ]
+
+ asn_urls = [
+ "https://www.spamhaus.org/drop/asndrop.txt"
+ ]
+
+ for url in ip_urls:
+ try:
+ with downloader.request(url, return_blocks=False) as f:
+ fcontent = f.body.readlines()
+ except Exception as e:
+ log.error("Unable to download Spamhaus DROP URL %s: %s" % (url, e))
+ return
+
+ # Iterate through every line, filter comments and add remaining networks to
+ # the override table in case they are valid...
+ with self.db.transaction():
+ for sline in fcontent:
+
+ # The response is assumed to be encoded in UTF-8...
+ sline = sline.decode("utf-8")
+
+ # Comments start with a semicolon...
+ if sline.startswith(";"):
+ continue
+
+ # Extract network and ignore anything afterwards...
+ try:
+ network = ipaddress.ip_network(sline.split()[0], strict=False)
+ except ValueError:
+ log.error("Unable to parse line: %s" % sline)
+ continue
+
+ # Sanitize parsed networks...
+ if not self._check_parsed_network(network):
+ log.warning("Skipping bogus network found in Spamhaus DROP URL %s: %s" % \
+ (url, network))
+ continue
+
+ # Conduct SQL statement...
+ self.db.execute("""
+ INSERT INTO network_overrides(
+ network,
+ source,
+ is_drop
+ ) VALUES (%s, %s, %s)
+ ON CONFLICT (network) DO NOTHING""",
+ "%s" % network,
+ "Spamhaus DROP lists",
+ True
+ )
+
+ for url in asn_urls:
+ try:
+ with downloader.request(url, return_blocks=False) as f:
+ fcontent = f.body.readlines()
+ except Exception as e:
+ log.error("Unable to download Spamhaus DROP URL %s: %s" % (url, e))
+ return
+
+ # Iterate through every line, filter comments and add remaining ASNs to
+ # the override table in case they are valid...
+ with self.db.transaction():
+ for sline in fcontent:
+
+ # The response is assumed to be encoded in UTF-8...
+ sline = sline.decode("utf-8")
+
+ # Comments start with a semicolon...
+ if sline.startswith(";"):
+ continue
+
+ # Throw away anything after the first space...
+ sline = sline.split()[0]
+
+ # ... strip the "AS" prefix from it ...
+ sline = sline.strip("AS")
+
+ # ... and convert it into an integer. Voila.
+ asn = int(sline)
+
+ # Filter invalid ASNs...
+ if not self._check_parsed_asn(asn):
+ log.warning("Skipping bogus ASN found in Spamhaus DROP URL %s: %s" % \
+ (url, asn))
+ continue
+
+ # Conduct SQL statement...
+ self.db.execute("""
+ INSERT INTO autnum_overrides(
+ number,
+ source,
+ is_drop
+ ) VALUES (%s, %s, %s)
+ ON CONFLICT (number) DO NOTHING""",
+ "%s" % asn,
+ "Spamhaus ASN-DROP list",
+ True
+ )
+
@staticmethod
def _parse_bool(block, key):
val = block.get(key)