"""
Main
====
"""
import functools
import json
import os
import re
from collections import defaultdict
from pymongo import DESCENDING
from CveXplore.api.connection.api_db import ApiDatabaseSource
from CveXplore.common.cpe_converters import from2to3CPE
from CveXplore.common.db_mapping import database_mapping
from CveXplore.database.connection.mongo_db import MongoDBConnection
from CveXplore.errors import DatabaseIllegalCollection
from CveXplore.database.maintenance.main_updater import MainUpdater
try:
from version import VERSION
except ModuleNotFoundError:
_PKG_DIR = os.path.dirname(__file__)
version_file = os.path.join(_PKG_DIR, "VERSION")
with open(version_file, "r") as fdsec:
VERSION = fdsec.read()
[docs]class CveXplore(object):
"""
Main class for CveXplore package
"""
[docs] def __init__(self, mongodb_connection_details=None, api_connection_details=None):
"""
Create a new instance of CveXplore
:param mongodb_connection_details: Provide the connection details needed to establish a connection to a mongodb
instance. The connection details should be in line with pymongo's
documentation.
:type mongodb_connection_details: dict
:param api_connection_details: Provide the connection details needed to establish a connection to a cve-search
API provider. The cve-search API provider should allow access to the 'query' POST
endpoint; all other API endpoints are not needed for CveXplore to function. For
the connection details supported please check the :ref:`API connection <api>`
documentation.
:type api_connection_details: dict
"""
self.__version = VERSION
os.environ["DOC_BUILD"] = json.dumps({"DOC_BUILD": "NO"})
if (
api_connection_details is not None
and mongodb_connection_details is not None
):
raise ValueError(
"CveXplore can be used to connect to either a cve-search database OR a cve-search api, not both!"
)
elif api_connection_details is None and mongodb_connection_details is None:
# by default assume we are talking to a database
mongodb_connection_details = {}
os.environ["MONGODB_CON_DETAILS"] = json.dumps(mongodb_connection_details)
self.datasource = MongoDBConnection(**mongodb_connection_details)
self.database = MainUpdater(datasource=self.datasource)
elif mongodb_connection_details is not None:
os.environ["MONGODB_CON_DETAILS"] = json.dumps(mongodb_connection_details)
self.datasource = MongoDBConnection(**mongodb_connection_details)
self.database = MainUpdater(datasource=self.datasource)
elif api_connection_details is not None:
api_connection_details["user_agent"] = "CveXplore:{}".format(self.version)
os.environ["API_CON_DETAILS"] = json.dumps(api_connection_details)
self.datasource = ApiDatabaseSource(**api_connection_details)
self.database_mapping = database_mapping
from CveXplore.database.helpers.generic_db import GenericDatabaseFactory
from CveXplore.database.helpers.specific_db import CvesDatabaseFunctions
for each in self.database_mapping:
if each != "cves":
setattr(self, each, GenericDatabaseFactory(collection=each))
else:
setattr(self, each, CvesDatabaseFunctions(collection=each))
[docs] def get_single_store_entry(self, entry_type, dict_filter={}):
"""
Method to perform a query on a *single* collection in the data source and return a *single* result.
:param entry_type: Which specific store are you querying? Choices are:
- capec;
- cpe;
- cwe;
- via4;
- cves;
:type entry_type: str
:param dict_filter: Dictionary representing a filter according to pymongo documentation
:type dict_filter: dict
:return: Objectified result from the query
:rtype: object
"""
entry_type = entry_type.lower()
if entry_type not in self.database_mapping:
raise DatabaseIllegalCollection(
"Illegal collection requested: only {} are allowed!".format(
self.database_mapping
)
)
result = getattr(self.datasource, "store_{}".format(entry_type)).find_one(
dict_filter
)
return result
[docs] def get_single_store_entries(self, query, limit=10):
"""
Method to perform a query on a *single* collection in the data source and return all of the results.
:param query: Tuple which contains the entry_type and the dict_filter in a tuple.
Choices for entry_type:
- capec;
- cpe;
- cwe;
- via4;
- cves;
dict_filter is a dictionary representing a filter according to pymongo documentation.
example:
get_single_store_entries(("CWE", {"id": "78"}))
:type query: tuple
:param limit: Limit the amount of returned results, defaults to 10
:type limit: int
:return: list with queried results
:rtype: list
"""
if not isinstance(query, tuple):
raise ValueError(
"Wrong parameter type, received: {} expected: tuple".format(type(query))
)
if len(query) != 2:
raise ValueError(
"Query parameter does not consist of the expected amount of variables, expected: 2 received: {}".format(
len(query)
)
)
entry_type, dict_filter = query
entry_type = entry_type.lower()
if entry_type not in self.database_mapping:
raise DatabaseIllegalCollection(
"Illegal collection requested: only {} are allowed!".format(
self.database_mapping
)
)
results = (
getattr(self.datasource, "store_{}".format(entry_type))
.find(dict_filter)
.limit(limit)
)
return list(results)
[docs] def get_multi_store_entries(self, *queries, limit=10):
"""
Method to perform *multiple* queries on *a single* or *multiple* collections in the data source and return the
results.
:param queries: A list of tuples which contains the entry_type and the dict_filter.
Choices for entry_type:
- capec;
- cpe;
- cwe;
- via4;
- cves;
dict_filter is a dictionary representing a filter according to pymongo documentation.
example:
get_multi_store_entries([("CWE", {"id": "78"}), ("cves", {"id": "CVE-2009-0018"})])
:type queries: list
:return: Queried results in a single list
:rtype: list
"""
results = map(
functools.partial(self.get_single_store_entries, limit=limit), *queries
)
joined_list = []
for result_list in results:
joined_list += result_list
return list(joined_list)
[docs] def cves_for_cpe(self, cpe_string):
"""
Method for retrieving Cves that match a single CPE string. By default the search will be made matching
the configuration fields of the cves documents.
:param cpe_string: CPE string: e.g. ``cpe:2.3:o:microsoft:windows_7:*:sp1:*:*:*:*:*:*``
:type cpe_string: str
:return: List with Cves
:rtype: list
"""
# format to cpe2.3
cpe_string = from2to3CPE(cpe_string)
if cpe_string.startswith("cpe"):
# strict search with term starting with cpe; e.g: cpe:2.3:o:microsoft:windows_7:*:sp1:*:*:*:*:*:*
remove_trailing_regex_stars = r"(?:\:|\:\:|\:\*)+$"
cpe_regex = re.escape(re.sub(remove_trailing_regex_stars, "", cpe_string))
cpe_regex_string = r"^{}:".format(cpe_regex)
else:
# more general search on same field; e.g. microsoft:windows_7
cpe_regex_string = "{}".format(re.escape(cpe_string))
cves = self.get_single_store_entries(
("cves", {"vulnerable_configuration": {"$regex": cpe_regex_string}}),
limit=0,
)
return cves
[docs] def cve_by_id(self, cve_id):
"""
Method to retrieve a single CVE from the database by it's CVE ID number
:param cve_id: String representing the CVE id; e.g. CVE-2020-0001
:type cve_id: str
:return: CVE object
:rtype: Cves
"""
return self.get_single_store_entry("cves", {"id": cve_id})
[docs] def capec_by_cwe_id(self, cwe_id):
"""
Method to retrieve capecs related to a specific CWE ID
:param cwe_id: String representing the CWE id; e.g. '15'
:type cwe_id:
:return: List with Capecs
:rtype: list
"""
cwe = self.get_single_store_entry("cwe", {"id": cwe_id})
if cwe is not None:
return list(cwe.iter_related_capecs())
else:
return cwe
[docs] def last_cves(self, limit=10):
"""
Method to retrieve the last entered / changed cves. By default limited to 10.
:return: List with Cves
:rtype: list
"""
results = (
getattr(self.datasource, "store_cves")
.find()
.sort("Modified", DESCENDING)
.limit(limit)
)
return list(results)
[docs] def get_db_content_stats(self):
"""
Property returning the stats from the database. Stats consist of the time last modified and the document count
per cvedb store in the database.
:return: Statistics
:rtype: dict
"""
stats = defaultdict(dict)
if not isinstance(self.datasource, ApiDatabaseSource):
if hasattr(self.datasource, "store_info"):
results = self.datasource.store_info.find({})
for each in results:
each.pop("_id")
db = each["db"]
each.pop("db")
if "sources" in each:
each.pop("sources")
each.pop("searchables")
each["last-modified"] = str(each["last-modified"])
each["document count"] = getattr(
self.datasource, "store_{}".format(db)
).count_documents({})
stats[db] = each
for mgmtlist in ["mgmt_blacklist", "mgmt_whitelist"]:
stats[mgmtlist] = {
"document count": getattr(
self.datasource, "store_{}".format(mgmtlist)
).count_documents({})
}
return dict(stats)
else:
return "Database info could not be retrieved"
return "Using api endpoint: {}".format(self.datasource.url)
@property
def version(self):
""" Property returning current version """
return self.__version
[docs] def __repr__(self):
""" String representation of object """
return "<< CveXplore:{} >>".format(self.version)