Source code for dripline.core.entity

import datetime
import functools
import types

import numbers

import scarab

from .endpoint import Endpoint
from dripline.core import MsgAlert
__all__ = []

import logging
logger = logging.getLogger(__name__)

def _log_on_set_decoration(self, fun):
    '''
    requires get_on_set be true; log the result of the on_get via an alert message
    '''
    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
        result = fun(*args, **kwargs)

        values = {}
        if result != [u'']:
            if isinstance(result, dict):
                values.update(result)
            else:
                values.update({'value_raw': result})
        else:
            values.update({'value_raw': args[0]})
        logger.debug('set done, now log')
        self.log_a_value(values)
        return result
    return wrapper

def _get_on_set_decoration(self, fun):
    '''
    make a call to on_get immediately after on_set returns, returning the result
    '''
    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
        fun(*args, **kwargs)
        logger.debug("set, now get_on_set")
        result = self.on_get()
        return result
    return wrapper

__all__.append("Entity")
[docs]class Entity(Endpoint): ''' Subclass of Endpoint which adds logic related to logging and confirming values. In particular, there is support for: get_on_set -> setting the endpoint's value returns a get() result rather than an empty success (particularly useful for devices which may round assignment values) log_on_set -> further extends get_on_set to send an alert message in addtion to returning the value in a reply log_interval -> leverages the scheduler class to log the on_get result at a regular cadence ''' #check_on_set -> allows for more complex logic to confirm successful value updates # (for example, the success condition may be measuring another endpoint) def __init__(self, get_on_set=False, log_routing_key_prefix='sensor_value', log_interval=0, log_on_set=False, calibration=None, **kwargs): ''' Args: get_on_set: if true, calls to on_set are immediately followed by an on_get, which is returned log_routing_key_prefix: first term in routing key used in alert messages which log values log_interval: how often to log the Entity's value. If 0 then scheduled logging is disabled; if a number, interpreted as number of seconds; if a dict, unpacked as arguments to the datetime.time_delta initializer; if a datetime.timedelta taken as the new value log_on_set: if true, always call log_a_value() immediately after on_set **Note:** requires get_on_set be true, overrides must be equivalent calibration (string || dict) : if string, updated with raw on_get() result via str.format() in @calibrate decorator, used to populate raw and calibrated values fields of a result payload. If a dictionary, the raw result is used to index the dict with the calibrated value being the dict's value. ''' Endpoint.__init__(self, **kwargs) self.log_routing_key_prefix=log_routing_key_prefix self._calibration = calibration # keep a reference to the on_set (possibly decorated in a subclass), needed for changing *_on_set configurations self.__initial_on_set = self.on_set self._get_on_set = None self._log_on_set = None self.get_on_set = get_on_set self.log_on_set = log_on_set self.log_interval = log_interval self._log_action_id = None @property def get_on_set(self): return self._get_on_set @get_on_set.setter def get_on_set(self, value): if value: self.on_set = _get_on_set_decoration(self, self.__initial_on_set) if self.log_on_set: self.on_set = _log_on_set_decoration(self.on_set) else: if self.log_on_set: raise ValueError("unable to disable get_on_set while log_on_set is enabled") self.on_set = self.__initial_on_set self._get_on_set = bool(value) @property def log_on_set(self): return self._log_on_set @log_on_set.setter def log_on_set(self, value): if value: if not self.get_on_set: raise ValueError("unable to enable log_on_set when get_on_set is disabled") self.on_set = _log_on_set_decoration(self, self.on_set) else: self.on_set = self.__initial_on_set if self.get_on_set: self.on_set = _get_on_set_decoration(self, self.on_set) self._log_on_set = bool(value) @property def log_interval(self): return self._log_interval @log_interval.setter def log_interval(self, new_interval): if isinstance(new_interval, numbers.Number): self._log_interval = datetime.timedelta(seconds=new_interval) elif isinstance(new_interval, dict): self._log_interval = datetime.timedelta(**new_interval) elif isinstance(new_interval, datetime.timedelta): self._log_interval = new_interval else: raise ValueError(f"unable to interpret a new_interval of type <{type(new_interval)}>")
[docs] def scheduled_log(self): logger.debug("in a scheduled log event") result = self.on_get() self.log_a_value(result)
[docs] def log_a_value(self, the_value): logger.debug(f"value to log is:\n{the_value}") the_alert = MsgAlert.create(payload=scarab.to_param(the_value), routing_key=f'{self.log_routing_key_prefix}.{self.name}') alert_sent = self.service.send(the_alert)
[docs] def start_logging(self): if self._log_action_id is not None: self.service.unschedule(self._log_action_id) if self.log_interval: logger.info(f'should start logging every {self.log_interval}') self._log_action_id = self.service.schedule(self.scheduled_log, self.log_interval, datetime.datetime.now() + self.service.execution_buffer*3) else: raise ValueError('unable to start logging when log_interval evaluates false') logger.debug(f'log action id is {self._log_action_id}')
[docs] def stop_logging(self): #TODO: should it be an error to stop_logging() when already not logging? if self._log_action_id is not None: self.service.unschedule(self._log_action_id) self._log_action_id = None