deltask do_install
deltask do_populate_sysroot
-NVDCVE_URL ?= "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-"
FKIE_URL ?= "https://github.com/fkie-cad/nvd-json-data-feeds/releases/latest/download/CVE-"
# CVE database update interval, in seconds. By default: once a day (23*60*60).
if os.path.exists(db_tmp_file):
os.remove(db_tmp_file)
-def db_file_names(d, year, is_nvd):
- if is_nvd:
- year_url = d.getVar('NVDCVE_URL') + str(year)
- meta_url = year_url + ".meta"
- json_url = year_url + ".json.gz"
- return json_url, meta_url
+def db_file_names(d, year):
year_url = d.getVar('FKIE_URL') + str(year)
meta_url = year_url + ".meta"
json_url = year_url + ".json.xz"
return json_url, meta_url
-def host_db_name(d, is_nvd):
- if is_nvd:
- return "nvd.nist.gov"
- return "github.com"
-
-def db_decompress(d, data, is_nvd):
- import gzip, lzma
-
- if is_nvd:
- return gzip.decompress(data).decode('utf-8')
- # otherwise
- return lzma.decompress(data)
-
def update_db_file(db_tmp_file, d):
"""
Update the given database file
import bb.progress
import bb.utils
from datetime import date
+ import lzma
import sqlite3
import urllib
YEAR_START = 2002
cve_socket_timeout = int(d.getVar("CVE_SOCKET_TIMEOUT"))
- is_nvd = d.getVar("NVD_DB_VERSION") == "NVD1"
# Connect to database
conn = sqlite3.connect(db_tmp_file)
for i, year in enumerate(range(YEAR_START, date.today().year + 1)):
bb.note("Updating %d" % year)
ph.update((float(i + 1) / total_years) * 100)
- json_url, meta_url = db_file_names(d, year, is_nvd)
+ json_url, meta_url = db_file_names(d, year)
# Retrieve meta last modified date
try:
cve_f.write('Warning: CVE db update error, Unable to fetch CVE data.\n\n')
bb.warn("Failed to fetch CVE data (%s)" % e)
import socket
- result = socket.getaddrinfo(host_db_name(d, is_nvd), 443, proto=socket.IPPROTO_TCP)
+ result = socket.getaddrinfo("github.com", 443, proto=socket.IPPROTO_TCP)
bb.warn("Host IPs are %s" % (", ".join(t[4][0] for t in result)))
return False
try:
response = urllib.request.urlopen(json_url, timeout=cve_socket_timeout)
if response:
- update_db(d, conn, db_decompress(d, response.read(), is_nvd))
+ update_db(conn, lzma.decompress(response.read()))
conn.execute("insert or replace into META values (?, ?)", [year, last_modified]).close()
except urllib.error.URLError as e:
cve_f.write('Warning: CVE db update error, CVE data is outdated.\n\n')
c.close()
-def parse_node_and_insert(conn, node, cveId, is_nvd):
+def parse_node_and_insert(conn, node, cveId):
# Parse children node if needed
for child in node.get('children', ()):
- parse_node_and_insert(conn, child, cveId, is_nvd)
+ parse_node_and_insert(conn, child, cveId)
- def cpe_generator(is_nvd):
+ def cpe_generator():
match_string = "cpeMatch"
cpe_string = 'criteria'
- if is_nvd:
- match_string = "cpe_match"
- cpe_string = 'cpe23Uri'
for cpe in node.get(match_string, ()):
if not cpe['vulnerable']:
# Save processing by representing as -.
yield [cveId, vendor, product, '-', '', '', '']
- conn.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", cpe_generator(is_nvd)).close()
-
-def update_db_nvdjson(conn, jsondata):
- import json
- root = json.loads(jsondata)
-
- for elt in root['CVE_Items']:
- if not elt['impact']:
- continue
-
- accessVector = None
- vectorString = None
- cvssv2 = 0.0
- cvssv3 = 0.0
- cvssv4 = 0.0
- cveId = elt['cve']['CVE_data_meta']['ID']
- cveDesc = elt['cve']['description']['description_data'][0]['value']
- date = elt['lastModifiedDate']
- try:
- accessVector = elt['impact']['baseMetricV2']['cvssV2']['accessVector']
- vectorString = elt['impact']['baseMetricV2']['cvssV2']['vectorString']
- cvssv2 = elt['impact']['baseMetricV2']['cvssV2']['baseScore']
- except KeyError:
- cvssv2 = 0.0
- try:
- accessVector = accessVector or elt['impact']['baseMetricV3']['cvssV3']['attackVector']
- vectorString = vectorString or elt['impact']['baseMetricV3']['cvssV3']['vectorString']
- cvssv3 = elt['impact']['baseMetricV3']['cvssV3']['baseScore']
- except KeyError:
- accessVector = accessVector or "UNKNOWN"
- cvssv3 = 0.0
-
- conn.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)",
- [cveId, cveDesc, cvssv2, cvssv3, cvssv4, date, accessVector, vectorString]).close()
-
- configurations = elt['configurations']['nodes']
- for config in configurations:
- parse_node_and_insert(conn, config, cveId, True)
+ conn.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", cpe_generator()).close()
def get_metric_entry(metric):
primaries = [c for c in metric if c['type'] == "Primary"]
return secondaries[0]
return None
-def update_db_fkie(conn, jsondata):
+def update_db(conn, jsondata):
import json
root = json.loads(jsondata)
for config in elt['configurations']:
# This is suboptimal as it doesn't handle AND/OR and negate, but is better than nothing
for node in config.get("nodes") or []:
- parse_node_and_insert(conn, node, cveId, False)
-
-def update_db(d, conn, jsondata):
- if (d.getVar("NVD_DB_VERSION") == "FKIE"):
- return update_db_fkie(conn, jsondata)
- else:
- return update_db_nvdjson(conn, jsondata)
+ parse_node_and_insert(conn, node, cveId)
do_fetch[nostamp] = "1"