Source code for freezer.scheduler.scheduler_job

"""
Copyright 2015 Hewlett-Packard

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

"""

import datetime
import json
import os
import subprocess
import tempfile
import time

from freezer.utils import utils
from oslo_config import cfg
from oslo_log import log
from six.moves import configparser


# Global oslo.config configuration object; Job.execute() reads
# CONF.disable_exec from it.
CONF = cfg.CONF
# Module-level logger named after this module.
LOG = log.getLogger(__name__)


class StopState(object):
    """Event handlers used while a job is stopped (not scheduled).

    Every handler is a static method receiving the job it acts on and
    returns the next event to be processed (``Job.NO_EVENT`` when there
    is nothing left to do).
    """

    @staticmethod
    def _adopt_doc(job, doc):
        """Install ``doc`` as the job document, cleared and marked stopped."""
        job.job_doc = doc
        job.event = Job.NO_EVENT
        job.job_doc_status = Job.STOP_STATUS

    @staticmethod
    def stop(job, doc):
        """Store ``doc`` as a stopped job document and push it upstream.

        :param job: the job being handled
        :param doc: the new job document
        :return: Job.NO_EVENT
        """
        StopState._adopt_doc(job, doc)
        job_id = job.id
        job.scheduler.update_job(job_id, job.job_doc)
        return Job.NO_EVENT

    @staticmethod
    def abort(job, doc):
        """Aborting a stopped job is handled exactly like stopping it."""
        return StopState.stop(job, doc)

    @staticmethod
    def start(job, doc):
        """Store ``doc``, schedule the job and push the document upstream.

        :param job: the job being handled
        :param doc: the new job document
        :return: Job.NO_EVENT
        """
        StopState._adopt_doc(job, doc)
        # schedule() updates the status/state according to the outcome,
        # so it has to run before the document is sent to the scheduler.
        job.schedule()
        job_id = job.id
        job.scheduler.update_job(job_id, job.job_doc)
        return Job.NO_EVENT

    @staticmethod
    def remove(job):
        """Unschedule the job and flag its document as removed.

        :param job: the job being handled
        :return: Job.NO_EVENT
        """
        job.unschedule()
        job.job_doc_status = Job.REMOVED_STATUS
        return Job.NO_EVENT
class ScheduledState(object):
    """Event handlers used while a job is scheduled but not running.

    Every handler is a static method receiving the job it acts on and
    returns the next event to be processed.
    """

    @staticmethod
    def stop(job, doc):
        """Unschedule the job, then hand over to the stop handling.

        ``doc`` is not used here; returning ``Job.STOP_EVENT`` asks the
        caller to process a stop event again after the job has been
        unscheduled.

        :return: Job.STOP_EVENT
        """
        job.unschedule()
        current_doc = job.job_doc
        job.scheduler.update_job(job.id, current_doc)
        return Job.STOP_EVENT

    @staticmethod
    def abort(job, doc):
        """Delegate to ``StopState.stop``.

        NOTE(review): unlike ``stop`` above this does not call
        ``job.unschedule()`` first -- confirm that this is intended.
        """
        return StopState.stop(job, doc)

    @staticmethod
    def start(job, doc):
        """Clear the pending event; the job is already scheduled.

        ``doc`` is not used: the job keeps its current document, which
        is pushed to the scheduler with the event cleared.

        :return: Job.NO_EVENT
        """
        nothing_pending = Job.NO_EVENT
        job.event = nothing_pending
        job.scheduler.update_job(job.id, job.job_doc)
        return nothing_pending

    @staticmethod
    def remove(job):
        """Unschedule the job and flag its document as removed.

        :return: Job.NO_EVENT
        """
        job.unschedule()
        job.job_doc_status = Job.REMOVED_STATUS
        return Job.NO_EVENT
class RunningState(object):
    """Event handlers used while a job is executing.

    Stop and remove requests are only recorded in the job's event
    field; ``Job.finish`` reads that field once execution has ended.
    """

    @staticmethod
    def stop(job, doc):
        """Record a stop request on the job (``doc`` is not used).

        :return: Job.NO_EVENT
        """
        job.event = Job.STOP_EVENT
        return Job.NO_EVENT

    @staticmethod
    def abort(job, doc):
        """Record an abort request and push the job document upstream.

        ``doc`` is not used.

        :return: Job.ABORTED_RESULT
        """
        job.event = Job.ABORT_EVENT
        current_doc = job.job_doc
        job.scheduler.update_job(job.id, current_doc)
        return Job.ABORTED_RESULT

    @staticmethod
    def start(job, doc):
        """Clear the pending event; the job is already running.

        ``doc`` is not used.

        :return: Job.NO_EVENT
        """
        nothing_pending = Job.NO_EVENT
        job.event = nothing_pending
        job.scheduler.update_job(job.id, job.job_doc)
        return nothing_pending

    @staticmethod
    def remove(job):
        """Record a remove request on the job.

        :return: Job.NO_EVENT
        """
        job.event = Job.REMOVE_EVENT
        return Job.NO_EVENT
class Job(object):
    """Scheduler-side wrapper around a freezer job document.

    The job keeps a reference to the scheduler, the executable used to
    run its actions and the job document (a dict).  Event handling is
    delegated to the current state class stored in ``self.state``
    (``StopState``, ``ScheduledState`` or ``RunningState``).
    """

    # values of job_doc['job_schedule']['event']
    NO_EVENT = ''
    STOP_EVENT = 'stop'
    START_EVENT = 'start'
    ABORT_EVENT = 'abort'
    REMOVE_EVENT = 'remove'

    # values of job_doc['job_schedule']['status']
    STOP_STATUS = 'stop'
    SCHEDULED_STATUS = 'scheduled'
    RUNNING_STATUS = 'running'
    REMOVED_STATUS = 'removed'
    COMPLETED_STATUS = 'completed'

    # values of job_doc['job_schedule']['result']
    FAIL_RESULT = 'fail'
    SUCCESS_RESULT = 'success'
    ABORTED_RESULT = 'aborted'

    # written to 'time_ended' when a run starts
    TIME_NULL = -1

    @staticmethod
    def create(scheduler, executable, job_doc):
        """Build a Job and choose its initial event when none is pending.

        A document whose status is 'stop' gets a stop event, any other
        document without a pending event gets a start event (autostart).

        :param scheduler: scheduler object the job reports to
        :param executable: command used to run the job actions
        :param job_doc: job document (dict)
        :return: the new Job
        """
        job = Job(scheduler, executable, job_doc)
        if job.job_doc_status in [Job.RUNNING_STATUS, Job.SCHEDULED_STATUS]:
            LOG.warning('Resetting {0} status from job {1}'
                        .format(job.job_doc_status, job.id))
        if job.job_doc_status == Job.STOP_STATUS and not job.event:
            LOG.info('Job {0} was stopped.'.format(job.id))
            job.event = Job.STOP_EVENT
        elif not job.event:
            LOG.info('Autostart Job {0}'.format(job.id))
            job.event = Job.START_EVENT
        return job

    def __init__(self, scheduler, executable, job_doc):
        self.scheduler = scheduler
        self.executable = executable
        self.job_doc = job_doc
        # subprocess.Popen object of the action currently/last executed
        self.process = None
        # a new job always starts in the stopped state
        self.state = StopState

    def remove(self):
        """Remove the job by delegating to the current state object."""
        with self.scheduler.lock:
            # delegate to state object
            LOG.info('REMOVE job {0}'.format(self.id))
            self.state.remove(self)

    @property
    def id(self):
        """The 'job_id' entry of the job document."""
        return self.job_doc['job_id']

    @property
    def session_id(self):
        """The 'session_id' entry of the job document ('' when unset)."""
        return self.job_doc.get('session_id', '')

    @session_id.setter
    def session_id(self, value):
        self.job_doc['session_id'] = value

    @property
    def session_tag(self):
        """The 'session_tag' entry of the job document (0 when unset)."""
        return self.job_doc.get('session_tag', 0)

    @session_tag.setter
    def session_tag(self, value):
        self.job_doc['session_tag'] = value

    @property
    def event(self):
        """The pending event stored in job_schedule ('' when unset)."""
        return self.job_doc['job_schedule'].get('event', '')

    @event.setter
    def event(self, value):
        self.job_doc['job_schedule']['event'] = value

    @property
    def job_doc_status(self):
        """The status stored in job_schedule ('' when unset)."""
        return self.job_doc['job_schedule'].get('status', '')

    @job_doc_status.setter
    def job_doc_status(self, value):
        self.job_doc['job_schedule']['status'] = value

    @property
    def result(self):
        """The result stored in job_schedule ('' when unset)."""
        return self.job_doc['job_schedule'].get('result', '')

    @result.setter
    def result(self, value):
        self.job_doc['job_schedule']['result'] = value

    def can_be_removed(self):
        """Return True when the document status is 'removed'."""
        return self.job_doc_status == Job.REMOVED_STATUS

    @staticmethod
    def save_action_to_file(action, f):
        """Write ``action`` as an ``[action]`` ini section to ``f``.

        Values are converted with ``str()`` before being handed to the
        parser: Python 3's ConfigParser rejects non-string values, while
        Python 2's writer applied ``str()`` on its own when writing.

        :param action: dict of option names/values
        :param f: writable text file object; rewound to offset 0 after
                  writing
        :return: None
        """
        parser = configparser.ConfigParser()
        parser.add_section('action')
        for action_k, action_v in action.items():
            parser.set('action', action_k, str(action_v))
        parser.write(f)
        f.seek(0)

    @property
    def schedule_date(self):
        """The 'schedule_date' entry of job_schedule ('' when unset)."""
        return self.job_doc['job_schedule'].get('schedule_date', '')

    @property
    def schedule_interval(self):
        """The 'schedule_interval' entry of job_schedule ('' when unset)."""
        return self.job_doc['job_schedule'].get('schedule_interval', '')

    @property
    def schedule_start_date(self):
        """The 'schedule_start_date' entry of job_schedule ('' if unset)."""
        return self.job_doc['job_schedule'].get('schedule_start_date', '')

    @property
    def schedule_end_date(self):
        """The 'schedule_end_date' entry of job_schedule ('' if unset)."""
        return self.job_doc['job_schedule'].get('schedule_end_date', '')

    @property
    def schedule_cron_fields(self):
        """Cron fields found in job_schedule, keyed without 'schedule_'.

        Only ``schedule_<field>`` keys whose ``<field>`` is one of the
        known cron field names are returned.
        """
        cron_fields = ['year', 'month', 'day', 'week', 'day_of_week',
                       'hour', 'minute', 'second']
        prefix = 'schedule_'
        return {key[len(prefix):]: value
                for key, value in self.job_doc['job_schedule'].items()
                if key.startswith(prefix) and
                key[len(prefix):] in cron_fields}

    @property
    def scheduled(self):
        """Whether the scheduler reports this job id as scheduled."""
        return self.scheduler.is_scheduled(self.id)

    def get_schedule_args(self):
        """Build the trigger keyword arguments passed to add_job().

        Precedence: ``schedule_date`` (date trigger), then
        ``schedule_interval`` (interval trigger), then cron fields (cron
        trigger).  Without any scheduling information a date trigger two
        seconds from now is returned.  Start/end dates are only added to
        interval and cron triggers.

        :return: dict with a 'trigger' key plus trigger specific options
        """
        def get_start_date(date):
            # start_date format "%Y-%m-%dT%H:%M:%S"
            # A start date that is not in the future is replaced by
            # "two seconds from now".
            now = datetime.datetime.now()
            start_date = now + datetime.timedelta(0, 2, 0)
            if (utils.date_to_timestamp(date) >
                    utils.date_to_timestamp(now.isoformat().split('.')[0])):
                start_date = datetime.datetime.strptime(
                    date, "%Y-%m-%dT%H:%M:%S")
            return start_date

        def get_end_date(start, end):
            # start end format "%Y-%m-%dT%H:%M:%S"
            # An end date earlier than the start date is dropped.
            end_date = datetime.datetime.strptime(end, "%Y-%m-%dT%H:%M:%S")
            if (utils.date_to_timestamp(start) >
                    utils.date_to_timestamp(end)):
                end_date = None
            return end_date

        kwargs_date = {}
        if self.schedule_start_date:
            kwargs_date.update({
                'start_date': get_start_date(self.schedule_start_date)
            })

        if self.schedule_end_date:
            # NOTE(review): schedule_start_date may be '' here; whether
            # utils.date_to_timestamp accepts that is not visible from
            # this module -- confirm.
            end_date = get_end_date(self.schedule_start_date,
                                    self.schedule_end_date)
            kwargs_date.update({
                'end_date': end_date
            })

        if self.schedule_date:
            return {'trigger': 'date',
                    'run_date': self.schedule_date}
        elif self.schedule_interval:
            kwargs = {'trigger': 'interval'}
            kwargs.update(kwargs_date)
            if self.schedule_interval == 'continuous':
                kwargs.update({'seconds': 1})
            else:
                # split() instead of split(' ') so that repeated or
                # surrounding whitespace does not break the unpacking
                val, unit = self.schedule_interval.split()
                kwargs.update({unit: int(val)})
            return kwargs
        elif self.schedule_cron_fields:
            kwargs = {'trigger': 'cron'}
            kwargs.update(kwargs_date)
            kwargs.update(self.schedule_cron_fields)
            return kwargs
        else:
            # no scheduling information, schedule to start within a few
            # seconds
            return {'trigger': 'date',
                    'run_date': (datetime.datetime.now() +
                                 datetime.timedelta(0, 2, 0))}

    def process_event(self, job_doc):
        """Process the event found in ``job_doc`` via the state objects.

        The handler of the current state returns the next event to
        process; the loop ends when there is none left, when the job has
        been aborted, or when the event is not one this method handles.

        :param job_doc: job document carrying the event
        :return: None
        """
        with self.scheduler.lock:
            next_event = job_doc['job_schedule'].get('event', '')
            while next_event:
                if next_event == Job.STOP_EVENT:
                    LOG.info('JOB {0} event: STOP'.format(self.id))
                    next_event = self.state.stop(self, job_doc)
                elif next_event == Job.START_EVENT:
                    LOG.info('JOB {0} event: START'.format(self.id))
                    next_event = self.state.start(self, job_doc)
                elif next_event == Job.ABORT_EVENT:
                    LOG.info('JOB {0} event: ABORT'.format(self.id))
                    next_event = self.state.abort(self, job_doc)
                elif next_event == Job.ABORTED_RESULT:
                    LOG.info('JOB {0} aborted.'.format(self.id))
                    break
                else:
                    # Without this branch an unhandled event would spin
                    # forever while holding the scheduler lock.
                    LOG.warning('JOB {0} ignoring unsupported event: {1}'
                                .format(self.id, next_event))
                    break

    def upload_metadata(self, metadata_string):
        """Parse the JSON metadata and upload it through the scheduler.

        Any error is logged and swallowed.

        :param metadata_string: JSON document
        :return: None
        """
        try:
            metadata = json.loads(metadata_string)
            if metadata:
                metadata['job_id'] = self.id
                self.scheduler.upload_metadata(metadata)
                LOG.info("Job {0}, freezer action metadata uploaded"
                         .format(self.id))
        except Exception as e:
            LOG.error('metrics upload error: {0}'.format(e))

    def execute_job_action(self, job_action):
        """Run one job action as a subprocess, retrying on failure.

        The freezer action is written to a temporary config file and
        ``self.executable`` is started with ``--config <file>``.  The
        action is tried ``max_retries + 1`` times, sleeping
        ``max_retries_interval`` seconds between tries.

        :param job_action: dict with the optional keys 'freezer_action',
                           'max_retries' and 'max_retries_interval'
        :return: Job.SUCCESS_RESULT, Job.ABORTED_RESULT (return code -15)
                 or Job.FAIL_RESULT
        """
        max_tries = (job_action.get('max_retries', 0) + 1)
        tries = max_tries
        freezer_action = job_action.get('freezer_action', {})
        max_retries_interval = job_action.get('max_retries_interval', 60)
        action_name = freezer_action.get('action', '')
        while tries:
            config_file_name = None
            try:
                # Text mode: ConfigParser.write() emits str, which the
                # default binary mode rejects on Python 3.
                with tempfile.NamedTemporaryFile(
                        mode='w+', delete=False) as config_file:
                    config_file_name = config_file.name
                    self.save_action_to_file(freezer_action, config_file)

                freezer_command = '{0} --metadata-out - --config {1}'.\
                    format(self.executable, config_file_name)
                self.process = subprocess.Popen(freezer_command.split(),
                                                stdout=subprocess.PIPE,
                                                stderr=subprocess.PIPE,
                                                env=os.environ.copy())

                # store the pid for this process in the api
                try:
                    self.job_doc['job_schedule']['current_pid'] = \
                        self.process.pid
                    self.scheduler.update_job(self.job_doc['job_id'],
                                              self.job_doc)
                except Exception as exc:
                    LOG.error("Error saving the process id {}".format(exc))

                output, error = self.process.communicate()
            finally:
                # ensure the tempfile gets deleted, also when writing
                # the config or starting the process failed
                if config_file_name:
                    utils.delete_file(config_file_name)

            if error:
                LOG.error("Freezer client error: {0}".format(error))
            elif output:
                self.upload_metadata(output)

            if self.process.returncode == -15:
                # This means the job action was aborted by the scheduler
                LOG.warning('Freezer-agent was killed by the scheduler. '
                            'Cleanup should be done manually: container, '
                            'mountpoint and lvm snapshots.')
                return Job.ABORTED_RESULT

            elif self.process.returncode:
                # ERROR
                tries -= 1
                if tries:
                    LOG.warning('Job {0} failed {1} action,'
                                ' retrying in {2} seconds'
                                .format(self.id, action_name,
                                        max_retries_interval))
                    time.sleep(max_retries_interval)
            else:
                # SUCCESS
                LOG.info('Job {0} action {1}'
                         ' returned success exit code'.
                         format(self.id, action_name))
                return Job.SUCCESS_RESULT
        LOG.error('Job {0} action {1} failed after {2} tries'
                  .format(self.id, action_name, max_tries))

        return Job.FAIL_RESULT

    def contains_exec(self):
        """Return True when any job action is an 'exec' action.

        Missing or empty 'job_actions' / 'freezer_action' entries are
        treated as "no exec action", matching the defaults used by
        execute().
        """
        for job_action in self.job_doc.get('job_actions') or []:
            freezer_action = job_action.get('freezer_action') or {}
            if freezer_action.get('action') == 'exec':
                return True
        return False

    def update_job_schedule_doc(self, **kwargs):
        """
        Updates the job_schedule section of the job doc with the
        provided keyword args. No checks about accepted key/values
        are being made here since they may vary in the future.

        :param kwargs: keyword args to add
        :return: None
        """
        job_schedule = self.job_doc['job_schedule']
        job_schedule.update(kwargs)

    def execute(self):
        """Run all the job actions and record the overall result.

        Once an action has failed or has been aborted, the remaining
        actions are skipped unless they are flagged 'mandatory'.

        :return: None
        """
        result = Job.SUCCESS_RESULT
        # The lock is only held while switching to the running state,
        # not while the actions execute.
        with self.scheduler.lock:
            LOG.info('job {0} running'.format(self.id))
            self.state = RunningState
            self.update_job_schedule_doc(status=Job.RUNNING_STATUS,
                                         result="",
                                         time_started=int(time.time()),
                                         time_ended=Job.TIME_NULL)
            self.scheduler.update_job_schedule(
                self.id,
                self.job_doc['job_schedule'])

        self.start_session()
        # if the job contains exec action and the scheduler passes the
        # parameter --disable-exec job execution should fail
        if self.contains_exec() and CONF.disable_exec:
            LOG.info("Job {0} failed because it contains exec action "
                     "and exec actions are disabled by scheduler"
                     .format(self.id))
            self.result = Job.FAIL_RESULT
            self.finish()
            return

        for job_action in self.job_doc.get('job_actions', []):
            if job_action.get('mandatory', False) or\
                    (result == Job.SUCCESS_RESULT):

                action_result = self.execute_job_action(job_action)
                if action_result == Job.FAIL_RESULT:
                    result = Job.FAIL_RESULT

                if action_result == Job.ABORTED_RESULT:
                    result = Job.ABORTED_RESULT
            else:
                freezer_action = job_action.get('freezer_action', {})
                action_name = freezer_action.get('action', '')
                LOG.warning("skipping {0} action".
                            format(action_name))
        self.result = result
        self.finish()

    def finish(self):
        """Close the run: end the session and settle status and state.

        A pending remove event marks the job as removed; a job that is
        no longer scheduled, or has a pending stop/abort event, becomes
        completed; otherwise the job goes back to the scheduled state.

        :return: None
        """
        self.update_job_schedule_doc(time_ended=int(time.time()))
        self.end_session(self.result)
        with self.scheduler.lock:
            if self.event == Job.REMOVE_EVENT:
                self.unschedule()
                self.job_doc_status = Job.REMOVED_STATUS
                return

            if not self.scheduled:
                self.job_doc_status = Job.COMPLETED_STATUS
                self.state = StopState
                self.scheduler.update_job(self.id, self.job_doc)
                return

            if self.event in [Job.STOP_EVENT, Job.ABORT_EVENT]:
                self.unschedule()
                self.job_doc_status = Job.COMPLETED_STATUS
                self.scheduler.update_job(self.id, self.job_doc)
            else:
                self.job_doc_status = Job.SCHEDULED_STATUS
                self.state = ScheduledState
                self.scheduler.update_job_schedule(
                    self.id,
                    self.job_doc['job_schedule'])

    def start_session(self):
        """Start the job session, if the job belongs to one.

        Makes up to 5 attempts; on success the session tag returned by
        the scheduler is stored in the job document.

        :return: None
        """
        if not self.session_id:
            return
        for _attempt in range(5):
            try:
                resp = self.scheduler.start_session(self.session_id,
                                                    self.id,
                                                    self.session_tag)
                if resp['result'] == 'success':
                    self.session_tag = resp['session_tag']
                    return
            except Exception as e:
                LOG.error('Error while starting session {0}. {1}'.
                          format(self.session_id, e))
            LOG.warning('Retrying to start session {0}'.
                        format(self.session_id))
        LOG.error('Unable to start session {0}'.format(self.session_id))

    def end_session(self, result):
        """End the job session, if the job belongs to one.

        Makes up to 5 attempts.

        :param result: job result reported to the session
        :return: None
        """
        if not self.session_id:
            return
        for _attempt in range(5):
            try:
                resp = self.scheduler.end_session(self.session_id,
                                                  self.id,
                                                  self.session_tag,
                                                  result)
                if resp['result'] == 'success':
                    return
            except Exception as e:
                LOG.error('Error while ending session {0}. {1}'.
                          format(self.session_id, e))
            LOG.warning('Retrying to end session {0}'.
                        format(self.session_id))
        LOG.error('Unable to end session {0}'.format(self.session_id))

    def schedule(self):
        """Add the job to the scheduler and update status and state.

        Scheduling errors are logged, not raised; the resulting state
        depends on whether the scheduler reports the job as scheduled.

        :return: None
        """
        # Bound up front: the log line below must not fail when
        # get_schedule_args() itself raises.
        kwargs = None
        try:
            kwargs = self.get_schedule_args()
            self.scheduler.add_job(self.execute, id=self.id,
                                   executor='threadpool',
                                   misfire_grace_time=3600, **kwargs)
        except Exception as e:
            LOG.error("Unable to schedule job {0}: {1}".
                      format(self.id, e))

        LOG.info('scheduler job with parameters {0}'.format(kwargs))

        if self.scheduled:
            self.job_doc_status = Job.SCHEDULED_STATUS
            self.state = ScheduledState
        else:
            # job not scheduled or already started and waiting for lock
            self.job_doc_status = Job.COMPLETED_STATUS
            self.state = StopState

    def unschedule(self):
        """Remove the job from the scheduler and mark it as stopped.

        :return: None
        """
        try:
            # already executing job are not present in the apscheduler list
            self.scheduler.remove_job(job_id=self.id)
        except Exception:
            pass
        self.event = Job.NO_EVENT
        self.job_doc_status = Job.STOP_STATUS
        self.state = StopState

    def terminate(self):
        """Call terminate() on the action process, if there is one."""
        if self.process:
            self.process.terminate()

    def kill(self):
        """Call kill() on the action process, if there is one."""
        if self.process:
            self.process.kill()