# Return the temporary file handle
return t
-
- def request_blocks(self, url, data=None):
- """
- This method will fetch the data from the URL and return an
- iterator for each block in the data.
- """
- # Download the data first
- t = self.retrieve(url, data=data)
-
- # Then, split it into blocks
- return iterate_over_blocks(t)
-
- def request_lines(self, url, data=None):
- """
- This method will fetch the data from the URL and return an
- iterator for each line in the data.
- """
- # Download the data first
- t = self.retrieve(url, data=data)
-
- # Then, split it into lines
- return iterate_over_lines(t)
-
-
-def read_blocks(f):
- for block in iterate_over_blocks(f):
- type = None
- data = {}
-
- for i, line in enumerate(block):
- key, value = line.split(":", 1)
-
- # The key of the first line defines the type
- if i == 0:
- type = key
-
- # Store value
- data[key] = value.strip()
-
- yield type, data
-
-def iterate_over_blocks(f, charsets=("utf-8", "latin1")):
- block = []
-
- for line in f:
- # Skip commented lines
- if line.startswith(b"#") or line.startswith(b"%"):
- continue
-
- # Convert to string
- for charset in charsets:
- try:
- line = line.decode(charset)
- except UnicodeDecodeError:
- continue
- else:
- break
-
- # Remove any comments at the end of line
- line, hash, comment = line.partition("#")
-
- # Strip any whitespace at the end of the line
- line = line.rstrip()
-
- # If we cut off some comment and the line is empty, we can skip it
- if comment and not line:
- continue
-
- # If the line has some content, keep collecting it
- if line:
- block.append(line)
- continue
-
- # End the block on an empty line
- if block:
- yield block
-
- # Reset the block
- block = []
-
- # Return the last block
- if block:
- yield block
-
-
-def iterate_over_lines(f):
- for line in f:
- # Decode the line
- line = line.decode()
-
- # Strip the ending
- yield line.rstrip()
Imports a single standard format source feed
"""
# Iterate over all blocks
- for block in location.importer.iterate_over_blocks(f):
+ for block in iterate_over_blocks(f):
self._parse_block(block, source, countries)
def _import_extended_format(self, source, countries, f, *args):
# Iterate over all lines
- for line in location.importer.iterate_over_lines(f):
+ for line in iterate_over_lines(f):
self._parse_line(line, source, countries)
def _import_arin_as_names(self, source, countries, f, *args):
log.info("Reading %s..." % file)
with open(file, "rb") as f:
- for type, block in location.importer.read_blocks(f):
+ for type, block in read_blocks(f):
if type == "net":
network = block.get("net")
# Try to parse and normalise the network
return key, val
+def read_blocks(f):
+ for block in iterate_over_blocks(f):
+ type = None
+ data = {}
+
+ for i, line in enumerate(block):
+ key, value = line.split(":", 1)
+
+ # The key of the first line defines the type
+ if i == 0:
+ type = key
+
+ # Store value
+ data[key] = value.strip()
+
+ yield type, data
+
+def iterate_over_blocks(f, charsets=("utf-8", "latin1")):
+ block = []
+
+ for line in f:
+ # Skip commented lines
+ if line.startswith(b"#") or line.startswith(b"%"):
+ continue
+
+ # Convert to string
+ for charset in charsets:
+ try:
+ line = line.decode(charset)
+ except UnicodeDecodeError:
+ continue
+ else:
+ break
+
+ # Remove any comments at the end of line
+ line, hash, comment = line.partition("#")
+
+ # Strip any whitespace at the end of the line
+ line = line.rstrip()
+
+ # If we cut off some comment and the line is empty, we can skip it
+ if comment and not line:
+ continue
+
+ # If the line has some content, keep collecting it
+ if line:
+ block.append(line)
+ continue
+
+ # End the block on an empty line
+ if block:
+ yield block
+
+ # Reset the block
+ block = []
+
+ # Return the last block
+ if block:
+ yield block
+
+def iterate_over_lines(f):
+ for line in f:
+ # Decode the line
+ line = line.decode()
+
+ # Strip the ending
+ yield line.rstrip()
+
def main():
# Run the command line interface
c = CLI()
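
For reference, a minimal sketch of how the relocated block helpers can be exercised on their own. This is not part of the patch; the sample feed below is invented for illustration only.

import io

# Hypothetical sample feed: two WHOIS-style blocks separated by a blank line,
# with a "%" comment line that is skipped and a trailing "#" comment (invented data).
sample = io.BytesIO(
    b"% header comment that is skipped\n"
    b"aut-num: AS64496\n"
    b"name: Example Network  # trailing comment is stripped\n"
    b"\n"
    b"aut-num: AS64511\n"
    b"name: Another Example\n"
)

# read_blocks() yields (type, data); the key of the first line names the block type
for type, data in read_blocks(sample):
    print(type, data)

# Expected output:
#   aut-num {'aut-num': 'AS64496', 'name': 'Example Network'}
#   aut-num {'aut-num': 'AS64511', 'name': 'Another Example'}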