Source code for media_nommer.feederd.interval_tasks
"""
This module contains tasks that are executed at intervals, and is imported at
the time the server is started. Much of :doc:`../feederd`'s 'intelligence'
can be found here.
All functions prefixed with ``task_`` are task functions that are registered
with the Twisted_ reactor. All functions prefixed with ``threaded_`` are
the interesting bits that actually do things.
"""
from twisted.internet import task, reactor
from media_nommer.conf import settings
from media_nommer.utils import logger
from media_nommer.feederd.job_cache import JobCache
from media_nommer.feederd.ec2_instance_manager import EC2InstanceManager
from media_nommer.feederd import job_state_notifier
[docs]def threaded_check_for_job_state_changes():
"""
Checks the SQS queue specified in the
:py:data:`SQS_JOB_STATE_CHANGE_QUEUE_NAME <media_nommer.conf.settings.SQS_JOB_STATE_CHANGE_QUEUE_NAME>`
setting for announcements of state changes from the EC2_ instances running
:doc:`../ec2nommerd`. This lets :doc:`../feederd` know it needs to get
updated job details from the SimpleDB_ domain defined in the
:py:data:`SIMPLEDB_JOB_STATE_DOMAIN <media_nommer.conf.settings.SIMPLEDB_JOB_STATE_DOMAIN>`
setting.
"""
changed_jobs = JobCache.refresh_jobs_with_state_changes()
for job in changed_jobs:
job_state_notifier.send_notification(job)
# If jobs have completed, remove them from the job cache.
JobCache.uncache_finished_jobs()
[docs]def task_check_for_job_state_changes():
"""
Checks for job state changes in a non-blocking manner.
Calls :py:func:`threaded_check_for_job_state_changes`.
"""
reactor.callInThread(threaded_check_for_job_state_changes)
[docs]def threaded_prune_jobs():
"""
Sometimes failure happens, but a Nommer doesn't handle said failure
gracefully. Instead of state changing to ``ERROR``, it gets stuck in
some un-finished state in the SimpleDB_ domain defined in
:py:data:`SIMPLEDB_JOB_STATE_DOMAIN <media_nommer.conf.settings.SIMPLEDB_JOB_STATE_DOMAIN>`
setting.
This process finds jobs that haven't been updated in a very long time
(a day or so) that are probably dead. It marks them with an ``ABANDONED``
state, letting us know something went really wrong.
"""
JobCache.abandon_stale_jobs()
# Expire any newly abandoned jobs, too. Removes them from job cache.
JobCache.uncache_finished_jobs()
[docs]def task_prune_jobs():
"""
Prune expired or abandoned jobs from the domain specified in the
:py:data:`SIMPLEDB_JOB_STATE_DOMAIN <media_nommer.conf.settings.SIMPLEDB_JOB_STATE_DOMAIN>`
setting. Also prunes :doc:`../feederd`'s job cache.
Calls :py:func:`threaded_prune_jobs`.
"""
reactor.callInThread(threaded_prune_jobs)
[docs]def threaded_manage_ec2_instances():
"""
Looks at the current number of jobs needing encoding and compares them
to the pool of currently running EC2_ instances. Spawns more instances
as needed.
See source of
:py:meth:`media_nommer.feederd.ec2_instance_manager.EC2InstanceManager.spawn_if_needed`
for the logic behind this.
"""
EC2InstanceManager.spawn_if_needed()
[docs]def task_manage_ec2_instances():
"""
Calls the instance creation logic in a non-blocking manner.
Calls :py:func:`threaded_manage_ec2_instances`.
"""
reactor.callInThread(threaded_manage_ec2_instances)
[docs]def register_tasks():
"""
Registers all tasks. Called by the :doc:`../feederd` Twisted_ plugin.
"""
task.LoopingCall(task_check_for_job_state_changes).start(
settings.FEEDERD_JOB_STATE_CHANGE_CHECK_INTERVAL,
now=False)
task.LoopingCall(task_prune_jobs).start(
settings.FEEDERD_PRUNE_JOBS_INTERVAL,
now=True)
# Only register the instance auto-spawning if enabled.
if settings.FEEDERD_ALLOW_EC2_LAUNCHES:
logger.debug("feederd will automatically scale EC2 instances.")
task.LoopingCall(task_manage_ec2_instances).start(
settings.FEEDERD_AUTO_SCALE_INTERVAL, now=False)