Source code for media_nommer.ec2nommerd.node_state
"""
Contains the :py:class:`NodeStateManager` class, which is an abstraction layer
for storing and communicating the status of EC2_ nodes.
"""
import urllib2
import datetime
import boto
from twisted.internet import reactor
from media_nommer.conf import settings
from media_nommer.utils import logger
from media_nommer.utils.compat import total_seconds
[docs]class NodeStateManager(object):
"""
Tracks this node's state, reports it to :doc:`../feederd`, and terminates
itself if certain conditions of inactivity are met.
"""
last_dtime_i_did_something = datetime.datetime.now()
# Used for lazy-loading the SDB connection. Do not refer to directly.
__aws_sdb_connection = None
# Used for lazy-loading the SDB domain. Do not refer to directly.
__aws_sdb_nommer_state_domain = None
# Used for lazy-loading the EC2 connection. Do not refer to directly.
__aws_ec2_connection = None
# Store the instance ID for this EC2 node (if not local).
__instance_id = None
@classmethod
def _aws_ec2_connection(cls):
"""
Lazy-loading of the EC2 boto connection. Refer to this instead of
referencing cls.__aws_ec2_connection directly.
:returns: A boto connection to Amazon's EC2 interface.
"""
if not cls.__aws_ec2_connection:
cls.__aws_ec2_connection = boto.connect_ec2(
settings.AWS_ACCESS_KEY_ID,
settings.AWS_SECRET_ACCESS_KEY)
return cls.__aws_ec2_connection
@classmethod
def _aws_sdb_connection(cls):
"""
Lazy-loading of the SimpleDB boto connection. Refer to this instead of
referencing cls.__aws_sdb_connection directly.
:returns: A boto connection to Amazon's SimpleDB interface.
"""
if not cls.__aws_sdb_connection:
cls.__aws_sdb_connection = boto.connect_sdb(
settings.AWS_ACCESS_KEY_ID,
settings.AWS_SECRET_ACCESS_KEY)
return cls.__aws_sdb_connection
@classmethod
def _aws_sdb_nommer_state_domain(cls):
"""
Lazy-loading of the SimpleDB boto domain. Refer to this instead of
referencing cls.__aws_sdb_nommer_state_domain directly.
:returns: A boto SimpleDB domain for this workflow.
"""
if not cls.__aws_sdb_nommer_state_domain:
cls.__aws_sdb_nommer_state_domain = cls._aws_sdb_connection().create_domain(
settings.SIMPLEDB_EC2_NOMMER_STATE_DOMAIN)
return cls.__aws_sdb_nommer_state_domain
@classmethod
[docs] def get_instance_id(cls, is_local=False):
"""
Determine this EC2 instance's unique instance ID. Lazy load this, and
avoid further re-queries after the first one.
:param bool is_local: When True, don't try to hit EC2's meta data server,
When False, just make up a unique ID.
:rtype: str
:returns: The EC2 instance's ID.
"""
if not cls.__instance_id:
if is_local:
cls.__instance_id = 'local-dev'
else:
aws_meta_url = 'http://169.254.169.254/latest/meta-data/instance-id'
response = urllib2.urlopen(aws_meta_url)
cls.__instance_id = response.read()
return cls.__instance_id
@classmethod
[docs] def is_ec2_instance(cls):
"""
Determine whether this is an EC2 instance or not.
:rtype: bool
:returns: ``True`` if this is an EC2 instance, ``False`` if otherwise.
"""
return cls.get_instance_id() != 'local-dev'
@classmethod
[docs] def send_instance_state_update(cls, state='ACTIVE'):
"""
Sends a status update to feederd through SimpleDB. Lets the daemon
know how many jobs this instance is crunching right now. Also updates
a timestamp field to let feederd know how long it has been since the
instance's last check-in.
:keyword str state: If this EC2_ instance is anything but ``ACTIVE``,
pass the state here. This is useful during node termination.
"""
if cls.is_ec2_instance():
instance_id = cls.get_instance_id()
item = cls._aws_sdb_nommer_state_domain().new_item(instance_id)
item['id'] = instance_id
item['active_jobs'] = cls.get_num_active_threads() - 1
item['last_report_dtime'] = datetime.datetime.now()
item['state'] = state
item.save()
@classmethod
[docs] def contemplate_termination(cls, thread_count_mod=0):
"""
Looks at how long it's been since this worker has done something, and
decides whether to self-terminate.
:param int thread_count_mod: Add this to the amount returned by the call
to :py:meth:`get_num_active_threads`. This is useful when calling
this method from a non-encoder thread.
:rtype: bool
:returns: ``True`` if this instance terminated itself, ``False``
if not.
"""
if not cls.is_ec2_instance():
# Developing locally, don't go here.
return False
# This is -1 since this is also a thread doing the contemplation.
# This would always be 1, even if we had no jobs encoding, if we
# didn't take into account this thread.
num_active_threads = cls.get_num_active_threads() + thread_count_mod
if num_active_threads > 0:
# Encoding right now, don't terminate.
return False
tdelt = datetime.datetime.now() - cls.last_dtime_i_did_something
# Total seconds of inactivity.
inactive_secs = total_seconds(tdelt)
# If we're over the inactivity threshold...
if inactive_secs > settings.NOMMERD_MAX_INACTIVITY:
instance_id = cls.get_instance_id()
conn = cls._aws_ec2_connection()
# Find this particular EC2 instance via boto.
reservations = conn.get_all_instances(instance_ids=[instance_id])
# This should only be one match, but in the interest of
# playing along...
for reservation in reservations:
for instance in reservation.instances:
# Here's the instance, terminate it.
logger.info("Goodbye, cruel world.")
cls.send_instance_state_update(state='TERMINATED')
instance.terminate()
# Seeya later!
return True
# Continue existence, no termination.
return False
@classmethod
[docs] def get_num_active_threads(cls):
"""
Checks the reactor's threadpool to see how many threads are currently
working. This can be used to determine how busy this node is.
:rtype: int
:returns: The number of active threads.
"""
return len(reactor.getThreadPool().working)
@classmethod
[docs] def i_did_something(cls):
"""
Pat ourselves on the back each time we do something.
Used for determining whether this node's continued existence is
necessary anymore in :py:meth:`contemplate_termination`.
"""
cls.last_dtime_i_did_something = datetime.datetime.now()