# Update
update = subparsers.add_parser("update", help=_("Update database"))
+ update.add_argument("--cron",
+ help=_("Update the library only once per interval"),
+ choices=("daily", "weekly", "monthly"),
+ )
update.set_defaults(func=self.handle_update)
# Verify
export.add_argument("--family",
help=_("Specify address family"), choices=("ipv6", "ipv4"),
)
- export.add_argument("objects", nargs="+", help=_("List country codes or ASNs to export"))
+ export.add_argument("objects", nargs="*", help=_("List country codes or ASNs to export"))
export.set_defaults(func=self.handle_export)
args = parser.parse_args()
try:
db = location.Database(args.database)
except FileNotFoundError as e:
- sys.stderr.write("location: Could not open database %s: %s\n" \
- % (args.database, e))
- sys.exit(1)
+ # Allow continuing without a database
+ if args.func == self.handle_update:
+ db = None
+
+ else:
+ sys.stderr.write("location: Could not open database %s: %s\n" \
+ % (args.database, e))
+ sys.exit(1)
# Translate family (if present)
if "family" in args:
if db.description:
for line in db.description.splitlines():
- f.write("# %s\n" % line)
+ line = "# %s" % line
+ f.write("%s\n" % line.rstrip())
f.write("#\n")
print(a)
def handle_update(self, db, ns):
+ if ns.cron and db:
+ now = time.time()
+
+ if ns.cron == "daily":
+ delta = datetime.timedelta(days=1)
+ elif ns.cron == "weekly":
+ delta = datetime.timedelta(days=7)
+ elif ns.cron == "monthly":
+ delta = datetime.timedelta(days=30)
+
+ delta = delta.total_seconds()
+
+ # Check if the database has recently been updated
+ if db.created_at >= (now - delta):
+ log.info(
+ _("The database has been updated recently"),
+ )
+ return 3
+
# Fetch the timestamp we need from DNS
t = location.discover_latest_version()
- # Parse timestamp into datetime format
- timestamp = datetime.datetime.fromtimestamp(t) if t else None
-
# Check the version of the local database
- if db and timestamp and db.created_at >= timestamp.timestamp():
+ if db and t and db.created_at >= t:
log.info("Already on the latest version")
return
return 0
- def handle_verify(self, ns):
- try:
- db = location.Database(ns.database)
- except FileNotFoundError as e:
- log.error("%s: %s" % (ns.database, e))
- return 127
-
+ def handle_verify(self, db, ns):
# Verify the database
with open(ns.public_key, "r") as f:
if not db.verify(f):
countries, asns = [], []
# Translate family
- if ns.family == "ipv6":
- families = [ socket.AF_INET6 ]
- elif ns.family == "ipv4":
- families = [ socket.AF_INET ]
+ if ns.family:
+ families = [ ns.family ]
else:
families = [ socket.AF_INET6, socket.AF_INET ]
log.warning("Invalid argument: %s" % object)
continue
+ # Default to exporting all countries
if not countries and not asns:
- log.error("Nothing to export")
- return 2
+ countries = ["A1", "A2", "A3"] + [country.code for country in db.countries]
# Select the output format
writer = self.__get_output_formatter(ns)
e.export(ns.directory, countries=countries, asns=asns, families=families)
+def format_timedelta(t):
+ s = []
+
+ if t.days:
+ s.append(
+ _("One Day", "%(days)s Days", t.days) % { "days" : t.days, }
+ )
+
+ hours = t.seconds // 3600
+ if hours:
+ s.append(
+ _("One Hour", "%(hours)s Hours", hours) % { "hours" : hours, }
+ )
+
+ minutes = (t.seconds % 3600) // 60
+ if minutes:
+ s.append(
+ _("One Minute", "%(minutes)s Minutes", minutes) % { "minutes" : minutes, }
+ )
+
+ seconds = t.seconds % 60
+ if t.seconds:
+ s.append(
+ _("One Second", "%(seconds)s Seconds", seconds) % { "seconds" : seconds, }
+ )
+
+ if not s:
+ return _("Now")
+
+ return _("%s ago") % ", ".join(s)
+
def main():
# Run the command line interface
c = CLI()