Source code for media_nommer.feederd.job_cache

"""
Basic job caching module.
"""
import datetime
from media_nommer.conf import settings
from media_nommer.utils import logger
from media_nommer.core.job_state_backend import JobStateBackend
from media_nommer.utils.compat import total_seconds

class JobCache(dict):
    """
    Caches currently active
    :py:class:`media_nommer.core.job_state_backend.EncodingJob` objects. This
    is presently only un-finished jobs, as defined by
    :py:attr:`media_nommer.core.job_state_backend.JobStateBackend.FINISHED_STATES`.

    All state lives in the class-level ``CACHE`` dict and every public method
    is a classmethod, so the cache is shared by everything that uses this
    class.
    """
    # Maps a job's unique ID to its EncodingJob object.
    CACHE = {}

    # Types treated as "a job's unique ID" rather than a job object.
    # ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere.
    try:
        _STRING_TYPES = (basestring,)
    except NameError:
        _STRING_TYPES = (str,)

    @classmethod
    def _get_key(cls, job):
        """
        Normalizes a job reference to its cache key.

        :type job: ``str`` or :py:class:`EncodingJob <media_nommer.core.job_state_backend.EncodingJob>`
        :param job: A job's unique ID or a job object.
        :rtype: str
        :returns: The job's unique ID.
        """
        if isinstance(job, cls._STRING_TYPES):
            return job
        return job.unique_id

    @classmethod
    def update_job(cls, job):
        """
        Updates a job in the cache. Creates the key if it doesn't already
        exist.

        :type job: :py:class:`EncodingJob <media_nommer.core.job_state_backend.EncodingJob>`
        :param job: The job to update (or create) a cache entry for.
        """
        cls.CACHE[job.unique_id] = job

    @classmethod
    def get_job(cls, job):
        """
        Given a job's unique id (or a job object), return the job object from
        the cache.

        :type job: ``str`` or :py:class:`EncodingJob <media_nommer.core.job_state_backend.EncodingJob>`
        :param job: A job's unique ID or a job object.
        :rtype: :py:class:`EncodingJob <media_nommer.core.job_state_backend.EncodingJob>`
        :returns: The cached encoding job.
        :raises KeyError: If the job is not in the cache.
        """
        return cls.CACHE[cls._get_key(job)]

    @classmethod
    def remove_job(cls, job):
        """
        Removes a job from the cache.

        :type job: ``str`` or :py:class:`EncodingJob <media_nommer.core.job_state_backend.EncodingJob>`
        :param job: A job's unique ID or a job object.
        :raises KeyError: If the job is not in the cache.
        """
        del cls.CACHE[cls._get_key(job)]

    @classmethod
    def is_job_cached(cls, job):
        """
        Given a job object or a unique id, return True if said job is cached,
        and False if not.

        :type job: ``str`` or :py:class:`EncodingJob <media_nommer.core.job_state_backend.EncodingJob>`
        :param job: A job's unique ID or a job object.
        :rtype: bool
        :returns: ``True`` if the given job exists in the cache, ``False`` if
            otherwise.
        """
        # ``in`` replaces dict.has_key(), which no longer exists on Python 3.
        return cls._get_key(job) in cls.CACHE

    @classmethod
    def get_cached_jobs(cls):
        """
        Returns a dict of all cached jobs. The keys are unique IDs, the values
        are the job objects.

        Note that this is the live cache dict, not a copy; mutating it
        mutates the cache.

        :rtype: dict
        :returns: A dictionary with the keys being unique IDs of cached jobs,
            and the values being
            :py:class:`EncodingJob <media_nommer.core.job_state_backend.EncodingJob>`
            instances.
        """
        return cls.CACHE

    @classmethod
    def get_jobs_with_state(cls, state):
        """
        Given a valid job state (refer to
        :py:attr:`media_nommer.core.job_state_backend.JobStateBackend.JOB_STATES`),
        return all jobs that currently have this state.

        :param str state: The job state to query by.
        :rtype: ``list`` of :py:class:`EncodingJob <media_nommer.core.job_state_backend.EncodingJob>`
        :returns: A list of jobs matching the given state.
        """
        # get_cached_jobs must be *called*; the previous version invoked
        # .items() on the bound method itself and raised AttributeError.
        return [job for job in cls.get_cached_jobs().values()
                if job.job_state == state]

    @classmethod
    def load_recent_jobs_at_startup(cls):
        """
        Loads all of the un-finished jobs into the job cache. This is
        performed when :doc:`../feederd` starts.
        """
        # Use print here because logging isn't fully configured at this point?
        print("Populating job cache from SimpleDB.")
        # Materialize once: the result is iterated twice below, and a
        # one-shot iterator would make the second loop print nothing.
        jobs = list(JobStateBackend.get_unfinished_jobs())
        for job in jobs:
            cls.update_job(job)

        print("Jobs loaded from SDB to cache:")
        for job in jobs:
            print('* %s (State: %s -- Finished: %s)' % (
                job.unique_id,
                job.job_state,
                job.is_finished())
            )

    @classmethod
    def refresh_jobs_with_state_changes(cls):
        """
        Looks at the state SQS queue specified by the
        :py:data:`SQS_JOB_STATE_CHANGE_QUEUE_NAME <media_nommer.conf.settings.SQS_JOB_STATE_CHANGE_QUEUE_NAME>`
        setting and refreshes any jobs that have changed. This simply reloads
        the job's details from SimpleDB_.

        Only jobs that are already cached are considered; popped jobs that
        are not in the cache are ignored.

        :rtype: ``list`` of :py:class:`EncodingJob <media_nommer.core.job_state_backend.EncodingJob>`
        :returns: A list of changed :py:class:`EncodingJob` objects.
        """
        logger.debug("JobCache.refresh_jobs_with_state_changes(): "
                     "Checking state change queue.")
        # Pops up to 10 jobs that we think may have changed. Some of these
        # are false alarms (same state as the cached copy), which brings us
        # to the filtering below.
        popped_changed_jobs = JobStateBackend.pop_state_changes_from_queue(10)
        # Jobs whose state actually changed; returned to the caller.
        changed_jobs = []

        if not popped_changed_jobs:
            return changed_jobs

        logger.debug("Potential job state changes found: %s" % popped_changed_jobs)
        for job in popped_changed_jobs:
            if not cls.is_job_cached(job):
                continue

            current_state = cls.get_job(job).job_state
            new_state = job.job_state
            if current_state == new_state:
                # False alarm: nothing to refresh.
                continue

            logger.info("* Job state changed %s: %s -> %s" % (
                job.unique_id,
                # Current job state in cache
                current_state,
                # New incoming job state
                new_state,
            ))
            cls.update_job(job)
            # This one actually changed, append this for returning.
            changed_jobs.append(job)
            if new_state == 'ERROR':
                logger.error('Error trace from ec2nommerd:')
                logger.error(job.job_state_details)

        return changed_jobs

    @classmethod
    def abandon_stale_jobs(cls):
        """
        On rare occasions, nommers crash so hard that no ``ERROR`` state change
        is made, and the job just gets stuck in a permanent unfinished state
        (``DOWNLOADING``, ``ENCODING``, ``UPLOADING``, etc). Rather than hang
        on to these indefinitely, abandon them by setting their state to
        ``ABANDONED``.

        Abandoned jobs are removed from the cache before their state is set.

        The threshold for which jobs are considered abandoned is configurable
        via the
        :py:data:`FEEDERD_ABANDON_INACTIVE_JOBS_THRESH <media_nommer.conf.settings.FEEDERD_ABANDON_INACTIVE_JOBS_THRESH>`
        setting.
        """
        logger.debug("JobCache.abandon_stale_jobs(): "
                     "Looking for stale jobs.")
        # One reference time for the whole sweep. NOTE(review): this is naive
        # local time; presumably last_modified_dtime is too -- confirm.
        now_dtime = datetime.datetime.now()
        # Iterate over a snapshot: remove_job() mutates the cache inside the
        # loop, which breaks iteration over a live dict view on Python 3.
        for job in list(cls.get_cached_jobs().values()):
            if job.is_finished():
                continue

            tdelta = now_dtime - job.last_modified_dtime
            inactive_seconds = total_seconds(tdelta)
            if inactive_seconds >= settings.FEEDERD_ABANDON_INACTIVE_JOBS_THRESH:
                cls.remove_job(job)
                job.set_job_state('ABANDONED', job.job_state_details)

    @classmethod
    def uncache_finished_jobs(cls):
        """
        Clears jobs from the cache after they have been finished.

        TODO: We'll eventually want to clear jobs from the cache that haven't
        been accessed by the web API recently.
        """
        # Snapshot the items: entries are deleted while we iterate.
        for unique_id, job in list(cls.CACHE.items()):
            if job.is_finished():
                logger.info("Removing job %s from job cache." % unique_id)
                cls.remove_job(unique_id)