-
- def request_blocks(self, url, data=None):
- """
- This method will fetch the data from the URL and return an
- iterator for each block in the data.
- """
- # Download the data first
- t = self.retrieve(url, data=data)
-
- # Then, split it into blocks
- return iterate_over_blocks(t)
-
- def request_lines(self, url, data=None):
- """
- This method will fetch the data from the URL and return an
- iterator for each line in the data.
- """
- # Download the data first
- t = self.retrieve(url, data=data)
-
- # Then, split it into lines
- return iterate_over_lines(t)
-
-
def read_blocks(f):
    """Parse the byte-line iterable *f* into ``(type, data)`` pairs.

    Each block produced by :func:`iterate_over_blocks` is interpreted as
    ``key: value`` lines; the key of the first line names the block type.

    :param f: iterable of ``bytes`` lines (e.g. a binary file object)
    :return: generator of ``(block_type, fields_dict)`` tuples
    :raises ValueError: if a line contains no ``:`` separator
    """
    for block in iterate_over_blocks(f):
        block_type = None
        fields = {}

        for index, entry in enumerate(block):
            key, value = entry.split(":", 1)

            # The very first key determines the block's type
            if index == 0:
                block_type = key

            fields[key] = value.strip()

        yield block_type, fields
-
def iterate_over_blocks(f, charsets=("utf-8", "latin1")):
    """Yield blocks of non-empty lines from the byte-line iterable *f*.

    A block is a list of decoded lines with trailing ``# ...`` comments and
    trailing whitespace removed; blocks are separated by empty lines.  Lines
    that start with ``#`` or ``%`` are skipped entirely.

    :param f: iterable of ``bytes`` lines (e.g. a binary file object)
    :param charsets: encodings tried in order when decoding each line
    :return: generator of blocks (lists of ``str``)
    """
    block = []

    for raw in f:
        # Skip fully commented lines (checked on the raw bytes)
        if raw.startswith(b"#") or raw.startswith(b"%"):
            continue

        # Decode with the first charset that succeeds
        for charset in charsets:
            try:
                line = raw.decode(charset)
            except UnicodeDecodeError:
                continue
            else:
                break
        else:
            # No charset succeeded: fall back to a lossy decode so a single
            # undecodable line cannot crash the whole iteration.  (The old
            # code left `line` as bytes here, and the str.partition below
            # then raised TypeError.)
            line = raw.decode(charsets[0], errors="replace")

        # Remove any comment at the end of the line, then trailing whitespace
        line, _, comment = line.partition("#")
        line = line.rstrip()

        # A line that held only a comment is ignored entirely
        if comment and not line:
            continue

        # A line with content extends the current block
        if line:
            block.append(line)
            continue

        # An empty line ends the current block
        if block:
            yield block
            block = []

    # Emit the trailing block when the input did not end with a blank line
    if block:
        yield block
-
-
def iterate_over_lines(f, charsets=("utf-8", "latin1")):
    """Yield each line of the byte-line iterable *f*, decoded and rstripped.

    Decoding tries *charsets* in order and falls back to a lossy decode,
    mirroring :func:`iterate_over_blocks` (the old code hard-coded strict
    UTF-8 and crashed on input the block iterator handled fine).

    :param f: iterable of ``bytes`` lines (e.g. a binary file object)
    :param charsets: encodings tried in order when decoding each line
    :return: generator of decoded ``str`` lines without trailing whitespace
    """
    for raw in f:
        # Decode with the first charset that succeeds
        for charset in charsets:
            try:
                line = raw.decode(charset)
            except UnicodeDecodeError:
                continue
            else:
                break
        else:
            # No charset succeeded: lossy fallback instead of raising
            line = raw.decode(charsets[0], errors="replace")

        # Strip the line ending (and any other trailing whitespace)
        yield line.rstrip()