Source code for seesaw.runner
'''Pipeline execution.'''
import datetime
import functools
import os.path
import sys
import seesaw.util
from seesaw.config import realize
from seesaw.event import Event
from seesaw.item import Item
from tornado import ioloop
class Runner(object):
    '''Executes and manages the lifetime of :class:`Pipeline` instances.

    The runner keeps a set of active items, enqueues new items on the
    current pipeline until the requested concurrency is reached, and
    stops handing out new items once a stop has been requested (either
    through :meth:`stop_gracefully` or through a change of the stop file).

    Handlers are attached to the ``on_*`` events with ``+=``. The events
    are fired with these arguments:

    * ``on_status(runner, status)`` -- ``status`` is ``"stopping"`` or
      ``"running"``.
    * ``on_create_item(runner, item)``
    * ``on_pipeline_start_item(runner, pipeline, item)``
    * ``on_pipeline_finish_item(runner, pipeline, item)``
    * ``on_finish(runner)`` -- fired when an item finished and no active
      items remain.
    '''

    def __init__(self, stop_file=None, concurrent_items=1, max_items=None,
                 keep_data=False):
        '''Set up the runner state; no pipeline is attached yet.

        Args:
            stop_file: Optional path of a file to watch. When given, the
                file is polled periodically and a newer modification time
                triggers :meth:`stop_gracefully`.
            concurrent_items: Number of items to keep active. The value is
                passed through ``realize()`` and ``int()`` every time
                items are added.
            max_items: Optional upper bound on the total number of items
                created. A falsy value means no limit.
            keep_data: Forwarded unchanged to every created :class:`Item`.
        '''
        self.pipeline = None
        self.concurrent_items = concurrent_items
        self.max_items = max_items
        self.keep_data = keep_data

        # Total number of items created so far; also used to number items.
        self.item_count = 0
        self.active_items = set()

        self.stop_flag = False
        self.stop_file = stop_file
        # Baseline modification time; a later mtime counts as "changed".
        self.initial_stop_file_mtime = self.stop_file_mtime()

        self.on_status = Event()
        self.on_create_item = Event()
        self.on_pipeline_start_item = Event()
        self.on_pipeline_finish_item = Event()
        self.on_finish = Event()

        if stop_file:
            # Poll the stop file every 5000 milliseconds.
            ioloop.PeriodicCallback(self.check_stop_file, 5000).start()

    def set_current_pipeline(self, pipeline):
        '''Make ``pipeline`` the current pipeline.

        The runner subscribes to the start/finish item events of the new
        pipeline (if one is given) and asks the previous pipeline (if any)
        to cancel its items.
        '''
        old_pipeline = self.pipeline

        if pipeline:
            pipeline.on_start_item += self._item_starting
            pipeline.on_finish_item += self._item_finished

        self.pipeline = pipeline

        if old_pipeline:
            # stop any cancellable items in the previous pipeline
            old_pipeline.cancel_items()

    def is_active(self):
        '''Return ``True`` while at least one item is active.'''
        return bool(self.active_items)

    def start(self):
        '''Start processing by enqueueing the first batch of items.'''
        self.add_items()

    def stop_gracefully(self):
        '''Request a stop: no new items are added after this call.

        Sets the stop flag, notifies the current pipeline (when there is
        one), resets the stop file baseline and fires ``on_status`` with
        ``"stopping"``.
        '''
        print("Stopping when current tasks are completed...")
        self.stop_flag = True
        # The pipeline is None until set_current_pipeline() is called, and
        # the stop file poller may fire before that happens.
        if self.pipeline is not None:
            self.pipeline.cancel_items()
            self.pipeline.on_stop_requested()
        self.initial_stop_file_mtime = self.stop_file_mtime()
        self.on_status(self, "stopping")

    def keep_running(self):
        '''Cancel a previously requested stop.

        Clears the stop flag, notifies the current pipeline (when there is
        one), resets the stop file baseline and fires ``on_status`` with
        ``"running"``.
        '''
        print("Keep running...")
        self.stop_flag = False
        if self.pipeline is not None:
            self.pipeline.on_stop_canceled()
        self.initial_stop_file_mtime = self.stop_file_mtime()
        self.on_status(self, "running")

    def check_stop_file(self):
        '''Periodic callback: stop gracefully if the stop file changed.'''
        if self.stop_file_changed():
            self.stop_gracefully()

    def should_stop(self):
        '''Return whether no further items should be started.'''
        return self.stop_flag or self.stop_file_changed()

    def stop_file_changed(self):
        '''Return ``True`` if the stop file is newer than the baseline.

        A stop file that does not exist never counts as changed. A stop
        file that exists while the baseline is ``None`` (it did not exist
        when the baseline was taken) counts as changed.
        '''
        current_stop_file_mtime = self.stop_file_mtime()
        if current_stop_file_mtime is None:
            return False
        return self.initial_stop_file_mtime is None \
            or self.initial_stop_file_mtime < current_stop_file_mtime

    def stop_file_mtime(self):
        '''Return the stop file's modification time, or ``None``.

        ``None`` is returned when no stop file is configured or when its
        modification time cannot be read (for example, because the file
        does not exist).
        '''
        if not self.stop_file:
            return None
        try:
            return os.path.getmtime(self.stop_file)
        except OSError:
            # Missing file, or removed between two polls: treat as absent
            # instead of letting the error escape the periodic callback.
            return None

    def add_items(self):
        '''Create and enqueue items until enough items are active.

        Does nothing without a current pipeline. Stops early once
        ``max_items`` items have been created in total.
        '''
        if not self.pipeline:
            return

        items_required = int(realize(self.concurrent_items))

        while len(self.active_items) < items_required:
            if self.max_items and self.max_items <= self.item_count:
                return

            self.item_count += 1
            item_id = "{0}-{1}".format(
                seesaw.util.unique_id_str(), self.item_count)
            item = Item(
                pipeline=self.pipeline,
                item_id=item_id,
                item_number=self.item_count,
                keep_data=self.keep_data
            )
            self.on_create_item(self, item)
            self.active_items.add(item)
            self.pipeline.enqueue(item)

    def _item_starting(self, pipeline, item):
        '''Pipeline event handler: relay the start of an item.'''
        self.on_pipeline_start_item(self, pipeline, item)

    def _item_finished(self, pipeline, item):
        '''Pipeline event handler: finalize a finished item.

        A failed item is finalized only after a 10 second delay; any other
        item is finalized immediately.
        '''
        if item.failed:
            item.log_output("Waiting 10 seconds...")
            ioloop.IOLoop.instance().add_timeout(
                datetime.timedelta(seconds=10),
                functools.partial(
                    self._item_finished_without_delay, pipeline, item)
            )
        else:
            self._item_finished_without_delay(pipeline, item)

    def _item_finished_without_delay(self, pipeline, item):
        '''Relay the finish event, drop the item and schedule a refill.'''
        self.on_pipeline_finish_item(self, pipeline, item)
        self.active_items.remove(item)

        def add_more_items():
            if not self.should_stop():
                self.add_items()
            if not self.active_items:
                self.on_finish(self)

        # Scheduled with a zero delay so the refill runs from the IOLoop
        # rather than inside this call.
        ioloop.IOLoop.instance().add_timeout(
            datetime.timedelta(),
            add_more_items
        )
class SimpleRunner(Runner):
    '''Executes a single :class:`Pipeline` instance.

    The runner drives the Tornado IOLoop itself: :meth:`start` blocks
    until the loop is stopped, which happens when the runner fires
    ``on_finish`` or when :meth:`forced_stop` is called. Output of every
    created item is written to standard output.
    '''

    def __init__(self, pipeline, stop_file=None, concurrent_items=1,
                 max_items=None, keep_data=False):
        '''Create a runner for ``pipeline``.

        The remaining arguments are forwarded unchanged to
        :class:`Runner`.
        '''
        Runner.__init__(
            self, stop_file=stop_file,
            concurrent_items=concurrent_items, max_items=max_items,
            keep_data=keep_data)

        self.set_current_pipeline(pipeline)

        self.on_create_item += self._handle_create_item
        self.on_finish += self._stop_ioloop

    def start(self):
        '''Enqueue the first items and run the IOLoop until it is stopped.

        The pipeline's ``on_cleanup()`` is called once the loop has
        returned, including when the loop exits with an exception.
        '''
        Runner.start(self)
        try:
            ioloop.IOLoop.instance().start()
        finally:
            # Run cleanup even if the loop was interrupted by an exception.
            self.pipeline.on_cleanup()

    def _stop_ioloop(self, dummy):
        '''``on_finish`` handler: stop the IOLoop so start() returns.'''
        ioloop.IOLoop.instance().stop()

    def forced_stop(self):
        '''Stop the IOLoop right away, without waiting for active items.'''
        print("Stopping immediately...")
        # TODO perhaps the subprocesses should be killed
        ioloop.IOLoop.instance().stop()

    def _handle_create_item(self, dummy, item):
        '''``on_create_item`` handler: subscribe to the item's output.'''
        item.on_output += self._handle_item_output

    def _handle_item_output(self, item, data):
        '''Write item output to standard output and flush it.

        If writing raises a :class:`UnicodeError`, the data is written
        again with every non-ASCII character replaced.
        '''
        try:
            sys.stdout.write(data)
        except UnicodeError:
            sys.stdout.write(data.encode('ascii', 'replace').decode('ascii'))
        sys.stdout.flush()