Source code for media_nommer.ec2nommerd.nommers.base_nommer

"""
Classes in this module serve as a basis for Nommers. This should be thought
of as a protocol or a foundation to assist in maintaining a consistent API
between Nommers.
"""
import os
import traceback
import tempfile
import shutil
from media_nommer.utils import logger
from media_nommer.ec2nommerd.node_state import NodeStateManager
from media_nommer.core.storage_backends import get_backend_for_uri

[docs]class BaseNommer(object): """ This is a base class that can be sub-classed by each Nommer to serve as a foundation. Required methods raise a NotImplementedError exception by default, unless overridden by child classes. :ivar EncodingJob job: The encoding job this nommer is handling. """ def __init__(self, job): self.job = job
[docs] def onomnom(self): """ Start nomming. If you're going to override this, make sure to follow the same basic job state updating flow if possible. """ # Have to create a temporary directory, since ffmpeg's libx264 encoder # doesn't obey alternate logfile paths. I'm sure other software # has similar limitations, so let's just not take any chances. self.temp_cwd = tempfile.mkdtemp() try: self._onomnom() except: # If we run into any un-handled exceptions, error out the job # and set as its state details. self.wrapped_set_job_state('ERROR', details=traceback.format_exc()) traceback.print_exc() # Clean up the temporary CWD used during nomming. shutil.rmtree(self.temp_cwd, ignore_errors=True)
def _onomnom(self): """ Call your encoder here, do work. Make sure to update job state as it progresses, if you can. Needs to set one of the two job states: FINISHED, ERROR """ raise NotImplementedError
[docs] def wrapped_set_job_state(self, *args, **kwargs): """ Wraps set_job_state() to perform extra actions before and/or after job state updates. :param str new_state: The job state to set. """ # Tracks the fact that we did something, prevents the node from # terminating itself. NodeStateManager.i_did_something() self.job.set_job_state(*args, **kwargs)
[docs] def download_source_file(self): """ Download the source file to a temporary file. """ self.wrapped_set_job_state('DOWNLOADING') # This is the remote path. file_uri = self.job.source_path logger.debug("BaseNommer.download_source_file(): " \ "Attempting to download %s" % file_uri) # Figure out which backend to use for the protocol in the URI. storage = get_backend_for_uri(file_uri) # Create a temporary file which will be auto deleted when # garbage collected. fobj = tempfile.NamedTemporaryFile(mode='w+b', delete=True) # Using the correct backend, download the file to the given # file-like object. storage.download_file(file_uri, fobj) # flush and fsync to force writing to the file object. Doesn't always # happen otherwise. fobj.flush() os.fsync(fobj.fileno()) logger.debug("BaseNommer.download_source_file(): " \ "Downloaded %s to %s" % (file_uri, fobj.name)) # As soon as this fobj is garbage collected, it is closed(). Be # careful to continue its existence if you need it. return fobj
[docs] def upload_to_destination(self, fobj): """ Upload the output file to the destination specified by the user. """ self.wrapped_set_job_state('UPLOADING') file_uri = self.job.dest_path logger.debug("BaseNommer.upload_to_destination(): " \ "Attempting to upload %s to %s" % (fobj.name, file_uri)) storage = get_backend_for_uri(file_uri) storage.upload_file(file_uri, fobj) logger.debug("BaseNommer.upload_to_destination(): " \ "Finished uploading %s to %s" % (fobj.name, file_uri))