feeds = (
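+ # Each feed is a tuple of the source name, the import callback, and the
+ # URL to fetch; the callback is called with the name and the downloaded
+ # file object (inferred from the _import_spamhaus_drop signature below)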
# AWS IP Ranges
("AWS-IP-RANGES", self._import_aws_ip_ranges, "https://ip-ranges.amazonaws.com/ip-ranges.json"),
+
+ # Spamhaus DROP
+ ("SPAMHAUS-DROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/drop.txt"),
+ ("SPAMHAUS-EDROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/edrop.txt"),
+ ("SPAMHAUS-DROPV6", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/dropv6.txt"),
)
# Walk through all feeds
success = False
# Spamhaus
self._update_feed_for_spamhaus_drop()
# Return status
return 0 if success else 1
""", "%s" % network, name, cc, is_anycast,
)
- def _update_feed_for_spamhaus_drop(self):
- downloader = location.importer.Downloader()
+ def _import_spamhaus_drop(self, name, f):
+ """
+ Import Spamhaus DROP IP feeds
+ """
+ # Count all lines that contain any data
+ lines = 0
- ip_lists = [
- ("SPAMHAUS-DROP", "https://www.spamhaus.org/drop/drop.txt"),
- ("SPAMHAUS-EDROP", "https://www.spamhaus.org/drop/edrop.txt"),
- ("SPAMHAUS-DROPV6", "https://www.spamhaus.org/drop/dropv6.txt")
- ]
+ # Walk through all lines
+ for line in f:
+ # Decode line (the feed is assumed to be encoded in UTF-8)
+ line = line.decode("utf-8")
- asn_lists = [
- ("SPAMHAUS-ASNDROP", "https://www.spamhaus.org/drop/asndrop.json")
- ]
+ # Strip off any comments
+ line, _, comment = line.partition(";")
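+ # (a typical entry looks like "192.0.2.0/24 ; SBL123456", with
+ #  everything after the semicolon being a comment)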
- for name, url in ip_lists:
- # Fetch IP list from given URL
- f = downloader.retrieve(url)
+ # Strip any excess whitespace
+ line = line.strip()
- # Split into lines
- fcontent = f.readlines()
+ # Ignore empty lines
+ if not line:
+ continue
- with self.db.transaction():
- # Conduct a very basic sanity check to rule out CDN issues causing bogus DROP
- # downloads.
- if len(fcontent) > 10:
- self.db.execute("DELETE FROM network_feeds WHERE source = %s", name)
- else:
- log.warning("%s (%s) returned likely bogus file, ignored" % (name, url))
- continue
+ # Increment line counter
+ lines += 1
- # Iterate through every line, filter comments and add remaining networks to
- # the override table in case they are valid...
- for sline in fcontent:
- # The response is assumed to be encoded in UTF-8...
- sline = sline.decode("utf-8")
+ # Parse the network
+ try:
+ network = ipaddress.ip_network(line)
+ except ValueError as e:
+ log.warning("%s: Could not parse network: %s - %s" % (name, line, e))
+ continue
- # Comments start with a semicolon...
- if sline.startswith(";"):
- continue
+ # Check network
+ if not self._check_parsed_network(network):
+ log.warning("%s: Skipping bogus network: %s" % (name, network))
+ continue
- # Extract network and ignore anything afterwards...
- try:
- network = ipaddress.ip_network(sline.split()[0], strict=False)
- except ValueError:
- log.error("Unable to parse line: %s" % sline)
- continue
+ # Insert into the database
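+ # (is_drop flags this network as being listed on one of the DROP lists)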
+ self.db.execute("""
+ INSERT INTO
+ network_feeds
+ (
+ network,
+ source,
+ is_drop
+ )
+ VALUES
+ (
+ %s, %s, %s
+ )""", "%s" % network, name, True,
+ )
- # Sanitize parsed networks...
- if not self._check_parsed_network(network):
- log.warning("Skipping bogus network found in %s (%s): %s" % \
- (name, url, network))
- continue
+ # Raise an exception if we could not import anything, which most
+ # likely indicates a bogus download (e.g. caused by CDN issues)
+ if not lines:
+ raise RuntimeError("Received bogus feed %s with no data" % name)
- # Conduct SQL statement...
- self.db.execute("""
- INSERT INTO
- network_feeds
- (
- network,
- source,
- is_drop
- )
- VALUES
- (
- %s, %s, %s
- )""", "%s" % network, name, True,
- )
+ def _update_feed_for_spamhaus_drop(self):
+ downloader = location.importer.Downloader()
+
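+ # Only the ASN-DROP feed remains here; the IP-based DROP lists are
+ # imported through the new feeds mechanism above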
+ asn_lists = [
+ ("SPAMHAUS-ASNDROP", "https://www.spamhaus.org/drop/asndrop.json")
+ ]
for name, url in asn_lists:
# Fetch URL