+++ /dev/null
-#
-# Copyright OpenEmbedded Contributors
-#
-# SPDX-License-Identifier: MIT
-#
-
-# This class is used to check recipes against public CVEs.
-#
-# In order to use this class just inherit the class in the
-# local.conf file and it will add the cve_check task for
-# every recipe. The task can be used per recipe, per image,
-# or using the special cases "world" and "universe". The
-# cve_check task will print a warning for every unpatched
-# CVE found and generate a file in the recipe WORKDIR/cve
-# directory. If an image is build it will generate a report
-# in DEPLOY_DIR_IMAGE for all the packages used.
-#
-# Example:
-# bitbake -c cve_check openssl
-# bitbake core-image-sato
-# bitbake -k -c cve_check universe
-#
-# DISCLAIMER
-#
-# This class/tool is meant to be used as support and not
-# the only method to check against CVEs. Running this tool
-# doesn't guarantee your packages are free of CVEs.
-
-# The product name that the CVE database uses defaults to BPN, but may need to
-# be overriden per recipe (for example tiff.bb sets CVE_PRODUCT=libtiff).
-CVE_PRODUCT ??= "${BPN}"
-CVE_VERSION ??= "${PV}"
-
-# Possible database sources: NVD1, NVD2, FKIE
-NVD_DB_VERSION ?= "FKIE"
-
-# Use different file names for each database source, as they synchronize at different moments, so may be slightly different
-CVE_CHECK_DB_FILENAME ?= "${@'nvdcve_2-2.db' if d.getVar('NVD_DB_VERSION') == 'NVD2' else 'nvdcve_1-3.db' if d.getVar('NVD_DB_VERSION') == 'NVD1' else 'nvdfkie_1-1.db'}"
-CVE_CHECK_DB_FETCHER ?= "${@'cve-update-nvd2-native' if d.getVar('NVD_DB_VERSION') == 'NVD2' else 'cve-update-db-native'}"
-CVE_CHECK_DB_DIR ?= "${STAGING_DIR}/CVE_CHECK"
-CVE_CHECK_DB_FILE ?= "${CVE_CHECK_DB_DIR}/${CVE_CHECK_DB_FILENAME}"
-CVE_CHECK_DB_FILE_LOCK ?= "${CVE_CHECK_DB_FILE}.lock"
-
-CVE_CHECK_SUMMARY_DIR ?= "${LOG_DIR}/cve"
-CVE_CHECK_SUMMARY_FILE_NAME ?= "cve-summary"
-CVE_CHECK_SUMMARY_FILE_NAME_JSON = "cve-summary.json"
-CVE_CHECK_SUMMARY_INDEX_PATH = "${CVE_CHECK_SUMMARY_DIR}/cve-summary-index.txt"
-
-CVE_CHECK_LOG_JSON ?= "${T}/cve.json"
-
-CVE_CHECK_DIR ??= "${DEPLOY_DIR}/cve"
-CVE_CHECK_RECIPE_FILE_JSON ?= "${CVE_CHECK_DIR}/${PN}_cve.json"
-CVE_CHECK_MANIFEST_JSON_SUFFIX ?= "json"
-CVE_CHECK_MANIFEST_JSON ?= "${IMGDEPLOYDIR}/${IMAGE_NAME}.${CVE_CHECK_MANIFEST_JSON_SUFFIX}"
-CVE_CHECK_COPY_FILES ??= "1"
-CVE_CHECK_CREATE_MANIFEST ??= "1"
-
-# Report Patched or Ignored CVEs
-CVE_CHECK_REPORT_PATCHED ??= "1"
-
-CVE_CHECK_SHOW_WARNINGS ??= "1"
-
-# Provide JSON output
-CVE_CHECK_FORMAT_JSON ??= "1"
-
-# Check for packages without CVEs (no issues or missing product name)
-CVE_CHECK_COVERAGE ??= "1"
-
-# Skip CVE Check for packages (PN)
-CVE_CHECK_SKIP_RECIPE ?= ""
-
-# Replace NVD DB check status for a given CVE. Each of CVE has to be mentioned
-# separately with optional detail and description for this status.
-#
-# CVE_STATUS[CVE-1234-0001] = "not-applicable-platform: Issue only applies on Windows"
-# CVE_STATUS[CVE-1234-0002] = "fixed-version: Fixed externally"
-#
-# Settings the same status and reason for multiple CVEs is possible
-# via CVE_STATUS_GROUPS variable.
-#
-# CVE_STATUS_GROUPS = "CVE_STATUS_WIN CVE_STATUS_PATCHED"
-#
-# CVE_STATUS_WIN = "CVE-1234-0001 CVE-1234-0003"
-# CVE_STATUS_WIN[status] = "not-applicable-platform: Issue only applies on Windows"
-# CVE_STATUS_PATCHED = "CVE-1234-0002 CVE-1234-0004"
-# CVE_STATUS_PATCHED[status] = "fixed-version: Fixed externally"
-#
-# All possible CVE statuses could be found in cve-check-map.conf
-# CVE_CHECK_STATUSMAP[not-applicable-platform] = "Ignored"
-# CVE_CHECK_STATUSMAP[fixed-version] = "Patched"
-#
-# CVE_CHECK_IGNORE is deprecated and CVE_STATUS has to be used instead.
-# Keep CVE_CHECK_IGNORE until other layers migrate to new variables
-CVE_CHECK_IGNORE ?= ""
-
-# Layers to be excluded
-CVE_CHECK_LAYER_EXCLUDELIST ??= ""
-
-# Layers to be included
-CVE_CHECK_LAYER_INCLUDELIST ??= ""
-
-
-# set to "alphabetical" for version using single alphabetical character as increment release
-CVE_VERSION_SUFFIX ??= ""
-
-python () {
- from oe.cve_check import extend_cve_status
- extend_cve_status(d)
-
- nvd_database_type = d.getVar("NVD_DB_VERSION")
- if nvd_database_type not in ("NVD1", "NVD2", "FKIE"):
- bb.erroronce("Malformed NVD_DB_VERSION, must be one of: NVD1, NVD2, FKIE. Defaulting to NVD2")
- d.setVar("NVD_DB_VERSION", "NVD2")
-}
-
-def generate_json_report(d, out_path, link_path):
- if os.path.exists(d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")):
- import json
- from oe.cve_check import cve_check_merge_jsons, update_symlinks
-
- bb.note("Generating JSON CVE summary")
- index_file = d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")
- summary = {"version":"1", "package": []}
- with open(index_file) as f:
- filename = f.readline()
- while filename:
- with open(filename.rstrip()) as j:
- data = json.load(j)
- cve_check_merge_jsons(summary, data)
- filename = f.readline()
-
- summary["package"].sort(key=lambda d: d['name'])
-
- with open(out_path, "w") as f:
- json.dump(summary, f, indent=2)
-
- update_symlinks(out_path, link_path)
-
-python cve_save_summary_handler () {
- import shutil
- import datetime
- from oe.cve_check import update_symlinks
-
- cve_summary_name = d.getVar("CVE_CHECK_SUMMARY_FILE_NAME")
- cvelogpath = d.getVar("CVE_CHECK_SUMMARY_DIR")
- bb.utils.mkdirhier(cvelogpath)
-
- timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
-
- if d.getVar("CVE_CHECK_FORMAT_JSON") == "1":
- json_summary_link_name = os.path.join(cvelogpath, d.getVar("CVE_CHECK_SUMMARY_FILE_NAME_JSON"))
- json_summary_name = os.path.join(cvelogpath, "%s-%s.json" % (cve_summary_name, timestamp))
- generate_json_report(d, json_summary_name, json_summary_link_name)
- bb.plain("Complete CVE JSON report summary created at: %s" % json_summary_link_name)
-}
-
-addhandler cve_save_summary_handler
-cve_save_summary_handler[eventmask] = "bb.event.BuildCompleted"
-
-python do_cve_check () {
- """
- Check recipe for patched and unpatched CVEs
- """
- from oe.cve_check import get_patched_cves
-
- with bb.utils.fileslocked([d.getVar("CVE_CHECK_DB_FILE_LOCK")], shared=True):
- if os.path.exists(d.getVar("CVE_CHECK_DB_FILE")):
- try:
- patched_cves = get_patched_cves(d)
- except FileNotFoundError:
- bb.fatal("Failure in searching patches")
- cve_data, status = check_cves(d, patched_cves)
- if len(cve_data) or (d.getVar("CVE_CHECK_COVERAGE") == "1" and status):
- get_cve_info(d, cve_data)
- cve_write_data(d, cve_data, status)
- else:
- bb.note("No CVE database found, skipping CVE check")
-
-}
-
-addtask cve_check before do_build
-do_cve_check[depends] = "${CVE_CHECK_DB_FETCHER}:do_unpack"
-do_cve_check[nostamp] = "1"
-
-python cve_check_cleanup () {
- """
- Delete the file used to gather all the CVE information.
- """
- bb.utils.remove(e.data.getVar("CVE_CHECK_SUMMARY_INDEX_PATH"))
-}
-
-addhandler cve_check_cleanup
-cve_check_cleanup[eventmask] = "bb.event.BuildCompleted"
-
-python cve_check_write_rootfs_manifest () {
- """
- Create CVE manifest when building an image
- """
-
- import shutil
- import json
- from oe.rootfs import image_list_installed_packages
- from oe.cve_check import cve_check_merge_jsons, update_symlinks
-
- if d.getVar("CVE_CHECK_COPY_FILES") == "1":
- deploy_file_json = d.getVar("CVE_CHECK_RECIPE_FILE_JSON")
- if os.path.exists(deploy_file_json):
- bb.utils.remove(deploy_file_json)
-
- # Create a list of relevant recipies
- recipies = set()
- for pkg in list(image_list_installed_packages(d)):
- pkg_info = os.path.join(d.getVar('PKGDATA_DIR'),
- 'runtime-reverse', pkg)
- pkg_data = oe.packagedata.read_pkgdatafile(pkg_info)
- recipies.add(pkg_data["PN"])
-
- bb.note("Writing rootfs CVE manifest")
- deploy_dir = d.getVar("IMGDEPLOYDIR")
- link_name = d.getVar("IMAGE_LINK_NAME")
-
- json_data = {"version":"1", "package": []}
- text_data = ""
- enable_json = d.getVar("CVE_CHECK_FORMAT_JSON") == "1"
-
- save_pn = d.getVar("PN")
-
- for pkg in recipies:
- # To be able to use the CVE_CHECK_RECIPE_FILE_JSON variable we have to evaluate
- # it with the different PN names set each time.
- d.setVar("PN", pkg)
-
- if enable_json:
- pkgfilepath = d.getVar("CVE_CHECK_RECIPE_FILE_JSON")
- if os.path.exists(pkgfilepath):
- with open(pkgfilepath) as j:
- data = json.load(j)
- cve_check_merge_jsons(json_data, data)
-
- d.setVar("PN", save_pn)
-
- if enable_json:
- manifest_name_suffix = d.getVar("CVE_CHECK_MANIFEST_JSON_SUFFIX")
- manifest_name = d.getVar("CVE_CHECK_MANIFEST_JSON")
-
- with open(manifest_name, "w") as f:
- json.dump(json_data, f, indent=2)
-
- if link_name:
- link_path = os.path.join(deploy_dir, "%s.%s" % (link_name, manifest_name_suffix))
- update_symlinks(manifest_name, link_path)
-
- bb.plain("Image CVE JSON report stored in: %s" % manifest_name)
-}
-
-ROOTFS_POSTPROCESS_COMMAND:prepend = "${@'cve_check_write_rootfs_manifest ' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"
-do_rootfs[recrdeptask] += "${@'do_cve_check' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"
-do_populate_sdk[recrdeptask] += "${@'do_cve_check' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"
-
-def cve_is_ignored(d, cve_data, cve):
- if cve not in cve_data:
- return False
- if cve_data[cve]['abbrev-status'] == "Ignored":
- return True
- return False
-
-def cve_is_patched(d, cve_data, cve):
- if cve not in cve_data:
- return False
- if cve_data[cve]['abbrev-status'] == "Patched":
- return True
- return False
-
-def cve_update(d, cve_data, cve, entry):
- # If no entry, just add it
- if cve not in cve_data:
- cve_data[cve] = entry
- return
- # If we are updating, there might be change in the status
- bb.debug(1, "Trying CVE entry update for %s from %s to %s" % (cve, cve_data[cve]['abbrev-status'], entry['abbrev-status']))
- if cve_data[cve]['abbrev-status'] == "Unknown":
- cve_data[cve] = entry
- return
- if cve_data[cve]['abbrev-status'] == entry['abbrev-status']:
- return
- # Update like in {'abbrev-status': 'Patched', 'status': 'version-not-in-range'} to {'abbrev-status': 'Unpatched', 'status': 'version-in-range'}
- if entry['abbrev-status'] == "Unpatched" and cve_data[cve]['abbrev-status'] == "Patched":
- if entry['status'] == "version-in-range" and cve_data[cve]['status'] == "version-not-in-range":
- # New result from the scan, vulnerable
- cve_data[cve] = entry
- bb.debug(1, "CVE entry %s update from Patched to Unpatched from the scan result" % cve)
- return
- if entry['abbrev-status'] == "Patched" and cve_data[cve]['abbrev-status'] == "Unpatched":
- if entry['status'] == "version-not-in-range" and cve_data[cve]['status'] == "version-in-range":
- # Range does not match the scan, but we already have a vulnerable match, ignore
- bb.debug(1, "CVE entry %s update from Patched to Unpatched from the scan result - not applying" % cve)
- return
- # If we have an "Ignored", it has a priority
- if cve_data[cve]['abbrev-status'] == "Ignored":
- bb.debug(1, "CVE %s not updating because Ignored" % cve)
- return
- bb.warn("Unhandled CVE entry update for %s from %s to %s" % (cve, cve_data[cve], entry))
-
-def check_cves(d, cve_data):
- """
- Connect to the NVD database and find unpatched cves.
- """
- from oe.cve_check import Version, convert_cve_version, decode_cve_status
-
- pn = d.getVar("PN")
- real_pv = d.getVar("PV")
- suffix = d.getVar("CVE_VERSION_SUFFIX")
-
- cves_status = []
- cves_in_recipe = False
- # CVE_PRODUCT can contain more than one product (eg. curl/libcurl)
- products = d.getVar("CVE_PRODUCT").split()
- # If this has been unset then we're not scanning for CVEs here (for example, image recipes)
- if not products:
- return ([], [])
- pv = d.getVar("CVE_VERSION").split("+git")[0]
-
- # If the recipe has been skipped/ignored we return empty lists
- if pn in d.getVar("CVE_CHECK_SKIP_RECIPE").split():
- bb.note("Recipe has been skipped by cve-check")
- return ([], [])
-
- import sqlite3
- db_file = d.expand("file:${CVE_CHECK_DB_FILE}?mode=ro")
- conn = sqlite3.connect(db_file, uri=True)
-
- # For each of the known product names (e.g. curl has CPEs using curl and libcurl)...
- for product in products:
- cves_in_product = False
- if ":" in product:
- vendor, product = product.split(":", 1)
- else:
- vendor = "%"
-
- # Find all relevant CVE IDs.
- cve_cursor = conn.execute("SELECT DISTINCT ID FROM PRODUCTS WHERE PRODUCT IS ? AND VENDOR LIKE ?", (product, vendor))
- for cverow in cve_cursor:
- cve = cverow[0]
-
- # Write status once only for each product
- if not cves_in_product:
- cves_status.append([product, True])
- cves_in_product = True
- cves_in_recipe = True
-
- if cve_is_ignored(d, cve_data, cve):
- bb.note("%s-%s ignores %s" % (product, pv, cve))
- continue
- elif cve_is_patched(d, cve_data, cve):
- bb.note("%s has been patched" % (cve))
- continue
-
- vulnerable = False
- ignored = False
-
- product_cursor = conn.execute("SELECT * FROM PRODUCTS WHERE ID IS ? AND PRODUCT IS ? AND VENDOR LIKE ?", (cve, product, vendor))
- for row in product_cursor:
- (_, _, _, version_start, operator_start, version_end, operator_end) = row
- #bb.debug(2, "Evaluating row " + str(row))
- if cve_is_ignored(d, cve_data, cve):
- ignored = True
-
- version_start = convert_cve_version(version_start)
- version_end = convert_cve_version(version_end)
-
- if (operator_start == '=' and pv == version_start) or version_start == '-':
- vulnerable = True
- else:
- if operator_start:
- try:
- vulnerable_start = (operator_start == '>=' and Version(pv,suffix) >= Version(version_start,suffix))
- vulnerable_start |= (operator_start == '>' and Version(pv,suffix) > Version(version_start,suffix))
- except:
- bb.warn("%s: Failed to compare %s %s %s for %s" %
- (product, pv, operator_start, version_start, cve))
- vulnerable_start = False
- else:
- vulnerable_start = False
-
- if operator_end:
- try:
- vulnerable_end = (operator_end == '<=' and Version(pv,suffix) <= Version(version_end,suffix) )
- vulnerable_end |= (operator_end == '<' and Version(pv,suffix) < Version(version_end,suffix) )
- except:
- bb.warn("%s: Failed to compare %s %s %s for %s" %
- (product, pv, operator_end, version_end, cve))
- vulnerable_end = False
- else:
- vulnerable_end = False
-
- if operator_start and operator_end:
- vulnerable = vulnerable_start and vulnerable_end
- else:
- vulnerable = vulnerable_start or vulnerable_end
-
- if vulnerable:
- if ignored:
- bb.note("%s is ignored in %s-%s" % (cve, pn, real_pv))
- cve_update(d, cve_data, cve, {"abbrev-status": "Ignored"})
- else:
- bb.note("%s-%s is vulnerable to %s" % (pn, real_pv, cve))
- cve_update(d, cve_data, cve, {"abbrev-status": "Unpatched", "status": "version-in-range"})
- break
- product_cursor.close()
-
- if not vulnerable:
- bb.note("%s-%s is not vulnerable to %s" % (pn, real_pv, cve))
- cve_update(d, cve_data, cve, {"abbrev-status": "Patched", "status": "version-not-in-range"})
- cve_cursor.close()
-
- if not cves_in_product:
- bb.note("No CVE records found for product %s, pn %s" % (product, pn))
- cves_status.append([product, False])
-
- conn.close()
-
- if not cves_in_recipe:
- bb.note("No CVE records for products in recipe %s" % (pn))
-
- if d.getVar("CVE_CHECK_SHOW_WARNINGS") == "1":
- unpatched_cves = [cve for cve in cve_data if cve_data[cve]["abbrev-status"] == "Unpatched"]
- if unpatched_cves:
- bb.warn("Found unpatched CVE (%s)" % " ".join(unpatched_cves))
-
- return (cve_data, cves_status)
-
-def get_cve_info(d, cve_data):
- """
- Get CVE information from the database.
- """
-
- import sqlite3
-
- db_file = d.expand("file:${CVE_CHECK_DB_FILE}?mode=ro")
- conn = sqlite3.connect(db_file, uri=True)
-
- for cve in cve_data:
- cursor = conn.execute("SELECT * FROM NVD WHERE ID IS ?", (cve,))
- for row in cursor:
- # The CVE itdelf has been added already
- if row[0] not in cve_data:
- bb.note("CVE record %s not present" % row[0])
- continue
- #cve_data[row[0]] = {}
- cve_data[row[0]]["NVD-summary"] = row[1]
- cve_data[row[0]]["NVD-scorev2"] = row[2]
- cve_data[row[0]]["NVD-scorev3"] = row[3]
- cve_data[row[0]]["NVD-scorev4"] = row[4]
- cve_data[row[0]]["NVD-modified"] = row[5]
- cve_data[row[0]]["NVD-vector"] = row[6]
- cve_data[row[0]]["NVD-vectorString"] = row[7]
- cursor.close()
- conn.close()
-
-def cve_check_write_json_output(d, output, direct_file, deploy_file, manifest_file):
- """
- Write CVE information in the JSON format: to WORKDIR; and to
- CVE_CHECK_DIR, if CVE manifest if enabled, write fragment
- files that will be assembled at the end in cve_check_write_rootfs_manifest.
- """
-
- import json
-
- write_string = json.dumps(output, indent=2)
- with open(direct_file, "w") as f:
- bb.note("Writing file %s with CVE information" % direct_file)
- f.write(write_string)
-
- if d.getVar("CVE_CHECK_COPY_FILES") == "1":
- bb.utils.mkdirhier(os.path.dirname(deploy_file))
- with open(deploy_file, "w") as f:
- f.write(write_string)
-
- if d.getVar("CVE_CHECK_CREATE_MANIFEST") == "1":
- cvelogpath = d.getVar("CVE_CHECK_SUMMARY_DIR")
- index_path = d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")
- bb.utils.mkdirhier(cvelogpath)
- fragment_file = os.path.basename(deploy_file)
- fragment_path = os.path.join(cvelogpath, fragment_file)
- with open(fragment_path, "w") as f:
- f.write(write_string)
- with open(index_path, "a+") as f:
- f.write("%s\n" % fragment_path)
-
-def cve_write_data_json(d, cve_data, cve_status):
- """
- Prepare CVE data for the JSON format, then write it.
- """
-
- output = {"version":"1", "package": []}
- nvd_link = "https://nvd.nist.gov/vuln/detail/"
-
- fdir_name = d.getVar("FILE_DIRNAME")
- layer = fdir_name.split("/")[-3]
-
- include_layers = d.getVar("CVE_CHECK_LAYER_INCLUDELIST").split()
- exclude_layers = d.getVar("CVE_CHECK_LAYER_EXCLUDELIST").split()
-
- report_all = d.getVar("CVE_CHECK_REPORT_PATCHED") == "1"
-
- if exclude_layers and layer in exclude_layers:
- return
-
- if include_layers and layer not in include_layers:
- return
-
- product_data = []
- for s in cve_status:
- p = {"product": s[0], "cvesInRecord": "Yes"}
- if s[1] == False:
- p["cvesInRecord"] = "No"
- product_data.append(p)
-
- package_version = "%s%s" % (d.getVar("EXTENDPE"), d.getVar("PV"))
- package_data = {
- "name" : d.getVar("PN"),
- "layer" : layer,
- "version" : package_version,
- "products": product_data
- }
-
- cve_list = []
-
- for cve in sorted(cve_data):
- if not report_all and (cve_data[cve]["abbrev-status"] == "Patched" or cve_data[cve]["abbrev-status"] == "Ignored"):
- continue
- issue_link = "%s%s" % (nvd_link, cve)
-
- cve_item = {
- "id" : cve,
- "status" : cve_data[cve]["abbrev-status"],
- "link": issue_link,
- }
- if 'NVD-summary' in cve_data[cve]:
- cve_item["summary"] = cve_data[cve]["NVD-summary"]
- cve_item["scorev2"] = cve_data[cve]["NVD-scorev2"]
- cve_item["scorev3"] = cve_data[cve]["NVD-scorev3"]
- cve_item["scorev4"] = cve_data[cve]["NVD-scorev4"]
- cve_item["modified"] = cve_data[cve]["NVD-modified"]
- cve_item["vector"] = cve_data[cve]["NVD-vector"]
- cve_item["vectorString"] = cve_data[cve]["NVD-vectorString"]
- if 'status' in cve_data[cve]:
- cve_item["detail"] = cve_data[cve]["status"]
- if 'justification' in cve_data[cve]:
- cve_item["description"] = cve_data[cve]["justification"]
- if 'resource' in cve_data[cve]:
- cve_item["patch-file"] = cve_data[cve]["resource"]
- cve_list.append(cve_item)
-
- package_data["issue"] = cve_list
- output["package"].append(package_data)
-
- direct_file = d.getVar("CVE_CHECK_LOG_JSON")
- deploy_file = d.getVar("CVE_CHECK_RECIPE_FILE_JSON")
- manifest_file = d.getVar("CVE_CHECK_SUMMARY_FILE_NAME_JSON")
-
- cve_check_write_json_output(d, output, direct_file, deploy_file, manifest_file)
-
-def cve_write_data(d, cve_data, status):
- """
- Write CVE data in each enabled format.
- """
-
- if d.getVar("CVE_CHECK_FORMAT_JSON") == "1":
- cve_write_data_json(d, cve_data, status)
RECIPE_MAINTAINER:pn-cryptodev-tests = "Robert Yang <liezhi.yang@windriver.com>"
RECIPE_MAINTAINER:pn-cups = "Chen Qi <Qi.Chen@windriver.com>"
RECIPE_MAINTAINER:pn-curl = "Robert Joslyn <robert.joslyn@redrectangle.org>"
-RECIPE_MAINTAINER:pn-cve-update-nvd2-native = "Ross Burton <ross.burton@arm.com>"
RECIPE_MAINTAINER:pn-db = "Unassigned <unassigned@yoctoproject.org>"
RECIPE_MAINTAINER:pn-dbus = "Chen Qi <Qi.Chen@windriver.com>"
RECIPE_MAINTAINER:pn-dbus-glib = "Chen Qi <Qi.Chen@windriver.com>"
CORE_IMAGE_EXTRA_INSTALL[doc] = "Specifies the list of packages to be added to the image. You should only set this variable in the conf/local.conf file in the Build Directory."
COREBASE[doc] = "Specifies the parent directory of the OpenEmbedded Core Metadata layer (i.e. meta)."
CONF_VERSION[doc] = "Tracks the version of local.conf. Increased each time build/conf/ changes incompatibly."
-CVE_CHECK_LAYER_EXCLUDELIST[doc] = "Defines which layers to exclude from cve-check scanning"
-CVE_CHECK_LAYER_INCLUDELIST[doc] = "Defines which layers to include during cve-check scanning"
#D
# SPDX-License-Identifier: MIT
#
-import json
-import os
from oeqa.selftest.case import OESelftestTestCase
-from oeqa.utils.commands import bitbake, get_bb_vars
class CVECheck(OESelftestTestCase):
),
{"CVE-2019-6461", "CVE-2019-6462", "CVE-2019-6463", "CVE-2019-6464"},
)
-
- def test_recipe_report_json(self):
- config = """
-INHERIT += "cve-check"
-CVE_CHECK_FORMAT_JSON = "1"
-"""
- self.write_config(config)
-
- vars = get_bb_vars(["CVE_CHECK_SUMMARY_DIR", "CVE_CHECK_SUMMARY_FILE_NAME_JSON"])
- summary_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"])
- recipe_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], "m4-native_cve.json")
-
- try:
- os.remove(summary_json)
- os.remove(recipe_json)
- except FileNotFoundError:
- pass
-
- bitbake("m4-native -c cve_check")
-
- def check_m4_json(filename):
- with open(filename) as f:
- report = json.load(f)
- self.assertEqual(report["version"], "1")
- self.assertEqual(len(report["package"]), 1)
- package = report["package"][0]
- self.assertEqual(package["name"], "m4-native")
- found_cves = { issue["id"]: issue["status"] for issue in package["issue"]}
- self.assertIn("CVE-2008-1687", found_cves)
- self.assertEqual(found_cves["CVE-2008-1687"], "Patched")
-
- self.assertExists(summary_json)
- check_m4_json(summary_json)
- self.assertExists(recipe_json)
- check_m4_json(recipe_json)
-
-
- def test_image_json(self):
- config = """
-INHERIT += "cve-check"
-CVE_CHECK_FORMAT_JSON = "1"
-"""
- self.write_config(config)
-
- vars = get_bb_vars(["CVE_CHECK_DIR", "CVE_CHECK_SUMMARY_DIR", "CVE_CHECK_SUMMARY_FILE_NAME_JSON"])
- report_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"])
- print(report_json)
- try:
- os.remove(report_json)
- except FileNotFoundError:
- pass
-
- bitbake("core-image-minimal-initramfs")
- self.assertExists(report_json)
-
- # Check that the summary report lists at least one package
- with open(report_json) as f:
- report = json.load(f)
- self.assertEqual(report["version"], "1")
- self.assertGreater(len(report["package"]), 1)
-
- # Check that a random recipe wrote a recipe report to deploy/cve/
- recipename = report["package"][0]["name"]
- recipe_report = os.path.join(vars["CVE_CHECK_DIR"], recipename + "_cve.json")
- self.assertExists(recipe_report)
- with open(recipe_report) as f:
- report = json.load(f)
- self.assertEqual(report["version"], "1")
- self.assertEqual(len(report["package"]), 1)
- self.assertEqual(report["package"][0]["name"], recipename)
-
-
- def test_recipe_report_json_unpatched(self):
- config = """
-INHERIT += "cve-check"
-CVE_CHECK_FORMAT_JSON = "1"
-CVE_CHECK_REPORT_PATCHED = "0"
-"""
- self.write_config(config)
-
- vars = get_bb_vars(["CVE_CHECK_SUMMARY_DIR", "CVE_CHECK_SUMMARY_FILE_NAME_JSON"])
- summary_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"])
- recipe_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], "m4-native_cve.json")
-
- try:
- os.remove(summary_json)
- os.remove(recipe_json)
- except FileNotFoundError:
- pass
-
- bitbake("m4-native -c cve_check")
-
- def check_m4_json(filename):
- with open(filename) as f:
- report = json.load(f)
- self.assertEqual(report["version"], "1")
- self.assertEqual(len(report["package"]), 1)
- package = report["package"][0]
- self.assertEqual(package["name"], "m4-native")
- #m4 had only Patched CVEs, so the issues array will be empty
- self.assertEqual(package["issue"], [])
-
- self.assertExists(summary_json)
- check_m4_json(summary_json)
- self.assertExists(recipe_json)
- check_m4_json(recipe_json)
-
-
- def test_recipe_report_json_ignored(self):
- config = """
-INHERIT += "cve-check"
-CVE_CHECK_FORMAT_JSON = "1"
-CVE_CHECK_REPORT_PATCHED = "1"
-"""
- self.write_config(config)
-
- vars = get_bb_vars(["CVE_CHECK_SUMMARY_DIR", "CVE_CHECK_SUMMARY_FILE_NAME_JSON"])
- summary_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"])
- recipe_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], "logrotate_cve.json")
-
- try:
- os.remove(summary_json)
- os.remove(recipe_json)
- except FileNotFoundError:
- pass
-
- bitbake("logrotate -c cve_check")
-
- def check_m4_json(filename):
- with open(filename) as f:
- report = json.load(f)
- self.assertEqual(report["version"], "1")
- self.assertEqual(len(report["package"]), 1)
- package = report["package"][0]
- self.assertEqual(package["name"], "logrotate")
- found_cves = {}
- for issue in package["issue"]:
- found_cves[issue["id"]] = {
- "status" : issue["status"],
- "detail" : issue["detail"] if "detail" in issue else "",
- "description" : issue["description"] if "description" in issue else ""
- }
- # m4 CVE should not be in logrotate
- self.assertNotIn("CVE-2008-1687", found_cves)
- # logrotate has both Patched and Ignored CVEs
- detail = "version-not-in-range"
- self.assertIn("CVE-2011-1098", found_cves)
- self.assertEqual(found_cves["CVE-2011-1098"]["status"], "Patched")
- self.assertEqual(found_cves["CVE-2011-1098"]["detail"], detail)
- self.assertEqual(len(found_cves["CVE-2011-1098"]["description"]), 0)
- detail = "not-applicable-platform"
- description = "CVE is debian, gentoo or SUSE specific on the way logrotate was installed/used"
- self.assertIn("CVE-2011-1548", found_cves)
- self.assertEqual(found_cves["CVE-2011-1548"]["status"], "Ignored")
- self.assertEqual(found_cves["CVE-2011-1548"]["detail"], detail)
- self.assertEqual(found_cves["CVE-2011-1548"]["description"], description)
- self.assertIn("CVE-2011-1549", found_cves)
- self.assertEqual(found_cves["CVE-2011-1549"]["status"], "Ignored")
- self.assertEqual(found_cves["CVE-2011-1549"]["detail"], detail)
- self.assertEqual(found_cves["CVE-2011-1549"]["description"], description)
- self.assertIn("CVE-2011-1550", found_cves)
- self.assertEqual(found_cves["CVE-2011-1550"]["status"], "Ignored")
- self.assertEqual(found_cves["CVE-2011-1550"]["detail"], detail)
- self.assertEqual(found_cves["CVE-2011-1550"]["description"], description)
-
- self.assertExists(summary_json)
- check_m4_json(summary_json)
- self.assertExists(recipe_json)
- check_m4_json(recipe_json)
+++ /dev/null
-SUMMARY = "Updates the NVD CVE database"
-LICENSE = "MIT"
-
-INHIBIT_DEFAULT_DEPS = "1"
-
-inherit native
-
-deltask do_patch
-deltask do_configure
-deltask do_compile
-deltask do_install
-deltask do_populate_sysroot
-
-NVDCVE_URL ?= "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-"
-FKIE_URL ?= "https://github.com/fkie-cad/nvd-json-data-feeds/releases/latest/download/CVE-"
-
-# CVE database update interval, in seconds. By default: once a day (23*60*60).
-# Use 0 to force the update
-# Use a negative value to skip the update
-CVE_DB_UPDATE_INTERVAL ?= "82800"
-
-# Timeout for blocking socket operations, such as the connection attempt.
-CVE_SOCKET_TIMEOUT ?= "60"
-
-CVE_CHECK_DB_DLDIR_FILE ?= "${DL_DIR}/CVE_CHECK2/${CVE_CHECK_DB_FILENAME}"
-CVE_CHECK_DB_DLDIR_LOCK ?= "${CVE_CHECK_DB_DLDIR_FILE}.lock"
-CVE_CHECK_DB_TEMP_FILE ?= "${CVE_CHECK_DB_FILE}.tmp"
-
-python () {
- if not bb.data.inherits_class("cve-check", d):
- raise bb.parse.SkipRecipe("Skip recipe when cve-check class is not loaded.")
-}
-
-python do_fetch() {
- """
- Update NVD database with json data feed
- """
- import bb.utils
- import bb.progress
- import shutil
-
- bb.utils.export_proxies(d)
-
- db_file = d.getVar("CVE_CHECK_DB_DLDIR_FILE")
- db_dir = os.path.dirname(db_file)
- db_tmp_file = d.getVar("CVE_CHECK_DB_TEMP_FILE")
-
- cleanup_db_download(db_tmp_file)
-
- # The NVD database changes once a day, so no need to update more frequently
- # Allow the user to force-update
- try:
- import time
- update_interval = int(d.getVar("CVE_DB_UPDATE_INTERVAL"))
- if update_interval < 0:
- bb.note("CVE database update skipped")
- if not os.path.exists(db_file):
- bb.error("CVE database %s not present, database fetch/update skipped" % db_file)
- return
- curr_time = time.time()
- database_time = os.path.getmtime(db_file)
- bb.note("Current time: %s; DB time: %s" % (time.ctime(curr_time), time.ctime(database_time)))
- if curr_time < database_time:
- bb.warn("Database time is in the future, force DB update")
- elif curr_time - database_time < update_interval:
- bb.note("CVE database recently updated, skipping")
- return
-
- except OSError:
- pass
-
- if bb.utils.to_boolean(d.getVar("BB_NO_NETWORK")):
- bb.error("BB_NO_NETWORK attempted to disable fetch, this recipe uses CVE_DB_UPDATE_INTERVAL to control download, set to '-1' to disable fetch or update")
-
- bb.utils.mkdirhier(db_dir)
- bb.utils.mkdirhier(os.path.dirname(db_tmp_file))
- if os.path.exists(db_file):
- shutil.copy2(db_file, db_tmp_file)
-
- if update_db_file(db_tmp_file, d):
- # Update downloaded correctly, we can swap files. To avoid potential
- # NFS caching issues, ensure that the destination file has a new inode
- # number. We do this in two steps as the downloads directory may be on
- # a different filesystem to tmpdir we're working in.
- new_file = "%s.new" % (db_file)
- shutil.move(db_tmp_file, new_file)
- os.rename(new_file, db_file)
- else:
- # Update failed, do not modify the database
- bb.warn("CVE database update failed")
- os.remove(db_tmp_file)
-}
-
-do_fetch[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK}"
-do_fetch[file-checksums] = ""
-do_fetch[vardeps] = ""
-
-python do_unpack() {
- import shutil
- shutil.copyfile(d.getVar("CVE_CHECK_DB_DLDIR_FILE"), d.getVar("CVE_CHECK_DB_FILE"))
-}
-do_unpack[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK} ${CVE_CHECK_DB_FILE_LOCK}"
-
-def cleanup_db_download(db_tmp_file):
- """
- Cleanup the download space from possible failed downloads
- """
-
- # Clean-up the temporary file downloads, we can remove both journal
- # and the temporary database
- if os.path.exists("{0}-journal".format(db_tmp_file)):
- os.remove("{0}-journal".format(db_tmp_file))
- if os.path.exists(db_tmp_file):
- os.remove(db_tmp_file)
-
-def db_file_names(d, year, is_nvd):
- if is_nvd:
- year_url = d.getVar('NVDCVE_URL') + str(year)
- meta_url = year_url + ".meta"
- json_url = year_url + ".json.gz"
- return json_url, meta_url
- year_url = d.getVar('FKIE_URL') + str(year)
- meta_url = year_url + ".meta"
- json_url = year_url + ".json.xz"
- return json_url, meta_url
-
-def host_db_name(d, is_nvd):
- if is_nvd:
- return "nvd.nist.gov"
- return "github.com"
-
-def db_decompress(d, data, is_nvd):
- import gzip, lzma
-
- if is_nvd:
- return gzip.decompress(data).decode('utf-8')
- # otherwise
- return lzma.decompress(data)
-
-def update_db_file(db_tmp_file, d):
- """
- Update the given database file
- """
- import bb.progress
- import bb.utils
- from datetime import date
- import sqlite3
- import urllib
-
- YEAR_START = 2002
- cve_socket_timeout = int(d.getVar("CVE_SOCKET_TIMEOUT"))
- is_nvd = d.getVar("NVD_DB_VERSION") == "NVD1"
-
- # Connect to database
- conn = sqlite3.connect(db_tmp_file)
- initialize_db(conn)
-
- with bb.progress.ProgressHandler(d) as ph, open(os.path.join(d.getVar("TMPDIR"), 'cve_check'), 'a') as cve_f:
- total_years = date.today().year + 1 - YEAR_START
- for i, year in enumerate(range(YEAR_START, date.today().year + 1)):
- bb.note("Updating %d" % year)
- ph.update((float(i + 1) / total_years) * 100)
- json_url, meta_url = db_file_names(d, year, is_nvd)
-
- # Retrieve meta last modified date
- try:
- response = urllib.request.urlopen(meta_url, timeout=cve_socket_timeout)
- except urllib.error.URLError as e:
- cve_f.write('Warning: CVE db update error, Unable to fetch CVE data.\n\n')
- bb.warn("Failed to fetch CVE data (%s)" % e)
- import socket
- result = socket.getaddrinfo(host_db_name(d, is_nvd), 443, proto=socket.IPPROTO_TCP)
- bb.warn("Host IPs are %s" % (", ".join(t[4][0] for t in result)))
- return False
-
- if response:
- for line in response.read().decode("utf-8").splitlines():
- key, value = line.split(":", 1)
- if key == "lastModifiedDate":
- last_modified = value
- break
- else:
- bb.warn("Cannot parse CVE metadata, update failed")
- return False
-
- # Compare with current db last modified date
- cursor = conn.execute("select DATE from META where YEAR = ?", (year,))
- meta = cursor.fetchone()
- cursor.close()
-
- if not meta or meta[0] != last_modified:
- bb.note("Updating entries")
- # Clear products table entries corresponding to current year
- conn.execute("delete from PRODUCTS where ID like ?", ('CVE-%d%%' % year,)).close()
-
- # Update db with current year json file
- try:
- response = urllib.request.urlopen(json_url, timeout=cve_socket_timeout)
- if response:
- update_db(d, conn, db_decompress(d, response.read(), is_nvd))
- conn.execute("insert or replace into META values (?, ?)", [year, last_modified]).close()
- except urllib.error.URLError as e:
- cve_f.write('Warning: CVE db update error, CVE data is outdated.\n\n')
- bb.warn("Cannot parse CVE data (%s), update failed" % e.reason)
- return False
- else:
- bb.debug(2, "Already up to date (last modified %s)" % last_modified)
- # Update success, set the date to cve_check file.
- if year == date.today().year:
- cve_f.write('CVE database update : %s\n\n' % date.today())
-
- conn.commit()
- conn.close()
- return True
-
-def initialize_db(conn):
- with conn:
- c = conn.cursor()
-
- c.execute("CREATE TABLE IF NOT EXISTS META (YEAR INTEGER UNIQUE, DATE TEXT)")
-
- c.execute("CREATE TABLE IF NOT EXISTS NVD (ID TEXT UNIQUE, SUMMARY TEXT, \
- SCOREV2 TEXT, SCOREV3 TEXT, SCOREV4 TEXT, MODIFIED INTEGER, VECTOR TEXT, VECTORSTRING TEXT)")
-
- c.execute("CREATE TABLE IF NOT EXISTS PRODUCTS (ID TEXT, \
- VENDOR TEXT, PRODUCT TEXT, VERSION_START TEXT, OPERATOR_START TEXT, \
- VERSION_END TEXT, OPERATOR_END TEXT)")
- c.execute("CREATE INDEX IF NOT EXISTS PRODUCT_ID_IDX on PRODUCTS(ID);")
-
- c.close()
-
-def parse_node_and_insert(conn, node, cveId, is_nvd):
- # Parse children node if needed
- for child in node.get('children', ()):
- parse_node_and_insert(conn, child, cveId, is_nvd)
-
- def cpe_generator(is_nvd):
- match_string = "cpeMatch"
- cpe_string = 'criteria'
- if is_nvd:
- match_string = "cpe_match"
- cpe_string = 'cpe23Uri'
-
- for cpe in node.get(match_string, ()):
- if not cpe['vulnerable']:
- return
- cpe23 = cpe.get(cpe_string)
- if not cpe23:
- return
- cpe23 = cpe23.split(':')
- if len(cpe23) < 6:
- return
- vendor = cpe23[3]
- product = cpe23[4]
- version = cpe23[5]
-
- if cpe23[6] == '*' or cpe23[6] == '-':
- version_suffix = ""
- else:
- version_suffix = "_" + cpe23[6]
-
- if version != '*' and version != '-':
- # Version is defined, this is a '=' match
- yield [cveId, vendor, product, version + version_suffix, '=', '', '']
- elif version == '-':
- # no version information is available
- yield [cveId, vendor, product, version, '', '', '']
- else:
- # Parse start version, end version and operators
- op_start = ''
- op_end = ''
- v_start = ''
- v_end = ''
-
- if 'versionStartIncluding' in cpe:
- op_start = '>='
- v_start = cpe['versionStartIncluding']
-
- if 'versionStartExcluding' in cpe:
- op_start = '>'
- v_start = cpe['versionStartExcluding']
-
- if 'versionEndIncluding' in cpe:
- op_end = '<='
- v_end = cpe['versionEndIncluding']
-
- if 'versionEndExcluding' in cpe:
- op_end = '<'
- v_end = cpe['versionEndExcluding']
-
- if op_start or op_end or v_start or v_end:
- yield [cveId, vendor, product, v_start, op_start, v_end, op_end]
- else:
- # This is no version information, expressed differently.
- # Save processing by representing as -.
- yield [cveId, vendor, product, '-', '', '', '']
-
- conn.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", cpe_generator(is_nvd)).close()
-
-def update_db_nvdjson(conn, jsondata):
- import json
- root = json.loads(jsondata)
-
- for elt in root['CVE_Items']:
- if not elt['impact']:
- continue
-
- accessVector = None
- vectorString = None
- cvssv2 = 0.0
- cvssv3 = 0.0
- cvssv4 = 0.0
- cveId = elt['cve']['CVE_data_meta']['ID']
- cveDesc = elt['cve']['description']['description_data'][0]['value']
- date = elt['lastModifiedDate']
- try:
- accessVector = elt['impact']['baseMetricV2']['cvssV2']['accessVector']
- vectorString = elt['impact']['baseMetricV2']['cvssV2']['vectorString']
- cvssv2 = elt['impact']['baseMetricV2']['cvssV2']['baseScore']
- except KeyError:
- cvssv2 = 0.0
- try:
- accessVector = accessVector or elt['impact']['baseMetricV3']['cvssV3']['attackVector']
- vectorString = vectorString or elt['impact']['baseMetricV3']['cvssV3']['vectorString']
- cvssv3 = elt['impact']['baseMetricV3']['cvssV3']['baseScore']
- except KeyError:
- accessVector = accessVector or "UNKNOWN"
- cvssv3 = 0.0
-
- conn.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)",
- [cveId, cveDesc, cvssv2, cvssv3, cvssv4, date, accessVector, vectorString]).close()
-
- configurations = elt['configurations']['nodes']
- for config in configurations:
- parse_node_and_insert(conn, config, cveId, True)
-
-def get_metric_entry(metric):
- primaries = [c for c in metric if c['type'] == "Primary"]
- secondaries = [c for c in metric if c['type'] == "Secondary"]
- if len(primaries) > 0:
- return primaries[0]
- elif len(secondaries) > 0:
- return secondaries[0]
- return None
-
-def update_db_fkie(conn, jsondata):
- import json
- root = json.loads(jsondata)
-
- for elt in root['cve_items']:
- if 'vulnStatus' not in elt or elt['vulnStatus'] == 'Rejected':
- continue
-
- if 'configurations' not in elt:
- continue
-
- accessVector = None
- vectorString = None
- cvssv2 = 0.0
- cvssv3 = 0.0
- cvssv4 = 0.0
- cveId = elt['id']
- cveDesc = elt['descriptions'][0]['value']
- date = elt['lastModified']
- try:
- if 'cvssMetricV2' in elt['metrics']:
- entry = get_metric_entry(elt['metrics']['cvssMetricV2'])
- if entry:
- accessVector = entry['cvssData']['accessVector']
- vectorString = entry['cvssData']['vectorString']
- cvssv2 = entry['cvssData']['baseScore']
- except KeyError:
- cvssv2 = 0.0
- try:
- if 'cvssMetricV30' in elt['metrics']:
- entry = get_metric_entry(elt['metrics']['cvssMetricV30'])
- if entry:
- accessVector = entry['cvssData']['attackVector']
- vectorString = entry['cvssData']['vectorString']
- cvssv3 = entry['cvssData']['baseScore']
- except KeyError:
- accessVector = accessVector or "UNKNOWN"
- cvssv3 = 0.0
- try:
- if 'cvssMetricV31' in elt['metrics']:
- entry = get_metric_entry(elt['metrics']['cvssMetricV31'])
- if entry:
- accessVector = entry['cvssData']['attackVector']
- vectorString = entry['cvssData']['vectorString']
- cvssv3 = entry['cvssData']['baseScore']
- except KeyError:
- accessVector = accessVector or "UNKNOWN"
- cvssv3 = 0.0
- try:
- if 'cvssMetricV40' in elt['metrics']:
- entry = get_metric_entry(elt['metrics']['cvssMetricV40'])
- if entry:
- accessVector = entry['cvssData']['attackVector']
- vectorString = entry['cvssData']['vectorString']
- cvssv4 = entry['cvssData']['baseScore']
- except KeyError:
- accessVector = accessVector or "UNKNOWN"
- cvssv4 = 0.0
-
- conn.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)",
- [cveId, cveDesc, cvssv2, cvssv3, cvssv4, date, accessVector, vectorString]).close()
-
- for config in elt['configurations']:
- # This is suboptimal as it doesn't handle AND/OR and negate, but is better than nothing
- for node in config.get("nodes") or []:
- parse_node_and_insert(conn, node, cveId, False)
-
-def update_db(d, conn, jsondata):
- if (d.getVar("NVD_DB_VERSION") == "FKIE"):
- return update_db_fkie(conn, jsondata)
- else:
- return update_db_nvdjson(conn, jsondata)
-
-do_fetch[nostamp] = "1"
-
-EXCLUDE_FROM_WORLD = "1"
+++ /dev/null
-SUMMARY = "Updates the NVD CVE database"
-LICENSE = "MIT"
-
-# Important note:
-# This product uses the NVD API but is not endorsed or certified by the NVD.
-
-INHIBIT_DEFAULT_DEPS = "1"
-
-inherit native
-
-deltask do_patch
-deltask do_configure
-deltask do_compile
-deltask do_install
-deltask do_populate_sysroot
-
-NVDCVE_URL ?= "https://services.nvd.nist.gov/rest/json/cves/2.0"
-
-# If you have a NVD API key (https://nvd.nist.gov/developers/request-an-api-key)
-# then setting this to get higher rate limits.
-NVDCVE_API_KEY ?= ""
-
-# CVE database update interval, in seconds. By default: once a day (23*60*60).
-# Use 0 to force the update
-# Use a negative value to skip the update
-CVE_DB_UPDATE_INTERVAL ?= "82800"
-
-# CVE database incremental update age threshold, in seconds. If the database is
-# older than this threshold, do a full re-download, else, do an incremental
-# update. By default: the maximum allowed value from NVD: 120 days (120*24*60*60)
-# Use 0 to force a full download.
-CVE_DB_INCR_UPDATE_AGE_THRES ?= "10368000"
-
-# Number of attempts for each http query to nvd server before giving up
-CVE_DB_UPDATE_ATTEMPTS ?= "5"
-
-CVE_CHECK_DB_DLDIR_FILE ?= "${DL_DIR}/CVE_CHECK2/${CVE_CHECK_DB_FILENAME}"
-CVE_CHECK_DB_DLDIR_LOCK ?= "${CVE_CHECK_DB_DLDIR_FILE}.lock"
-CVE_CHECK_DB_TEMP_FILE ?= "${CVE_CHECK_DB_FILE}.tmp"
-
-python () {
- if not bb.data.inherits_class("cve-check", d):
- raise bb.parse.SkipRecipe("Skip recipe when cve-check class is not loaded.")
-}
-
-python do_fetch() {
- """
- Update NVD database with API 2.0
- """
- import bb.utils
- import bb.progress
- import shutil
-
- bb.utils.export_proxies(d)
-
- db_file = d.getVar("CVE_CHECK_DB_DLDIR_FILE")
- db_dir = os.path.dirname(db_file)
- db_tmp_file = d.getVar("CVE_CHECK_DB_TEMP_FILE")
-
- cleanup_db_download(db_tmp_file)
- # By default let's update the whole database (since time 0)
- database_time = 0
-
- # The NVD database changes once a day, so no need to update more frequently
- # Allow the user to force-update
- try:
- import time
- update_interval = int(d.getVar("CVE_DB_UPDATE_INTERVAL"))
- if update_interval < 0:
- bb.note("CVE database update skipped")
- if not os.path.exists(db_file):
- bb.error("CVE database %s not present, database fetch/update skipped" % db_file)
- return
- curr_time = time.time()
- database_time = os.path.getmtime(db_file)
- bb.note("Current time: %s; DB time: %s" % (time.ctime(curr_time), time.ctime(database_time)))
- if curr_time < database_time:
- bb.warn("Database time is in the future, force DB update")
- database_time = 0
- elif curr_time - database_time < update_interval:
- bb.note("CVE database recently updated, skipping")
- return
-
- except OSError:
- pass
-
- if bb.utils.to_boolean(d.getVar("BB_NO_NETWORK")):
- bb.error("BB_NO_NETWORK attempted to disable fetch, this recipe uses CVE_DB_UPDATE_INTERVAL to control download, set to '-1' to disable fetch or update")
-
- bb.utils.mkdirhier(db_dir)
- bb.utils.mkdirhier(os.path.dirname(db_tmp_file))
- if os.path.exists(db_file):
- shutil.copy2(db_file, db_tmp_file)
-
- if update_db_file(db_tmp_file, d, database_time):
- # Update downloaded correctly, we can swap files. To avoid potential
- # NFS caching issues, ensure that the destination file has a new inode
- # number. We do this in two steps as the downloads directory may be on
- # a different filesystem to tmpdir we're working in.
- new_file = "%s.new" % (db_file)
- shutil.move(db_tmp_file, new_file)
- os.rename(new_file, db_file)
- else:
- # Update failed, do not modify the database
- bb.warn("CVE database update failed")
- os.remove(db_tmp_file)
-}
-
-do_fetch[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK}"
-do_fetch[file-checksums] = ""
-do_fetch[vardeps] = ""
-
-python do_unpack() {
- import shutil
- shutil.copyfile(d.getVar("CVE_CHECK_DB_DLDIR_FILE"), d.getVar("CVE_CHECK_DB_FILE"))
-}
-do_unpack[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK} ${CVE_CHECK_DB_FILE_LOCK}"
-
-def cleanup_db_download(db_tmp_file):
- """
- Cleanup the download space from possible failed downloads
- """
-
- # Clean-up the temporary file downloads, we can remove both journal
- # and the temporary database
- if os.path.exists("{0}-journal".format(db_tmp_file)):
- os.remove("{0}-journal".format(db_tmp_file))
- if os.path.exists(db_tmp_file):
- os.remove(db_tmp_file)
-
-def nvd_request_wait(attempt, min_wait):
- return min(((2 * attempt) + min_wait), 30)
-
-def nvd_request_next(url, attempts, api_key, args, min_wait):
- """
- Request next part of the NVD database
- NVD API documentation: https://nvd.nist.gov/developers/vulnerabilities
- """
-
- import urllib.request
- import urllib.parse
- import gzip
- import http
- import time
-
- request = urllib.request.Request(url + "?" + urllib.parse.urlencode(args))
- if api_key:
- request.add_header("apiKey", api_key)
- bb.note("Requesting %s" % request.full_url)
-
- for attempt in range(attempts):
- try:
- r = urllib.request.urlopen(request)
-
- if (r.headers['content-encoding'] == 'gzip'):
- buf = r.read()
- raw_data = gzip.decompress(buf)
- else:
- raw_data = r.read().decode("utf-8")
-
- r.close()
-
- except Exception as e:
- wait_time = nvd_request_wait(attempt, min_wait)
- bb.note("CVE database: received error (%s)" % (e))
- bb.note("CVE database: retrying download after %d seconds. attempted (%d/%d)" % (wait_time, attempt+1, attempts))
- time.sleep(wait_time)
- pass
- else:
- return raw_data
- else:
- # We failed at all attempts
- return None
-
-def update_db_file(db_tmp_file, d, database_time):
- """
- Update the given database file
- """
- import bb.progress
- import bb.utils
- import datetime
- import sqlite3
- import json
-
- # Connect to database
- conn = sqlite3.connect(db_tmp_file)
- initialize_db(conn)
-
- req_args = {'startIndex': 0}
-
- incr_update_threshold = int(d.getVar("CVE_DB_INCR_UPDATE_AGE_THRES"))
- if database_time != 0:
- database_date = datetime.datetime.fromtimestamp(database_time, tz=datetime.timezone.utc)
- today_date = datetime.datetime.now(tz=datetime.timezone.utc)
- delta = today_date - database_date
- if incr_update_threshold == 0:
- bb.note("CVE database: forced full update")
- elif delta < datetime.timedelta(seconds=incr_update_threshold):
- bb.note("CVE database: performing partial update")
- # The maximum range for time is 120 days
- if delta > datetime.timedelta(days=120):
- bb.error("CVE database: Trying to do an incremental update on a larger than supported range")
- req_args['lastModStartDate'] = database_date.isoformat()
- req_args['lastModEndDate'] = today_date.isoformat()
- else:
- bb.note("CVE database: file too old, forcing a full update")
- else:
- bb.note("CVE database: no preexisting database, do a full download")
-
- with bb.progress.ProgressHandler(d) as ph, open(os.path.join(d.getVar("TMPDIR"), 'cve_check'), 'a') as cve_f:
-
- bb.note("Updating entries")
- index = 0
- url = d.getVar("NVDCVE_URL")
- api_key = d.getVar("NVDCVE_API_KEY") or None
- attempts = int(d.getVar("CVE_DB_UPDATE_ATTEMPTS"))
-
- # Recommended by NVD
- wait_time = 6
- if api_key:
- wait_time = 2
-
- while True:
- req_args['startIndex'] = index
- raw_data = nvd_request_next(url, attempts, api_key, req_args, wait_time)
- if raw_data is None:
- # We haven't managed to download data
- return False
-
- # hack for json5 style responses
- if raw_data[-3:] == ',]}':
- bb.note("Removing trailing ',' from nvd response")
- raw_data = raw_data[:-3] + ']}'
-
- data = json.loads(raw_data)
-
- index = data["startIndex"]
- total = data["totalResults"]
- per_page = data["resultsPerPage"]
- bb.note("Got %d entries" % per_page)
- for cve in data["vulnerabilities"]:
- update_db(conn, cve)
-
- index += per_page
- ph.update((float(index) / (total+1)) * 100)
- if index >= total:
- break
-
- # Recommended by NVD
- time.sleep(wait_time)
-
- # Update success, set the date to cve_check file.
- cve_f.write('CVE database update : %s\n\n' % datetime.date.today())
-
- conn.commit()
- conn.close()
- return True
-
-def initialize_db(conn):
- with conn:
- c = conn.cursor()
-
- c.execute("CREATE TABLE IF NOT EXISTS META (YEAR INTEGER UNIQUE, DATE TEXT)")
-
- c.execute("CREATE TABLE IF NOT EXISTS NVD (ID TEXT UNIQUE, SUMMARY TEXT, \
- SCOREV2 TEXT, SCOREV3 TEXT, SCOREV4 TEXT, MODIFIED INTEGER, VECTOR TEXT, VECTORSTRING TEXT)")
-
- c.execute("CREATE TABLE IF NOT EXISTS PRODUCTS (ID TEXT, \
- VENDOR TEXT, PRODUCT TEXT, VERSION_START TEXT, OPERATOR_START TEXT, \
- VERSION_END TEXT, OPERATOR_END TEXT)")
- c.execute("CREATE INDEX IF NOT EXISTS PRODUCT_ID_IDX on PRODUCTS(ID);")
-
- c.close()
-
-def parse_node_and_insert(conn, node, cveId):
-
- def cpe_generator():
- for cpe in node.get('cpeMatch', ()):
- if not cpe['vulnerable']:
- return
- cpe23 = cpe.get('criteria')
- if not cpe23:
- return
- cpe23 = cpe23.split(':')
- if len(cpe23) < 6:
- return
- vendor = cpe23[3]
- product = cpe23[4]
- version = cpe23[5]
-
- if cpe23[6] == '*' or cpe23[6] == '-':
- version_suffix = ""
- else:
- version_suffix = "_" + cpe23[6]
-
- if version != '*' and version != '-':
- # Version is defined, this is a '=' match
- yield [cveId, vendor, product, version + version_suffix, '=', '', '']
- elif version == '-':
- # no version information is available
- yield [cveId, vendor, product, version, '', '', '']
- else:
- # Parse start version, end version and operators
- op_start = ''
- op_end = ''
- v_start = ''
- v_end = ''
-
- if 'versionStartIncluding' in cpe:
- op_start = '>='
- v_start = cpe['versionStartIncluding']
-
- if 'versionStartExcluding' in cpe:
- op_start = '>'
- v_start = cpe['versionStartExcluding']
-
- if 'versionEndIncluding' in cpe:
- op_end = '<='
- v_end = cpe['versionEndIncluding']
-
- if 'versionEndExcluding' in cpe:
- op_end = '<'
- v_end = cpe['versionEndExcluding']
-
- if op_start or op_end or v_start or v_end:
- yield [cveId, vendor, product, v_start, op_start, v_end, op_end]
- else:
- # This is no version information, expressed differently.
- # Save processing by representing as -.
- yield [cveId, vendor, product, '-', '', '', '']
-
- conn.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", cpe_generator()).close()
-
-def update_db(conn, elt):
- """
- Update a single entry in the on-disk database
- """
-
- accessVector = None
- vectorString = None
- cveId = elt['cve']['id']
- if elt['cve'].get('vulnStatus') == "Rejected":
- c = conn.cursor()
- c.execute("delete from PRODUCTS where ID = ?;", [cveId])
- c.execute("delete from NVD where ID = ?;", [cveId])
- c.close()
- return
- cveDesc = ""
- for desc in elt['cve']['descriptions']:
- if desc['lang'] == 'en':
- cveDesc = desc['value']
- date = elt['cve']['lastModified']
-
- # Extract maximum CVSS scores from all sources (Primary and Secondary)
- cvssv2 = 0.0
- try:
- # Iterate through all cvssMetricV2 entries and find the maximum score
- for metric in elt['cve']['metrics']['cvssMetricV2']:
- score = metric['cvssData']['baseScore']
- if score > cvssv2:
- cvssv2 = score
- accessVector = metric['cvssData']['accessVector']
- vectorString = metric['cvssData']['vectorString']
- except KeyError:
- pass
-
- cvssv3 = 0.0
- try:
- # Iterate through all cvssMetricV30 entries and find the maximum score
- for metric in elt['cve']['metrics']['cvssMetricV30']:
- score = metric['cvssData']['baseScore']
- if score > cvssv3:
- cvssv3 = score
- accessVector = accessVector or metric['cvssData']['attackVector']
- vectorString = vectorString or metric['cvssData']['vectorString']
- except KeyError:
- pass
-
- try:
- # Iterate through all cvssMetricV31 entries and find the maximum score
- for metric in elt['cve']['metrics']['cvssMetricV31']:
- score = metric['cvssData']['baseScore']
- if score > cvssv3:
- cvssv3 = score
- accessVector = accessVector or metric['cvssData']['attackVector']
- vectorString = vectorString or metric['cvssData']['vectorString']
- except KeyError:
- pass
-
- cvssv4 = 0.0
- try:
- # Iterate through all cvssMetricV40 entries and find the maximum score
- for metric in elt['cve']['metrics']['cvssMetricV40']:
- score = metric['cvssData']['baseScore']
- if score > cvssv4:
- cvssv4 = score
- accessVector = accessVector or metric['cvssData']['attackVector']
- vectorString = vectorString or metric['cvssData']['vectorString']
- except KeyError:
- pass
-
- accessVector = accessVector or "UNKNOWN"
- vectorString = vectorString or "UNKNOWN"
-
- conn.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)",
- [cveId, cveDesc, cvssv2, cvssv3, cvssv4, date, accessVector, vectorString]).close()
-
- try:
- # Remove any pre-existing CVE configuration. Even for partial database
- # update, those will be repopulated. This ensures that old
- # configuration is not kept for an updated CVE.
- conn.execute("delete from PRODUCTS where ID = ?", [cveId]).close()
- for config in elt['cve']['configurations']:
- # This is suboptimal as it doesn't handle AND/OR and negate, but is better than nothing
- for node in config["nodes"]:
- parse_node_and_insert(conn, node, cveId)
- except KeyError:
- bb.note("CVE %s has no configurations" % cveId)
-
-do_fetch[nostamp] = "1"
-
-EXCLUDE_FROM_WORLD = "1"