Source code for CveXplore.database.maintenance.file_handlers

"""
File Handlers
=============
"""
import shutil
from abc import abstractmethod

from .DownloadHandler import DownloadHandler
from .IJSONHandler import IJSONHandler
from .db_action import DatabaseAction


[docs]class JSONFileHandler(DownloadHandler): """ This class handles all JSON related download processing and functions as a base class for specific JSON sources processing and downloading """
[docs] def __init__(self, feed_type, prefix): super().__init__(feed_type) self.is_update = True self.prefix = prefix self.ijson_handler = IJSONHandler()
[docs] def __repr__(self): """ return string representation of object """ return "<< JSONFileHandler:{} >>".format(self.feed_type)
[docs] def file_to_queue(self, file_tuple): """ Method responsible for transferring file contents to the worker queue for further processing and inserting them into the database :param file_tuple: A tuple containing the working dir for processing and the filename of the file to process :type file_tuple: tuple """ working_dir, filename = file_tuple # adjust the interval counter for debug logging when updating if self.is_update: interval = 500 else: interval = 5000 x = 0 self.logger.debug("Starting processing of file: {}".format(filename)) for cpe in self.ijson_handler.fetch(filename=filename, prefix=self.prefix): self.process_item(item=cpe) x += 1 if x % interval == 0: self.logger.debug( "Processed {} entries from file: {}".format(x, filename) ) try: self.logger.debug("Removing working dir: {}".format(working_dir)) shutil.rmtree(working_dir) except Exception as err: self.logger.error( "Failed to remove working dir; error produced: {}".format(err) )
@abstractmethod def update(self, **kwargs): raise NotImplementedError @abstractmethod def populate(self, **kwargs): raise NotImplementedError
[docs]class XMLFileHandler(DownloadHandler): """ This class handles all XML related download processing and functions as a base class for specific XML sources processing and downloading """
[docs] def __init__(self, feed_type): super().__init__(feed_type) self.is_update = True
[docs] def __repr__(self): """ return string representation of object """ return "<< XMLFileHandler:{} >>".format(self.feed_type)
[docs] def process_item(self, item): """ Method responsible for putting items into the worker queue as database actions :param item: Item processed from XML source and parsed into a dictionary :type item: dict """ if self.is_update: self.queue.put( DatabaseAction( action=DatabaseAction.actions.UpdateOne, collection=self.feed_type.lower(), doc=item, ) ) else: self.queue.put( DatabaseAction( action=DatabaseAction.actions.InsertOne, collection=self.feed_type.lower(), doc=item, ) )
@abstractmethod def file_to_queue(self, *args): raise NotImplementedError @abstractmethod def update(self, **kwargs): raise NotImplementedError @abstractmethod def populate(self, **kwargs): raise NotImplementedError