from datetime import datetime
from datetime import timedelta
import json

from recipe_engine import recipe_api

JSON_ENTRY_NAME = 'name'
JSON_ENTRY_UPDATED_DATE = 'updated_date'
JSON_ENTRY_REMOVAL_DATE = 'removal_date'

PACKAGE_REMOVAL_INTERVAL_DAYS = 30

# The timestamp format used when serializing dates to and from the metadata
# file.
DATE_TIME_FORMAT = "%m/%d/%Y, %H:%M:%S"

class CacheEntry:
  """An object used to represent a file and its usage in a cache."""

  def __init__(
      self,
      file_name: str,
      updated_date: datetime,
      removal_date: datetime = None,
  ):
    self.name = file_name

    # The updated date is stored as a string for easy conversion to and from
    # json.
    self.updated_date = updated_date.strftime(DATE_TIME_FORMAT)

    # The removal date is stored as a string for easy conversion to and from
    # json. If it is not supplied it is derived from the updated date.
    if removal_date is None:
      self._calc_removal_date(self.updated_date)
    else:
      self.removal_date = removal_date.strftime(DATE_TIME_FORMAT)

  def _calc_removal_date(self, updated_date: str):
    """Calculate the removal date as the updated date plus
    PACKAGE_REMOVAL_INTERVAL_DAYS days.
    """
    time_delta = timedelta(days=PACKAGE_REMOVAL_INTERVAL_DAYS)
    updated_date_datetime = datetime.strptime(updated_date, DATE_TIME_FORMAT)
    new_removal_date = updated_date_datetime + time_delta
    self.removal_date = new_removal_date.strftime(DATE_TIME_FORMAT)

  def removal_date_as_datetime(self) -> datetime:
    """Return the stored removal timestamp string as a datetime object."""
    return self._convert_str_to_datetime(self.removal_date)

  def _convert_str_to_datetime(self, date_str: str) -> datetime:
    """Return the supplied date string as a datetime object."""
    return datetime.strptime(date_str, DATE_TIME_FORMAT)
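
# Illustrative example (comment only, not executed): a CacheEntry serializes
# through its __dict__, which is how run() writes the metadata file:
#
#   entry = CacheEntry('package_1', updated_date=datetime(2023, 1, 1))
#   json.dumps([entry.__dict__])
#   # -> '[{"name": "package_1", "updated_date": "01/01/2023, 00:00:00",
#   #       "removal_date": "01/31/2023, 00:00:00"}]'
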
class CacheMicroManagerApi(recipe_api.RecipeApi):
  """Tracks individual files at the root of a cache directory (non-recursive).

  It records the last time each file was used and proactively cleans the
  directory by removing files older than a specified interval.
  """

  # pylint: disable=unused-argument
  def __init__(self, *args, **kwargs):
    """Create a new CacheMicroManagerApi object.

    The cache paths are not known at construction time; they are derived from
    the target directory passed to run() via _initialize().
    """
    super().__init__(*args, **kwargs)
    self.cache_target_directory = None
    self.cache_name = None
    self.metadata_file_name = None
    self.cache_metadata_file_path = None

  def _initialize(self, target_dir):
    """Derive the cache paths from target_dir.

    For example, a target_dir of '/cache/osx_sdk' yields a metadata file at
    '/cache/osx_sdk/.osx_sdk_cache_metadata.json'.
    """
    self.cache_target_directory = self.m.path.cast_to_path(target_dir)
    self.cache_name = self.m.path.basename(self.cache_target_directory)
    self.metadata_file_name = '.{}_cache_metadata.json'.format(self.cache_name)
    self.cache_metadata_file_path = (
        self.cache_target_directory / self.metadata_file_name
    )

  def today(self):
    """Return the current datetime.

    Tests override this to provide a deterministic date.
    """
    return datetime.now()  # pragma: nocover

  def run(self, target_dir, deps_list: list):
    """Run the cache micro manager on the target directory.

    If the directory is not yet being tracked by the cache micro manager, it
    reads the contents of target_dir and writes all current files and
    directories to a new cache metadata file.

    If the directory is already being tracked, it adds any dependencies in
    deps_list that are not yet present in the metadata file, and refreshes the
    updated and removal dates of the dependencies that are.

    After the deps are updated or added, it performs the following cleanup:
      * If there is an entry in the metadata file but the package is not on
        disk, it removes the entry from the file.
      * If there is a package on disk but not in the file, it adds it to the
        metadata file for tracking and eventual removal.
      * It then checks all removal dates in the metadata file, deletes the
        expired packages from disk, and removes them from the file.

    Args:
      * target_dir (str): the path to the cache directory containing
        dependencies.
      * deps_list (list[str]): the list of dependencies that are currently
        being used.
    """

    self._initialize(target_dir=target_dir)

    with self.m.step.nest('Running Cache Micro Manager on {}.'.format(
        self.cache_target_directory)):

      cache_exists = self.m.path.exists(self.cache_target_directory)
      if not cache_exists:
        self.m.step.empty(
            'Cache Micro Manager, cache directory exists check',
            status='SUCCESS',
            step_text='Cache dir does not exist, skipping.'
        )
        return

      if not self.m.path.exists(self.cache_metadata_file_path):
        directory_files_list = self.m.file.listdir(
            'Reading cache directory {}'.format(self.cache_target_directory),
            self.cache_target_directory,
            recursive=False,
        )

        directory_cache_entry_list = self.convert_file_list_to_cache_entry_list(
            directory_files_list
        )

        if directory_cache_entry_list:
          # It should be safe to ignore the check for existence of the file
          # since no one actively logs onto the bots and manipulates the file
          # system.
          self.m.file.write_text(
              'Writing cache metadata file.',
              self.cache_metadata_file_path,
              json.dumps([ob.__dict__ for ob in directory_cache_entry_list]),
          )
      else:
        # The metadata file exists; reconcile the currently stored metadata
        # entries with the contents of the directory.
        #
        # Example of the data stored in the metadata file:
        # [
        #   {
        #     "name": "package_1",
        #     "updated_date": "datetime",
        #     "removal_date": "datetime + interval"
        #   },
        #   {
        #     "name": "package_2",
        #     "updated_date": "datetime",
        #     "removal_date": "datetime + interval"
        #   }
        # ]
        current_metadata_entries = self.read_metadata_file()

        # These files may not be in the metadata file; there is no telling
        # how they got there.
        current_directory_entries = self.m.file.listdir(
            'Reading cache directory {}'.format(self.cache_target_directory),
            self.cache_target_directory,
            recursive=False,
        )

        # Remove the cache file itself so we do not record it in the file.
        current_directory_entries.remove(self.cache_metadata_file_path)

        # Add or update the current working dependencies.
        for dep in deps_list:
          if not self.is_file_name_in_cache_entry_list(
              dep, current_metadata_entries):
            # The dependency is new; add it.
            current_metadata_entries.append(
                CacheEntry(file_name=str(dep), updated_date=self.today())
            )
          else:
            # The dependency is known; refresh its dates.
            existing_entry = self.get_cache_entry_from_list(
                str(dep), current_metadata_entries
            )

            # The package will stay active for another
            # PACKAGE_REMOVAL_INTERVAL_DAYS days.
            new_entry = CacheEntry(
                file_name=str(existing_entry.name), updated_date=self.today()
            )
            current_metadata_entries.remove(existing_entry)
            current_metadata_entries.append(new_entry)

        paths_as_str_current_dir_entries = [
            str(item) for item in current_directory_entries
        ]
        # There can be deps in the file but not in the directory. Iterate over
        # a copy since entries are removed from the list mid-loop.
        for current_meta_entry in list(current_metadata_entries):
          if (str(current_meta_entry.name)
              not in paths_as_str_current_dir_entries):
            current_metadata_entries.remove(current_meta_entry)

        # There can be deps in the directory but not in the file.
        for file_path in paths_as_str_current_dir_entries:
          cache_entry = self.get_cache_entry_from_list(
              str(file_path), current_metadata_entries
          )
          if cache_entry is None:
            current_metadata_entries.append(
                CacheEntry(file_name=str(file_path), updated_date=self.today())
            )

        # Check dates and delete expired packages. Iterate over a copy since
        # entries are removed from the list mid-loop.
        for cache_entry in list(current_metadata_entries):
          today = self.today()
          if cache_entry.removal_date_as_datetime().date() < today.date():
            self.delete_file(cache_entry.name)
            current_metadata_entries.remove(cache_entry)

        # Write the new file contents.
        self.m.file.remove(
            'Removing existing cache file {}'.format(
                self.cache_metadata_file_path
            ), self.cache_metadata_file_path
        )
        self.m.file.write_text(
            'Writing cache metadata file.',
            self.cache_metadata_file_path,
            json.dumps([ob.__dict__ for ob in current_metadata_entries]),
        )

  def get_cache_entry_from_list(
      self, file_name: str, cache_entry_list: list
  ) -> CacheEntry:
    """Get the associated cache entry when supplied with a specific file name.

    Args:
      * file_name (str): the name of a file/directory that represents a
        dependency.
      * cache_entry_list (list[CacheEntry]): a list of CacheEntry objects.

    Returns:
      CacheEntry | None: the CacheEntry if found in the list, otherwise None.
    """
    for cache_entry in cache_entry_list:
      if file_name == cache_entry.name:
        return cache_entry
    return None

  def is_file_name_in_cache_entry_list(
      self, file_name, cache_entry_list: list
  ) -> bool:
    """Check whether a CacheEntry exists in the list for the supplied file_name.

    Args:
      * file_name (str|Path): the name of a file/directory that represents a
        dependency. Paths are compared by their string form.
      * cache_entry_list (list[CacheEntry]): a list of CacheEntry objects.

    Returns:
      bool: True if a cache entry exists with file_name as its name, False if
        it does not.
    """
    for cache_entry in cache_entry_list:
      if str(file_name) == str(cache_entry.name):
        return True
    return False

  def convert_file_list_to_cache_entry_list(
      self, file_names_list: list
  ) -> list:
    """Create a list of CacheEntry objects from a list of file names.

    This is used primarily when CacheMicroManager has not yet begun to manage
    a directory and needs to record its contents.

    Args:
      * file_names_list (list[str]): a list of file names used to instantiate
        CacheEntry objects.

    Returns:
      list[CacheEntry]: a list of newly instantiated CacheEntry objects
        associated with the supplied file names.
    """
    cache_entry_list = []
    for fd in file_names_list:
      cache_entry = CacheEntry(file_name=str(fd), updated_date=self.today())
      cache_entry_list.append(cache_entry)
    return cache_entry_list

  def delete_file(self, file_name):
    """Delete a file or directory.

    Args:
      * file_name (Path): a file or directory name that will be deleted.
    """
    try:
      if self.m.path.isfile(file_name):
        self.m.file.remove(
            'Removing file descriptor {}'.format(file_name), file_name
        )  # pragma: nocover
      elif self.m.path.isdir(file_name):
        try:  # pragma: nocover
          self.m.file.rmtree(
              'Removing dir file descriptor {} (rmtree)'.format(file_name),
              file_name
          )  # pragma: nocover
        except self.m.file.Error:  # pragma: nocover
          self.m.step(
              'Removing dir file descriptor {} (fallback rm)'.format(file_name),
              ['rm', '-rf', file_name]
          )  # pragma: nocover
    except self.m.file.Error:  # pragma: nocover
      print('File not found.')  # pragma: nocover
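
    # Design note: the raw 'rm -rf' fallback assumes a POSIX host; it is only
    # reached when self.m.file.rmtree raises self.m.file.Error.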

  def read_metadata_file(self) -> list:
    """Read the metadata file at the self.cache_metadata_file_path location.

    Returns:
      list[CacheEntry]: the list of CacheEntry objects found in the existing
        file.
    """
    with self.m.step.nest('Reading metadata file {}'.format(
        self.cache_metadata_file_path)):
      meta_json = self.m.file.read_text(
          'Reading {}'.format(self.cache_metadata_file_path),
          self.cache_metadata_file_path,
      )
      # The file may contain an empty or a populated list.
      meta_cache_entries = []
      json_items = json.loads(meta_json)
      for item in json_items:
        # TODO(ricardoamador): remove this code once the old dates without
        # times are gone.
        updated_date_found = None
        if self.date_format_check(item[JSON_ENTRY_UPDATED_DATE]):
          updated_date_found = datetime.strptime(
              item[JSON_ENTRY_UPDATED_DATE], DATE_TIME_FORMAT
          )
        else:
          # Legacy date-only values are normalized to noon.
          updated_date_found = datetime.strptime(
              "{}, 12:00:00".format(item[JSON_ENTRY_UPDATED_DATE]),
              DATE_TIME_FORMAT
          )

        removal_date_found = None
        if self.date_format_check(item[JSON_ENTRY_REMOVAL_DATE]):
          removal_date_found = datetime.strptime(
              item[JSON_ENTRY_REMOVAL_DATE], DATE_TIME_FORMAT
          )
        else:
          # Legacy date-only values are normalized to noon.
          removal_date_found = datetime.strptime(
              "{}, 12:00:00".format(item[JSON_ENTRY_REMOVAL_DATE]),
              DATE_TIME_FORMAT
          )

        new_entry = CacheEntry(
            file_name=str(item[JSON_ENTRY_NAME]),
            updated_date=updated_date_found,
            removal_date=removal_date_found
        )
        meta_cache_entries.append(new_entry)
      return meta_cache_entries

  def date_format_check(self, date_str: str) -> bool:
    """Check the date format of the date we are attempting to process.

    Returns:
      bool: True if the timestamp includes both the date and the time, False
        if it is non-compliant.
    """
    try:
      datetime.strptime(date_str, DATE_TIME_FORMAT)
      return True
    except ValueError:
      return False
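
  # Illustrative examples for date_format_check (comment only, not executed):
  #   self.date_format_check('01/01/2023, 12:00:00')  # -> True
  #   self.date_format_check('01/01/2023')  # -> False; read_metadata_file
  #   # normalizes such legacy date-only values to noon before parsing.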