whois_server = None
def __init__(self):
- pass
+ self.db = self._open_database(".cache.db")
def __str__(self):
if self.name:
return self.__class__.__name__
+ def _open_database(self, path=None):
+ db = sqlite3.connect(path or ":memory:")
+ db.set_trace_callback(logging.debug)
+
+ # Create tables
+ db.executescript("""
+ CREATE TABLE IF NOT EXISTS whois_query_cache(query TEXT, response TEXT,
+ fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
+ CREATE UNIQUE INDEX IF NOT EXISTS whois_query_cache_query
+ ON whois_query_cache(query);
+ """)
+
+ return db
+
@property
def parser(self):
return RIRParser
# Write the database to disk
p.export_database(directory)
- def whois(self, query):
+ def _whois(self, query):
command = [
"whois", query,
]
return output.decode(errors="ignore")
+ def whois(self, query):
+ # Try fetching a response from the cache
+ with self.db as c:
+ res = c.execute("SELECT response, fetched_at FROM whois_query_cache \
+ WHERE query = ?", (query,))
+
+ # Return any results
+ for row in res:
+ response, fetched_at = row
+
+ logging.debug("Fetched response for %s from cache (%s)"
+ % (query, fetched_at))
+
+ return response
+
+ # If we could not find anything, we will have to contact the whois server
+ response = self._whois(query)
+
+ # Store the response in the database
+ with self.db as c:
+ c.execute("INSERT INTO whois_query_cache(query, response) \
+ VALUES(?, ?)", (query, response))
+
+ # Commit changes to disk
+ self.db.commit()
+
+ return response
+
def get_name_for_asn(self, asn):
result = self.whois("AS%s" % asn)