Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
def _import_aws_ip_ranges(self, name, f):
# Parse the feed
def _import_aws_ip_ranges(self, name, f):
# Parse the feed
- aws_ip_dump = json.load(f)
# Set up a dictionary for mapping a region name to a country. Unfortunately,
# there seems to be no machine-readable version available of this other than
# Set up a dictionary for mapping a region name to a country. Unfortunately,
# there seems to be no machine-readable version available of this other than
"eusc-de-east-1" : "DE",
}
"eusc-de-east-1" : "DE",
}
- # Fetch all countries that we know of
- countries = self.fetch_countries()
+ # Collect a list of all networks
+ prefixes = feed.get("ipv6_prefixes", []) + feed.get("prefixes", [])
- for snetwork in aws_ip_dump["prefixes"] + aws_ip_dump["ipv6_prefixes"]:
+ for prefix in prefixes:
+ # Fetch network
+ network = prefix.get("ipv6_prefix") or prefix.get("ip_prefix")
+
+ # Parse the network
- network = ipaddress.ip_network(snetwork.get("ip_prefix") or snetwork.get("ipv6_prefix"), strict=False)
- except ValueError:
- log.warning("Unable to parse line: %s" % snetwork)
+ network = ipaddress.ip_network(network)
+ except ValuleError as e:
+ log.warning("%s: Unable to parse prefix %s" % (name, network))
continue
# Sanitize parsed networks...
if not self._check_parsed_network(network):
continue
continue
# Sanitize parsed networks...
if not self._check_parsed_network(network):
continue
- # Determine region of this network...
- region = snetwork["region"]
+ # Fetch the region
+ region = prefix.get("region")
+
+ # Set some defaults
cc = None
is_anycast = False
cc = None
is_anycast = False
- # Any region name starting with "us-" will get "US" country code assigned straight away...
- if region.startswith("us-"):
- cc = "US"
- elif region.startswith("cn-"):
- # ... same goes for China ...
- cc = "CN"
- elif region == "GLOBAL":
- # ... funny region name for anycast-like networks ...
- is_anycast = True
- elif region in aws_region_country_map:
- # ... assign looked up country code otherwise ...
+ # Fetch the CC from the dictionary
+ try:
cc = aws_region_country_map[region]
cc = aws_region_country_map[region]
- else:
- # ... and bail out if we are missing something here
- log.warning("Unable to determine country code for line: %s" % snetwork)
- continue
- # Skip networks with unknown country codes
- if not is_anycast and countries and cc not in countries:
- log.warning("Skipping Amazon AWS network with bogus country '%s': %s" % \
- (cc, network))
- return
+ # If we couldn't find anything, let's try something else...
+ except KeyError as e:
+ # Find anycast networks
+ if region == "GLOBAL":
+ is_anycast = True
+
+ # Everything that starts with us- is probably in the United States
+ elif region.startswith("us-"):
+ cc = "US"
+
+ # Everything that starts with cn- is probably China
+ elif region.startswith("cn-"):
+ cc = "CN"
+
+ # Log a warning for anything else
+ else:
+ log.warning("%s: Could not determine country code for AWS region %s" \
+ % (name, region))
+ continue
- # Conduct SQL statement...
self.db.execute("""
INSERT INTO
network_feeds
self.db.execute("""
INSERT INTO
network_feeds
%s, %s, %s, %s
)
ON CONFLICT (network, source) DO NOTHING
%s, %s, %s, %s
)
ON CONFLICT (network, source) DO NOTHING
- """, "%s" % network, "Amazon AWS IP feed", cc, is_anycast,
+ """, "%s" % network, name, cc, is_anycast,
)
def _update_feed_for_spamhaus_drop(self):
)
def _update_feed_for_spamhaus_drop(self):
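Review bot: `except ValuleError as e:` in the prefix-parsing hunk misspells `ValueError`, so the first prefix that fails to parse raises NameError and aborts the import instead of being logged and skipped; it should read `except ValueError:`. `region = prefix.get("region")` returns None for an entry without that key, and the `except KeyError` fallback then calls `region.startswith("us-")` on it, so a guard such as `if not region:` with a warning and `continue` before the lookup would keep one malformed feed entry from stopping the run. The new `ipaddress.ip_network(network)` also drops `strict=False`, so prefixes with host bits set are now rejected rather than normalised, which deserves a comment if it is intended. No visible line assigns `feed` once `aws_ip_dump = json.load(f)` is removed, though that may lie outside what the page shows, as do the parts of the function between the hunks.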