Source code for media_nommer.ec2nommerd.interval_tasks

"""
This module contains tasks that are executed at intervals, and is imported at
the time the server is started. The intervals at which the tasks run
are configurable via :py:mod:`media_nommer.conf.settings`.

All functions prefixed with ``task_`` are task functions that are registered
with the Twisted_ reactor. All functions prefixed with ``threaded_`` are
the interesting bits that actually do things.
"""
from twisted.internet import task, reactor
from media_nommer.conf import settings
from media_nommer.utils import logger
from media_nommer.core.job_state_backend import JobStateBackend
from media_nommer.ec2nommerd.node_state import NodeStateManager

[docs]def threaded_encode_job(job): """ Given a job, run it through its encoding workflow in a non-blocking manner. """ # Update the timestamp for when the node last did something so it # won't terminate itself. NodeStateManager.i_did_something() job.nommer.onomnom()
[docs]def task_check_for_new_jobs(): """ Looks at the number of currently active threads and compares it against the :py:data:`MAX_ENCODING_JOBS_PER_EC2_INSTANCE <media_nommer.conf.settings.MAX_ENCODING_JOBS_PER_EC2_INSTANCE>` setting. If we are under the max, fire up another thread for encoding additional job(s). The interval at which :doc:`../ec2nommerd` checks for new jobs is determined by the :py:data:`NOMMERD_NEW_JOB_CHECK_INTERVAL <media_nommer.conf.settings.NOMMERD_NEW_JOB_CHECK_INTERVAL>` setting. Calls :py:func:`threaded_encode_job` for any jobs to encode. """ num_active_threads = NodeStateManager.get_num_active_threads() max_threads = settings.MAX_ENCODING_JOBS_PER_EC2_INSTANCE num_jobs_to_pop = max(0, max_threads - num_active_threads) if num_jobs_to_pop > 0: # We have more room for encoding threads, determine how many. logger.debug("task_check_for_new_jobs: " \ "Popping up to %d new jobs." % num_jobs_to_pop) # This is an iterable of BaseEncodingJob sub-classed instances for # each job returned from the queue. jobs = JobStateBackend.pop_new_jobs_from_queue(num_jobs_to_pop) if jobs: logger.debug("* Popped %d jobs from the queue." % len(jobs)) for job in jobs: # For each job returned, render in another thread. logger.debug("* Starting encoder thread for job: %s" % job.unique_id) reactor.callInThread(threaded_encode_job, job)
[docs]def threaded_heartbeat(): """ Fires off a threaded task to check in with feederd via SimpleDB_. There is a domain that contains all of the running EC2_ instances and their unique IDs, along with some state data. The interval at which heartbeats occur is determined by the :py:data:`NOMMERD_HEARTBEAT_INTERVAL <media_nommer.conf.settings.NOMMERD_HEARTBEAT_INTERVAL` setting. """ if settings.NOMMERD_TERMINATE_WHEN_IDLE: # thread_count_mod factors out this thread when counting active threads. is_terminated = NodeStateManager.contemplate_termination(thread_count_mod= -1) else: is_terminated = False if not is_terminated: NodeStateManager.send_instance_state_update()
[docs]def task_heartbeat(): """ Checks in with feederd in a non-blocking manner via :py:meth:`threaded_heartbeat`. Calls :py:func:`threaded_heartbeat`. """ reactor.callInThread(threaded_heartbeat)
[docs]def register_tasks(): """ Registers all tasks. Called by the :doc:`../ec2nommerd` Twisted_ plugin. """ task.LoopingCall(task_check_for_new_jobs).start( settings.NOMMERD_NEW_JOB_CHECK_INTERVAL, now=True) task.LoopingCall(task_heartbeat).start(settings.NOMMERD_HEARTBEAT_INTERVAL, now=False)