suffix = "networks"
mode = "w"
- def __init__(self, f, prefix=None, flatten=True):
- self.f, self.prefix, self.flatten = f, prefix, flatten
+ def __init__(self, db, f, prefix=None, flatten=True):
+ self.db, self.f, self.prefix, self.flatten = db, f, prefix, flatten
# The previously written network
self._last_network = None
self._write_header()
@classmethod
- def open(cls, filename, **kwargs):
+ def open(cls, db, filename, **kwargs):
"""
Convenience function to open a file
"""
f = open(filename, cls.mode)
- return cls(f, **kwargs)
+ return cls(db, f, **kwargs)
def __repr__(self):
return "<%s f=%s>" % (self.__class__.__name__, self.f)
def _write_network(self, network):
self.f.write("%s\n" % network)
- def write(self, network):
+ def write(self, network, subnets):
if self.flatten and self._flatten(network):
log.debug("Skipping writing network %s" % network)
return
- # Write the network to file
- self._write_network(network)
+ # Write the network when it has no subnets
+ if not subnets:
+ network = ipaddress.ip_network("%s" % network)
+ return self._write_network(network)
+
+ # Collect all matching subnets
+ matching_subnets = []
+
+ for subnet in sorted(subnets):
+ # Try to find the subnet in the database
+ n = self.db.lookup("%s" % subnet.network_address)
+
+ # No entry or matching country means those IP addresses belong here
+ if not n or n.country_code == network.country_code:
+ matching_subnets.append(subnet)
+
+ # Write all networks as compact as possible
+ for network in ipaddress.collapse_addresses(matching_subnets):
+ log.debug("Writing %s to database" % network)
+ self._write_network(network)
def finish(self):
"""
mode = "wb"
def _write_network(self, network):
- for address in (network.first_address, network.last_address):
+ for address in (network.network_address, network.broadcast_address):
# Convert this into a string of bits
bytes = socket.inet_pton(
- network.family, address,
+ socket.AF_INET6 if network.version == 6 else socket.AF_INET, "%s" % address,
)
self.f.write(bytes)
directory, prefix=country_code, suffix=self.writer.suffix, family=family,
)
- writers[country_code] = self.writer.open(filename, prefix="CC_%s" % country_code)
+ writers[country_code] = self.writer.open(self.db, filename, prefix="CC_%s" % country_code)
# Create writers for ASNs
for asn in asns:
directory, "AS%s" % asn, suffix=self.writer.suffix, family=family,
)
- writers[asn] = self.writer.open(filename, prefix="AS%s" % asn)
+ writers[asn] = self.writer.open(self.db, filename, prefix="AS%s" % asn)
# Get all networks that match the family
networks = self.db.search_networks(family=family)
+ # Materialise the generator into a list (uses quite some memory)
+ networks = list(networks)
+
# Walk through all networks
- for network in networks:
+ for i, network in enumerate(networks):
+ _network = ipaddress.ip_network("%s" % network)
+
+ # Search for all subnets
+ subnets = set()
+
+ while i < len(networks):
+ subnet = networks[i+1]
+
+ if subnet.is_subnet_of(network):
+ _subnet = ipaddress.ip_network("%s" % subnet)
+
+ subnets.add(_subnet)
+ subnets.update(_network.address_exclude(_subnet))
+
+ i += 1
+ else:
+ break
+
# Write matching countries
try:
- writers[network.country_code].write(network)
+ writers[network.country_code].write(network, subnets)
except KeyError:
pass
# Write matching ASNs
try:
- writers[network.asn].write(network)
+ writers[network.asn].write(network, subnets)
except KeyError:
pass
country = flags[flag]
try:
- writers[country].write(network)
+ writers[country].write(network, subnets)
except KeyError:
pass