# #
###############################################################################
+import datetime
import io
import ipaddress
import logging
if not directory:
for writer in writers.values():
writer.print()
+
+
+class ZoneExporter(object):
+ def __init__(self, db, format, origin, ttl=None):
+ self.db = db
+ self.origin = origin
+ self.ttl = ttl
+
+ try:
+ self.type, self._format = self.formats[format]
+ except KeyError as e:
+ raise ValueError("Unsupported format for a zone: %s" % format)
+
+ def export(self, f):
+ assert self.write, "No write method has been selected"
+
+ created_at = datetime.datetime.fromtimestamp(self.db.created_at)
+
+ # Write the header
+ f.write(";##############################################################################\n")
+ f.write("; IPFire Location DNS Zone\n")
+ f.write(";##############################################################################\n")
+ if self.db.license:
+ f.write("License: %s\n" % self.db.license)
+ if self.db.description:
+ f.write("%s" % self.db.description)
+ f.write("Updated At: %s" % created_at.isoformat())
+ f.write(";##############################################################################\n")
+
+ f.write("$ORIGIN %s" % self.origin)
+ if self.ttl:
+ f.write("$TTL %s" % self.ttl)
+
+ # Write all networks
+ for network in self.db.networks:
+ self.write(f, network)
+
+ def write(self, f, network):
+ rp = network.reverse_pointer(suffix="")
+
+ # If we don't have a reverse pointer, we will have to split the network
+ # into its subnets and call ourselves again.
+ if rp is None:
+ for subnet in network.subnets:
+ self.write(f, subnet)
+
+ return
+
+ # Fetch the payload
+ content = self._format(self, network)
+
+ # Skip the network if there is no payload
+ if content is None:
+ return
+
+ f.write("%s IN %s %s\n" % (rp, self.type, content))
+
+ def _format_asn(self, network):
+ # Skip the network if it does not belong to an AS
+ if network.asn is None:
+ return
+
+ # Fetch the ASN
+ asn = self.db.get_as(network.asn)
+ if asn is None:
+ return
+
+ return "\"%s\"" % asn
+
+ formats = {
+ "asn" : ("TXT", _format_asn),
+ }
export.add_argument("objects", nargs="*", help=_("List country codes or ASNs to export"))
export.set_defaults(func=self.handle_export)
+ # Export DNS Zones
+ export_zone = subparsers.add_parser("export-zone",
+ help=_("Exports the database in DNS zone format"),
+ )
+ export_zone.add_argument("type", help=_("Output Type"),
+ choices=location.export.ZoneExporter.formats.keys(),
+ )
+ export_zone.add_argument("--output", required=True, type=argparse.FileType("w"),
+ help=_("The file to write the output to"))
+ export_zone.add_argument("--origin", required=True,
+ help=_("The origin of the DNS zone"))
+ export_zone.add_argument("--ttl", help=_("The TTL of the DNS zone"))
+ export_zone.set_defaults(func=self.handle_export_zone)
+
args = parser.parse_args()
# Configure logging
e = location.export.Exporter(db, writer)
e.export(ns.directory, countries=countries, asns=asns, families=families)
+ def handle_export_zone(self, db, ns):
+ """
+ Exports the database in DNS zone format
+ """
+ e = location.export.ZoneExporter(db, format=ns.type, origin=ns.origin, ttl=ns.ttl)
+ e.export(ns.output)
+
def format_timedelta(t):
s = []