drop_rule.enabled = rule.enabled
return drop_rule
-class Fetch(object):
+class Fetch:
- def __init__(self, args):
- self.args = args
+ def __init__(self, config):
+ self.config = config
+ if config is not None:
+ self.args = config.args
+ else:
+ # Should only happen in tests.
+ self.args = None
def check_checksum(self, tmp_filename, url):
try:
def get_tmp_filename(self, url):
url_hash = hashlib.md5(url.encode("utf-8")).hexdigest()
return os.path.join(
- self.args.cache_dir,
+ self.config.get_cache_dir(),
"%s-%s" % (url_hash, self.url_basename(url)))
def fetch(self, url):
if self.check_checksum(tmp_filename, url):
logger.info("Remote checksum has not changed. Not fetching.")
return self.extract_files(tmp_filename)
- if not os.path.exists(self.args.cache_dir):
- os.makedirs(self.args.cache_dir, mode=0o770)
+ if not os.path.exists(self.config.get_cache_dir()):
+ os.makedirs(self.config.get_cache_dir(), mode=0o770)
logger.info("Fetching %s." % (url))
try:
suricata.update.net.get(
self.config = {}
self.config.update(self.DEFAULTS)
+ self.cache_dir = None
+
def load(self):
if self.args.config:
with open(self.args.config) as fileobj:
def set(self, key, val):
self.config[key] = val
+ def get_cache_dir(self):
+ if self.cache_dir:
+ return self.cache_dir
+ return os.path.join(self.args.output, ".cache")
+
+ def set_cache_dir(self, directory):
+ self.cache_dir = directory
+
def test_suricata(config, suricata_path):
if not suricata_path:
logger.info("No suricata application binary found, skipping test.")
# Now download each URL.
for url in urls:
- Fetch(config.args).run(url, files)
+ Fetch(config).run(url, files)
# Now load local rules specified in the configuration file.
for local in config.config["local"]:
parser.add_argument("-o", "--output", metavar="<directory>",
dest="output", default="/var/lib/suricata/rules",
help="Directory to write rules to")
- parser.add_argument("--cache-dir", default="/var/lib/suricata/cache",
- metavar="<directory>", help="set the cache directory")
parser.add_argument("--suricata", metavar="<path>",
help="Path to Suricata program")
parser.add_argument("--suricata-version", metavar="<version>",
drop_filters += load_drop_filters(drop_conf_filename)
# Check that the cache directory exists and is writable.
- if not os.path.exists(args.cache_dir):
+ if not os.path.exists(config.get_cache_dir()):
try:
- os.makedirs(args.cache_dir, mode=0o770)
+ os.makedirs(config.get_cache_dir(), mode=0o770)
except Exception as err:
logger.warning(
"Cache directory does exist and could not be created. /var/tmp will be used instead.")
- args.cache_dir = "/var/tmp"
+ config.set_cache_dir("/var/tmp")
files = load_sources(config, suricata_version)
"file://%s/emerging.rules.tar.gz" % (
os.getcwd()),
"--local", "./rule-with-unicode.rules",
- "--cache-dir", "./tmp",
"--force",
"--output", "./tmp/rules/",
"--yaml-fragment", "./tmp/suricata-rules.yaml",
"file://%s/emerging.rules.tar.gz" % (
os.getcwd()),
"--local", "./rule-with-unicode.rules",
- "--cache-dir", "./tmp",
"--force",
"--output", "./tmp/rules/",
"--yaml-fragment", "./tmp/suricata-rules.yaml",