###############################################################################
import argparse
+import concurrent.futures
import ipaddress
import json
import logging
update_announcements.add_argument("server", nargs=1,
help=_("Route Server to connect to"), metavar=_("SERVER"))
+ # Update geofeeds
+ update_geofeeds = subparsers.add_parser("update-geofeeds",
+ help=_("Update Geofeeds"))
+ update_geofeeds.set_defaults(func=self.handle_update_geofeeds)
+
# Update overrides
update_overrides = subparsers.add_parser("update-overrides",
help=_("Update overrides"),
CREATE INDEX IF NOT EXISTS networks_family ON networks USING BTREE(family(network));
CREATE INDEX IF NOT EXISTS networks_search ON networks USING GIST(network inet_ops);
+ -- geofeeds
+ CREATE TABLE IF NOT EXISTS geofeeds(
+ id serial primary key,
+ url text,
+ status integer default null,
+ updated_at timestamp without time zone default null
+ );
+ CREATE UNIQUE INDEX IF NOT EXISTS geofeeds_unique
+ ON geofeeds(url);
+ CREATE TABLE IF NOT EXISTS geofeed_networks(
+ geofeed_id integer references geofeeds(id) on delete cascade,
+ network inet,
+ country text,
+ region text,
+ city text
+ );
+ CREATE INDEX IF NOT EXISTS geofeed_networks_geofeed_id
+ ON geofeed_networks(geofeed_id);
+ CREATE INDEX IF NOT EXISTS geofeed_networks_search
+ ON geofeed_networks USING GIST(network inet_ops);
+ CREATE TABLE IF NOT EXISTS network_geofeeds(network inet, url text);
+ CREATE UNIQUE INDEX IF NOT EXISTS network_geofeeds_unique
+ ON network_geofeeds(network);
+ CREATE INDEX IF NOT EXISTS network_geofeeds_search
+ ON network_geofeeds USING GIST(network inet_ops);
+ CREATE INDEX IF NOT EXISTS network_geofeeds_url
+ ON network_geofeeds(url);
+
-- overrides
CREATE TABLE IF NOT EXISTS autnum_overrides(
number bigint NOT NULL,
SELECT network FROM networks
UNION
SELECT network FROM network_overrides
+ UNION
+ SELECT network FROM geofeed_networks
),
ordered_networks AS (
SELECT country FROM autnum_overrides overrides
WHERE networks.autnum = overrides.number
),
+ (
+ SELECT
+ geofeed_networks.country AS country
+ FROM
+ network_geofeeds
+
+ -- Join the data from the geofeeds
+ LEFT JOIN
+ geofeeds ON network_geofeeds.url = geofeeds.url
+ LEFT JOIN
+ geofeed_networks ON geofeeds.id = geofeed_networks.geofeed_id
+
+ -- Check whether we have a geofeed for this network
+ WHERE
+ networks.network <<= network_geofeeds.network
+ AND
+ networks.network <<= geofeed_networks.network
+
+ -- Filter for the best result
+ ORDER BY
+ masklen(geofeed_networks.network) DESC
+ LIMIT 1
+ ),
networks.country
) AS country,
inetnum[key].append(val)
+ # Parse the geofeed attribute
+ elif key == "geofeed":
+ inetnum["geofeed"] = val
+
+ # Parse geofeed when used as a remark
+ elif key == "remarks":
+ m = re.match(r"^(?:Geofeed)\s+(https://.*)", val)
+ if m:
+ inetnum["geofeed"] = m.group(1)
+
# Skip empty objects
if not inetnum or not "country" in inetnum:
return
# them into the database, if _check_parsed_network() succeeded
for single_network in inetnum.get("inet6num") or inetnum.get("inetnum"):
if self._check_parsed_network(single_network):
-
# Skip objects with unknown country codes if they are valid to avoid log spam...
if validcountries and invalidcountries:
log.warning("Skipping network with bogus countr(y|ies) %s (original countries: %s): %s" % \
"%s" % single_network, inetnum.get("country")[0], inetnum.get("country"), source_key,
)
+ # Update any geofeed information
+ geofeed = inetnum.get("geofeed", None)
+
+ # Make sure that this is a HTTPS URL
+ if geofeed and not geofeed.startswith("https://"):
+ log.warning("Geofeed URL is not using HTTPS: %s" % geofeed)
+ geofeed = None
+
+ # Store/update any geofeeds
+ if geofeed:
+ self.db.execute("""
+ INSERT INTO
+ network_geofeeds(
+ network,
+ url
+ )
+ VALUES(
+ %s, %s
+ )
+ ON CONFLICT (network) DO
+ UPDATE SET url = excluded.url""",
+ "%s" % single_network, geofeed,
+ )
+
+ # Delete any previous geofeeds
+ else:
+ self.db.execute("DELETE FROM network_geofeeds WHERE network = %s",
+ "%s" % single_network)
+
def _parse_org_block(self, block, source_key):
org = {}
for line in block:
# Otherwise return the line
yield line
	def handle_update_geofeeds(self, ns):
		"""
		Synchronise the geofeeds table with the URLs referenced from
		network_geofeeds, then download any feeds that are stale.

		ns: parsed argparse namespace (unused here, required by the
		subcommand dispatch convention — see set_defaults(func=...)).
		"""
		# Sync geofeeds
		with self.db.transaction():
			# Delete all geofeeds which are no longer linked
			# (i.e. no network references their URL any more)
			self.db.execute("""
				DELETE FROM
					geofeeds
				WHERE
					NOT EXISTS (
						SELECT
							1
						FROM
							network_geofeeds
						WHERE
							geofeeds.url = network_geofeeds.url
					)""",
			)

			# Copy all geofeeds
			# (URLs already present are left untouched so their
			#  status/updated_at bookkeeping survives)
			self.db.execute("""
				INSERT INTO
					geofeeds(
						url
					)
				SELECT
					url
				FROM
					network_geofeeds
				ON CONFLICT (url)
					DO NOTHING
				""",
			)

		# Fetch all Geofeeds that require an update
		# (never fetched before, or last fetched over a week ago)
		geofeeds = self.db.query("""
			SELECT
				id,
				url
			FROM
				geofeeds
			WHERE
				updated_at IS NULL
			OR
				updated_at <= CURRENT_TIMESTAMP - INTERVAL '1 week'
			ORDER BY
				id
			""")

		# Download feeds concurrently; each worker runs _fetch_geofeed
		with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
			results = executor.map(self._fetch_geofeed, geofeeds)

			# Fetch all results to raise any exceptions
			# (executor.map is lazy — iterating re-raises worker errors here)
			for result in results:
				pass
+
+ def _fetch_geofeed(self, geofeed):
+ log.debug("Fetching Geofeed %s" % geofeed.url)
+
+ with self.db.transaction():
+ # Open the URL
+ try:
+ req = urllib.request.Request(geofeed.url, headers={
+ "User-Agent" : "location/%s" % location.__version__,
+
+ # We expect some plain text file in CSV format
+ "Accept" : "text/csv, text/plain",
+ })
+
+ # XXX set proxy
+
+ # Send the request
+ with urllib.request.urlopen(req, timeout=10) as f:
+ # Remove any previous data
+ self.db.execute("DELETE FROM geofeed_networks \
+ WHERE geofeed_id = %s", geofeed.id)
+
+ # Read the output line by line
+ for line in f:
+ line = line.decode()
+
+ # Strip any newline
+ line = line.rstrip()
+
+ # Skip empty lines
+ if not line:
+ continue
+
+ # Try to parse the line
+ try:
+ fields = line.split(",", 5)
+ except ValueError:
+ log.debug("Could not parse line: %s" % line)
+ continue
+
+ # Check if we have enough fields
+ if len(fields) < 4:
+ log.debug("Not enough fields in line: %s" % line)
+ continue
+
+ # Fetch all fields
+ network, country, region, city, = fields[:4]
+
+ # Try to parse the network
+ try:
+ network = ipaddress.ip_network(network, strict=False)
+ except ValueError:
+ log.debug("Could not parse network: %s" % network)
+ continue
+
+ # Strip any excess whitespace from country codes
+ if country:
+ country = country.strip()
+
+ # Check the country code
+ if not location.country_code_is_valid(country):
+ log.warning("Invalid country code in Geofeed %s: %s" \
+ % (geofeed.url, country))
+ continue
+
+ # Write this into the database
+ self.db.execute("""
+ INSERT INTO
+ geofeed_networks (
+ geofeed_id,
+ network,
+ country,
+ region,
+ city
+ )
+ VALUES (%s, %s, %s, %s, %s)""",
+ geofeed.id,
+ "%s" % network,
+ country,
+ region,
+ city,
+ )
+
+ # Catch any HTTP errors
+ except urllib.request.HTTPError as e:
+ self.db.execute("UPDATE geofeeds SET status = %s \
+ WHERE id = %s", e.code, geofeed.id)
+
+ # Catch any other errors
+ except urllib.request.URLError as e:
+ log.error("Could not fetch URL %s: %s" % (geofeed.url, e))
+
+ # Mark the geofeed as updated
+ else:
+ self.db.execute("""
+ UPDATE
+ geofeeds
+ SET
+ updated_at = CURRENT_TIMESTAMP,
+ status = NULL
+ WHERE
+ id = %s""",
+ geofeed.id,
+ )
+
def handle_update_overrides(self, ns):
with self.db.transaction():
# Only drop manually created overrides, as we can be reasonably sure to have them,