blob: 331842f9634674354cd9f3edca5f7aaba6cfd8f7 [file] [log] [blame]
from datetime import datetime
from datetime import timedelta
import json
from recipe_engine import recipe_api
JSON_ENTRY_NAME = 'name'
JSON_ENTRY_UPDATED_DATE = 'updated_date'
JSON_ENTRY_REMOVAL_DATE = 'removal_date'
PACKAGE_REMOVAL_INTERVAL_DAYS = 30
class CacheEntry:
"""An object used to represent a file and its usage in a cache."""
def __init__(
self,
file_name: str,
updated_date: datetime = None,
removal_date: datetime = None,
):
self.name = file_name
# The updated date will be stored as a string for easy conversion to and from json.
self.updated_date = updated_date
self.updated_date = updated_date.strftime("%m/%d/%Y, %H:%M:%S")
# The removal date will be stored as a string for easy conversion to and from json.
self.removal_date = removal_date
if self.removal_date is None:
self._calc_removal_date(self.updated_date)
else:
self.removal_date = removal_date.strftime("%m/%d/%Y, %H:%M:%S")
def _calc_removal_date(self, updated_date: datetime):
"""Calculate the removal date as the update date + 30 days."""
time_delta = timedelta(days=PACKAGE_REMOVAL_INTERVAL_DAYS)
updated_date_datetime = datetime.strptime(
updated_date, "%m/%d/%Y, %H:%M:%S"
)
new_removal_date = updated_date_datetime + time_delta
self.removal_date = new_removal_date.strftime("%m/%d/%Y, %H:%M:%S")
def removal_date_as_datetime(self) -> datetime:
"""Return the stored removal timestamp string as a datetime object."""
return self._convert_str_to_datetime(self.removal_date)
def _convert_str_to_datetime(self, date_str: str) -> datetime:
"""Return the converted string date as a datetime object"""
return datetime.strptime(date_str, "%m/%d/%Y, %H:%M:%S")
class CacheMicroManagerApi(recipe_api.RecipeApi):
"""This module keeps track of root individual files within a directory (based on
a non-recursivce mode). It keeps track of the last time a file was used and
proactively cleans the directory by removing files older than some specified time.
"""
# pylint: disable=unused-argument
def __init__(self, *args, **kwargs):
"""Create a new CacheMicroManager object.
Args:
target_dir (str): the path to the cache directory containing dependencies.
"""
super(CacheMicroManagerApi, self).__init__(*args, **kwargs)
self.cache_target_directory = None
self.cache_name = None
self.metadata_file_name = None
self.cache_metadata_file_path = None
def _initialize(self, target_dir):
self.cache_target_directory = target_dir
self.cache_name = self.m.path.basename(self.cache_target_directory)
self.metadata_file_name = '.{}_cache_metadata.json'.format(self.cache_name)
self.cache_metadata_file_path = self.cache_target_directory.join(
self.metadata_file_name
)
def today(self):
"""Provide a deterministic date when running tests."""
return datetime.now() # pragma: nocover
def run(self, target_dir, deps_list: list):
"""Run the cache micro manager on the target directory.
If the directory is not yet being tracked by the cache micro manager it will read the contents of
the target_dir_path and all current files and directories to cache metadata file.
If the directory is being tracked it will add any dependencies in deps_list not present in the file
to the metadata file or update the updated and removal date of dependencies in deps_list in the file.
After the deps are updated or added it will perform the following minor cleanup:
* If there is an entry in the metadata file but the package is not on disk it will remove the entry
from the file.
* If there is a package on disk but not in the file it will simply add it to the metadata file for
tracking and eventual removal.
* Then it will look at all removal dates from the metadata file and delete all expired packages from
disk and remove them from the metadata file.
Args:
* deps_list(list[str]): the list of dependencies that are currently being used.
"""
self._initialize(target_dir=target_dir)
with self.m.step.nest('Running Cache Micro Manager on {}.'.format(
self.cache_target_directory)):
cache_exists = self.m.path.exists(self.cache_target_directory)
if not cache_exists:
self.m.step.empty(
'Cache Micro Manager, cache directory exists check',
status='SUCCESS',
step_text='Cache dir does not exist, skipping.'
)
return
if not self.m.path.exists(self.cache_metadata_file_path):
directory_files_list = self.m.file.listdir(
'Reading cache directory {}'.format(self.cache_target_directory),
self.cache_target_directory,
recursive=False,
)
directory_cache_entry_list = self.convert_file_list_to_cache_entry_list(
directory_files_list
)
if len(directory_cache_entry_list) > 0:
# It should be safe to ignore the check for existence of the file since no one
# actively logs onto the bots and manipulates the file system.
self.m.file.write_text(
'Writing cache metadata file.',
self.cache_metadata_file_path,
json.dumps([ob.__dict__ for ob in directory_cache_entry_list]),
)
else:
# the currently stored metadata entries.
# example of the data that is stored in the metadata file.
# [
# {
# "name": "package_1",
# "updated_date": "datetime",
# "removal_date": "datetime + interval"
# },
# {
# "name": "package_2",
# "updated_date": "datetime",
# "removal_date": "datetime + interval"
# }
# ]
current_metadata_entries = self.read_metadata_file()
# these files may not be in the metadata file, no telling how they got there.
current_directory_entries = self.m.file.listdir(
'Reading cache directory {}'.format(self.cache_target_directory),
self.cache_target_directory, False
)
# remove the cache file itself so we do not record it in the file.
current_directory_entries.remove(self.cache_metadata_file_path)
# add or update the current working dependencies.
for dep in deps_list:
if not self.is_file_name_in_cache_entry_list(
dep, current_metadata_entries):
# we need to add it.
current_metadata_entries.append(
CacheEntry(file_name=str(dep), updated_date=self.today())
)
else:
# we want to update it.
existing_entry = self.get_cache_entry_from_list(
str(dep), current_metadata_entries
)
# package will be active for another month.
new_entry = CacheEntry(
file_name=str(existing_entry.name), updated_date=self.today()
)
current_metadata_entries.remove(existing_entry)
current_metadata_entries.append(new_entry)
paths_as_str_current_dir_entries = [
str(item) for item in current_directory_entries
]
# there can be deps in the file and not in the directory or
for current_meta_entry in current_metadata_entries: # current metadata entries should be str
if str(current_meta_entry.name
) not in paths_as_str_current_dir_entries:
current_metadata_entries.remove(current_meta_entry)
# there can be deps in the directory and not in the file.
for file_path in paths_as_str_current_dir_entries:
cacheEntry = self.get_cache_entry_from_list(
str(file_path), current_metadata_entries
)
if cacheEntry is None:
current_metadata_entries.append(
CacheEntry(file_name=str(file_path), updated_date=self.today())
)
# Check dates and delete unused packages.
for cacheEntry in current_metadata_entries:
today = self.today()
if cacheEntry.removal_date_as_datetime().date() < today.date():
self.delete_file(cacheEntry.name)
current_metadata_entries.remove(cacheEntry)
# write the new file contents.
self.m.file.remove(
'Removing existing cache file {}'.format(
self.cache_metadata_file_path
), self.cache_metadata_file_path
)
self.m.file.write_text(
'Writing cache metadata file.',
self.cache_metadata_file_path,
json.dumps([ob.__dict__ for ob in current_metadata_entries]),
)
def get_cache_entry_from_list(
self, file_name: str, cache_entry_list: list
) -> CacheEntry:
"""Get the associated cache entry when supplied with a specific file name.
Args:
* file_name (str): the name of a file/directory that represents a dependency.
* cache_entry_list (list[CacheEntry]): a list of CacheEntry objects.
Returns:
CacheEntry | None: Returns a CacheEntry if found in the list otherwise returns None.
"""
for cache_entry in cache_entry_list:
if file_name == cache_entry.name:
return cache_entry
return None
# file name is a path as passed in here
def is_file_name_in_cache_entry_list(
self, file_name, cache_entry_list: list
) -> bool:
"""Check to see if a CacheEntry is in the list for the supplied file_name.
Args:
* file_name (str): the name of a file/directory that represents a dependency.
* cache_entry_list (list[CacheEntry]): a list of CacheEntry objects.
Returns:
bool: True if a cache entry exists with file_name as its name, False if it does not.
"""
for cache_entry in cache_entry_list:
if str(file_name) == str(cache_entry.name):
return True
return False
def convert_file_list_to_cache_entry_list(
self, file_names_list: list
) -> list:
"""Create a list of CacheEntry objects from a list of file names.
The primary usage of this is on start of CacheMicroManager when it has not yet begun
to manage a directory and we need to record the contents of it.
Args:
* file_names_list (list[str]): a list of file names to use to instantiate CacheEntry
objects.
Returns:
list[CacheEntry]: a list of newly instantiated CacheEntry objects associate with the
supplied file names list.
"""
cache_entry_list = []
for fd in file_names_list:
cache_entry = CacheEntry(file_name=str(fd), updated_date=self.today())
cache_entry_list.append(cache_entry)
return cache_entry_list
def delete_file(self, file_name):
"""Delete a file or directory.
Args:
* file_name (Path): a file or directory name that will be deleted.
"""
try:
if self.m.path.isfile(file_name):
self.m.file.remove(
'Removing file descriptor {}'.format(file_name), file_name
) #pragma: nocover
elif self.m.path.isdir(file_name):
try: #pragma: nocover
self.m.file.rmtree(
'Removig dir file descriptor {} (rmtree)'.format(file_name),
file_name
) #pragma: nocover
except self.m.file.Error: #pragma: nocover
self.m.step(
'Removing dir file descriptor {} (fallback rm)'.format(file_name),
['rm', '-rf', file_name]
) #pragma: nocover
except self.m.file.Error: #pragma: nocover
print('File not found.') #pragma: nocover
def read_metadata_file(self) -> list:
"""Read the metadata file at the self.cache_metadata_file_location path.
Returns:
list[CacheEntry]: returns the list of CacheEntry's found in the existing file.
"""
with self.m.step.nest('Reading metadata file {}'.format(
self.cache_metadata_file_path)):
# it is possible to return an empty list, populated list or None.
meta_json = self.m.file.read_text(
'Reading {}'.format(self.cache_metadata_file_path),
self.cache_metadata_file_path,
)
meta_cache_entries = []
json_items = json.loads(meta_json)
for item in json_items:
# TODO(ricardoamador) remove this code once the old dates without times are removed.
updated_date_found = None
if (self.date_format_check(item[JSON_ENTRY_UPDATED_DATE])):
updated_date_found = datetime.strptime(
item[JSON_ENTRY_UPDATED_DATE], "%m/%d/%Y, %H:%M:%S"
)
else:
updated_date_found = datetime.strptime(
"{}, 12:00:00".format(item[JSON_ENTRY_UPDATED_DATE]),
"%m/%d/%Y, %H:%M:%S"
)
removal_date_found = None
if (self.date_format_check(item[JSON_ENTRY_REMOVAL_DATE])):
removal_date_found = datetime.strptime(
item[JSON_ENTRY_REMOVAL_DATE], "%m/%d/%Y, %H:%M:%S"
)
else:
removal_date_found = datetime.strptime(
"{}, 12:00:00".format(item[JSON_ENTRY_REMOVAL_DATE]),
"%m/%d/%Y, %H:%M:%S"
)
new_entry = CacheEntry(
file_name=str(item[JSON_ENTRY_NAME]),
updated_date=updated_date_found,
removal_date=removal_date_found
)
meta_cache_entries.append(new_entry)
return meta_cache_entries
def date_format_check(self, date_str: str) -> bool:
"""Check the date format of the date we are attempting to process.
returns:
True if the time stamp includes the time and the date. False if non-compliant.
"""
try:
datetime.strptime(date_str, "%m/%d/%Y, %H:%M:%S")
return True
except ValueError:
return False