Source code for mmstats.fields

import array
import ctypes
import math
import time
import warnings

from . import defaults

# Python >= 2.7 ignores DeprecationWarning by default; install the same
# filter explicitly so that behavior is mimicked here.
warnings.filterwarnings('ignore', category=DeprecationWarning)

class DuplicateFieldName(Exception):
    """Error for adding two fields with the same name to an MmStat instance"""
def _create_struct(label, type_, type_signature, buffers=None):
    """Helper to wrap dynamic Structure subclass creation

    `label` is the field label (unicode labels are encoded to UTF-8 first),
    `type_` is the ctypes type of the stored value and `type_signature` its
    signature string. When `buffers` is None the structure ends with a
    single `value` member of `type_`; otherwise it ends with a `buffers`
    array of `buffers` elements of `type_`.

    Returns the newly created, byte-packed ctypes.Structure subclass.
    """
    if isinstance(label, unicode):
        label = label.encode('utf8')

    fields = [
        ('label_sz', defaults.SIZE_TYPE),
        ('label', ctypes.c_char * len(label)),
        ('type_sig_sz', defaults.SIZE_TYPE),
        ('type_signature', ctypes.c_char * len(type_signature)),
        ('write_buffer', ctypes.c_ubyte),
    ]

    if buffers is None:
        fields.append(('value', type_))
    else:
        fields.append(('buffers', (type_ * buffers)))

    return type(
        "%sStruct" % label.title(),
        (ctypes.Structure,),
        {'_fields_': fields, '_pack_': 1},
    )


class Field(object):
    """Base class for fields

    Subclasses supply `buffer_type` (a ctypes type). `initial` is the value
    written into the structure when the field is initialized.
    """
    initial = 0

    def __init__(self, label=None):
        self._struct = None  # initialized in _init
        # A falsy label (None or empty) means the label is derived from the
        # attribute name later, in _new
        self.label = label if label else None

    def _new(self, state, label_prefix, attrname, buffers=None):
        """Creates new data structure for field in state instance

        Stores the prefixed label, the generated Structure class and its
        size on `state`, and returns that size in bytes.
        """
        # Key is used to reference field state on the parent instance
        self.key = attrname

        # Label defaults to attribute name if no label specified
        if self.label is None:
            state.label = label_prefix + attrname
        else:
            state.label = label_prefix + self.label

        state._StructCls = _create_struct(
            state.label, self.buffer_type, self.type_signature, buffers)
        state.size = ctypes.sizeof(state._StructCls)
        return state.size

    def _init(self, state, mm_ptr, offset):
        """Initializes value of field's data structure

        Maps the structure at `mm_ptr + offset`, writes the label and type
        signature headers and the initial value, and returns the offset
        just past this field.
        """
        # _create_struct sizes the label member from the UTF-8 encoded
        # label, so the stored length and bytes must use the same encoding
        label = state.label
        if isinstance(label, unicode):
            label = label.encode('utf8')
        type_signature = self.type_signature

        struct = state._StructCls.from_address(mm_ptr + offset)
        state._struct = struct
        struct.label_sz = len(label)
        struct.label = label
        struct.type_sig_sz = len(type_signature)
        struct.type_signature = type_signature
        struct.write_buffer = defaults.WRITE_BUFFER_UNUSED
        struct.value = self.initial
        return offset + ctypes.sizeof(state._StructCls)

    @property
    def type_signature(self):
        """Type signature string, taken from the ctypes type code"""
        return self.buffer_type._type_

    def __repr__(self):
        return '%s(label=%r)' % (self.__class__.__name__, self.label)
class NonDataDescriptorMixin(object):
    """Mixin to add single buffered __get__ method"""

    def __get__(self, inst, owner):
        """Return the stored value, or the descriptor on class access"""
        if inst is not None:
            field_state = inst._fields[self.key]
            return field_state._struct.value
        return self
class DataDescriptorMixin(object):
    """Mixin to add single buffered __set__ method"""

    def __set__(self, inst, value):
        """Store `value` directly in the field's structure"""
        field_state = inst._fields[self.key]
        field_state._struct.value = value
class BufferedDescriptorMixin(object):
    """\
    Mixin to add double buffered descriptor methods

    Always read/write as double buffering doesn't make sense for readonly
    fields
    """

    def __get__(self, inst, owner):
        """Return the value held in the read buffer"""
        if inst is None:
            return self
        struct = inst._fields[self.key]._struct
        # The read buffer is whichever buffer is not the write buffer
        read_index = struct.write_buffer ^ 1
        return struct.buffers[read_index]

    def __set__(self, inst, value):
        """Write `value` to the write buffer, then flip the buffers"""
        struct = inst._fields[self.key]._struct
        write_index = struct.write_buffer
        struct.buffers[write_index] = value
        # Flipping makes the freshly written buffer the read buffer
        struct.write_buffer = write_index ^ 1
class ReadOnlyField(Field, NonDataDescriptorMixin):
    """Base class for read-only fields whose value is fixed at initialization

    `value` may be a plain value or a callable. A callable is invoked each
    time a field structure is initialized and its result is stored.
    """

    def __init__(self, label=None, value=None):
        super(ReadOnlyField, self).__init__(label=label)
        self.value = value

    def _init(self, state, mm, offset):
        """Initialize the structure and store the static value

        Raises ValueError if no value was configured. Returns the offset
        just past this field.
        """
        value = self.value
        if value is None:
            # Value can't be None
            raise ValueError("value must be set")
        if callable(value):
            # Resolve into a local rather than overwriting self.value: the
            # field is a descriptor shared through the owning class, so
            # caching the result would hand later initializations a stale
            # value instead of calling the callable again.
            value = value()

        # Call super to do standard initialization
        new_offset = super(ReadOnlyField, self)._init(state, mm, offset)
        # Set the static field now
        state._struct.value = value
        # And return the offset as usual
        return new_offset
class ReadWriteField(Field, NonDataDescriptorMixin, DataDescriptorMixin):
    """Base class for simple fields that can be both read and written"""
class DoubleBufferedField(Field):
    """Base class for double buffered writable fields"""

    def _new(self, state, label_prefix, attrname):
        """Create the field's data structure with two value buffers"""
        return super(DoubleBufferedField, self)._new(
            state, label_prefix, attrname, buffers=2)

    def _init(self, state, mm_ptr, offset):
        """Initialize the double buffered structure

        Maps the structure at `mm_ptr + offset`, writes the label and type
        signature headers, zeroes both buffers and selects buffer 0 as the
        write buffer. Returns the offset just past this field.
        """
        # The structure's label member is sized from the UTF-8 encoded
        # label, so measure and store the encoded bytes
        label = state.label
        if isinstance(label, unicode):
            label = label.encode('utf8')
        type_signature = self.type_signature

        struct = state._StructCls.from_address(mm_ptr + offset)
        state._struct = struct
        struct.label_sz = len(label)
        struct.label = label
        struct.type_sig_sz = len(type_signature)
        struct.type_signature = type_signature
        struct.write_buffer = 0
        struct.buffers = 0, 0
        return offset + ctypes.sizeof(state._StructCls)
class ComplexDoubleBufferedField(DoubleBufferedField):
    """Base Class for fields with complex internal state like Counters

    Set InternalClass in your subclass
    """
    InternalClass = None

    def _init(self, state, mm_ptr, offset):
        """Initialize the structure, then attach the internal interface"""
        next_offset = super(ComplexDoubleBufferedField, self)._init(
            state, mm_ptr, offset)
        self._init_internal(state)
        return next_offset

    def _init_internal(self, state):
        """Instantiate InternalClass for `state` as `state.internal`"""
        internal_cls = self.InternalClass
        if internal_cls is None:
            raise NotImplementedError(
                "Must set %s.InternalClass" % type(self).__name__)
        state.internal = internal_cls(state)

    def __get__(self, inst, owner):
        """Return the internal interface object for this field"""
        if inst is not None:
            return inst._fields[self.key].internal
        return self
class _InternalFieldInterface(object):
    """Base class used by internal field interfaces like counter"""

    def __init__(self, state):
        self._struct = state._struct

    @property
    def value(self):
        """Value currently held in the read buffer"""
        struct = self._struct
        read_index = struct.write_buffer ^ 1
        return struct.buffers[read_index]

    @value.setter
    def value(self, v):
        self._set(v)

    def _set(self, v):
        """Write `v` to the write buffer, then flip the buffers"""
        struct = self._struct
        write_index = struct.write_buffer
        struct.buffers[write_index] = v
        # Flipping makes the freshly written buffer the read buffer
        struct.write_buffer = write_index ^ 1
class CounterField(ComplexDoubleBufferedField):
    """Counter field supporting an inc() method and value attribute"""
    buffer_type = ctypes.c_uint64
    type_signature = 'Q'

    class InternalClass(_InternalFieldInterface):
        """Internal counter class used by CounterFields"""

        def inc(self, n=1):
            """Deprecated alias of incr(); warns, then increments by `n`"""
            message = "inc(n=...) is deprecated. Use incr(amount=...)"
            warnings.warn(message, DeprecationWarning)
            self.incr(n)

        def incr(self, amount=1):
            """Increment Counter by `amount` (defaults to 1)"""
            updated = self.value + amount
            self._set(updated)
class AverageField(ComplexDoubleBufferedField):
    """Average field supporting an add() method and value attribute"""
    buffer_type = ctypes.c_double

    class InternalClass(_InternalFieldInterface):
        """Internal mean class used by AverageFields"""

        def __init__(self, state):
            _InternalFieldInterface.__init__(self, state)
            # Running total and sample count; the mean is recomputed from
            # these on every add()
            self._total = 0.0
            self._count = 0

        def add(self, value):
            """Add a new value to the average"""
            self._count += 1
            self._total += value
            mean = self._total / self._count
            self._set(mean)
class _MovingAverageInternal(_InternalFieldInterface):
    """Internal interface keeping a fixed-size window of recent samples"""

    def __init__(self, state):
        _InternalFieldInterface.__init__(self, state)
        window_size = state.field.size
        self._max = window_size
        self._window = array.array('d', [0.0] * window_size)
        self._idx = 0
        self._full = False

    def add(self, value):
        """Add a new value to the moving average"""
        self._window[self._idx] = value

        # Until the window has wrapped once, only idx + 1 slots hold samples
        divisor = self._max if self._full else self._idx + 1
        self._set(math.fsum(self._window) / divisor)

        if self._idx != self._max - 1:
            self._idx += 1
        else:
            # Wrap around to the start of the window
            self._idx = 0
            self._full = True


class MovingAverageField(ComplexDoubleBufferedField):
    """Field publishing the mean of the last `size` added values"""
    buffer_type = ctypes.c_double
    InternalClass = _MovingAverageInternal

    def __init__(self, size=100, **kwargs):
        super(MovingAverageField, self).__init__(**kwargs)
        self.size = size


class _TimerContext(object):
    """Class to wrap timer state"""

    def __init__(self, timer=time.time):
        self._timer = timer
        self.start = timer()
        self.end = None

    def get_time(self):
        """Return the current reading of the wrapped timer callable"""
        return self._timer()

    @property
    def done(self):
        """True if timer context has stopped"""
        return self.end is not None

    @property
    def elapsed(self):
        """Returns time elapsed in context"""
        # A running context measures up to now, a stopped one up to `end`
        finish = self.end if self.done else self.get_time()
        return finish - self.start

    def stop(self):
        """Record the end time of the context"""
        self.end = self.get_time()
class TimerField(MovingAverageField):
    """Moving average field that provides a context manager for easy timings

    As a context manager:

    >>> class T(MmStats):
    ...     timer = TimerField()
    >>> t = T()
    >>> with t.timer as ctx:
    ...     assert ctx.elapsed > 0.0
    >>> assert t.timer.value > 0.0
    >>> assert t.timer.last > 0.0
    """

    def __init__(self, timer=time.time, **kwargs):
        super(TimerField, self).__init__(**kwargs)
        self.timer = timer

    class InternalClass(_MovingAverageInternal):
        """Moving average interface extended with start/stop timing"""

        def __init__(self, state):
            _MovingAverageInternal.__init__(self, state)
            self._ctx = None
            self.timer = state.field.timer

        def start(self):
            """Start the timer"""
            self._ctx = _TimerContext(self.timer)

        def stop(self):
            """Stop the timer"""
            ctx = self._ctx
            ctx.stop()
            self.add(ctx.elapsed)

        def __enter__(self):
            self.start()
            return self._ctx

        def __exit__(self, exc_type, exc_value, exc_tb):
            self.stop()

        @property
        def last(self):
            """Get the last recorded value"""
            # Nothing has been timed yet until start() creates a context
            return 0.0 if self._ctx is None else self._ctx.elapsed
class BufferedDescriptorField(DoubleBufferedField, BufferedDescriptorMixin):
    """Base class for double buffered fields accessed as descriptors"""
class UInt64Field(BufferedDescriptorField):
    """64bit Double Buffered Unsigned Integer field"""
    buffer_type = ctypes.c_uint64
    type_signature = 'Q'
class UIntField(BufferedDescriptorField):
    """Double buffered unsigned 32bit integer field"""
    buffer_type = ctypes.c_uint32
    type_signature = 'I'
class IntField(BufferedDescriptorField):
    """Double buffered signed 32bit integer field"""
    buffer_type = ctypes.c_int32
    type_signature = 'i'
class ShortField(BufferedDescriptorField):
    """Double buffered signed 16bit integer field"""
    buffer_type = ctypes.c_int16
class UShortField(BufferedDescriptorField):
    """Double buffered unsigned 16bit integer field"""
    buffer_type = ctypes.c_uint16
class ByteField(ReadWriteField):
    """Signed 8bit integer field"""
    buffer_type = ctypes.c_byte
class FloatField(BufferedDescriptorField):
    """Double buffered 32bit float field"""
    buffer_type = ctypes.c_float
class StaticFloatField(ReadOnlyField):
    """Read-only, unbuffered 32bit float field"""
    buffer_type = ctypes.c_float
class DoubleField(BufferedDescriptorField):
    """Double buffered 64bit double precision float field"""
    buffer_type = ctypes.c_double
class StaticDoubleField(ReadOnlyField):
    """Read-only, unbuffered 64bit float field"""
    buffer_type = ctypes.c_double
class BoolField(ReadWriteField):
    """Boolean Field"""
    # Avoid potential ambiguity and marshal bools to 0/1 manually
    buffer_type = ctypes.c_byte
    type_signature = '?'

    def __init__(self, initial=False, **kwargs):
        super(BoolField, self).__init__(**kwargs)
        self.initial = initial

    def __get__(self, inst, owner):
        """Return True when the stored byte equals 1"""
        if inst is not None:
            return inst._fields[self.key]._struct.value == 1
        return self

    def __set__(self, inst, value):
        """Store 1 for a truthy `value`, 0 otherwise"""
        inst._fields[self.key]._struct.value = int(bool(value))
class StringField(ReadWriteField):
    """UTF-8 String Field

    Values are stored as UTF-8 bytes in a fixed buffer of `size` bytes and
    decoded back from UTF-8 when read.
    """
    initial = ''

    def __init__(self, size=defaults.DEFAULT_STRING_SIZE, **kwargs):
        self.size = size
        self.buffer_type = ctypes.c_char * size
        super(StringField, self).__init__(**kwargs)

    @property
    def type_signature(self):
        """Signature of the form '<size>s'"""
        return '%ds' % self.size

    def __get__(self, inst, owner):
        """Return the stored bytes decoded from UTF-8"""
        if inst is None:
            return self
        return inst._fields[self.key]._struct.value.decode('utf8')

    def __set__(self, inst, value):
        """Store `value`, trimming it to at most `size` bytes

        Unicode input is encoded to UTF-8 first; byte strings are taken
        as-is unless they need trimming.
        """
        if isinstance(value, unicode):
            value = value.encode('utf8')

        if len(value) > self.size:
            # Round trip trimmed strings through utf8 so only valid utf8
            # bytes are stored. This applies to byte strings too: a cut
            # inside a multi-byte character would otherwise make __get__
            # fail to decode the stored value.
            value = value[:self.size].decode('utf8', 'ignore').encode('utf8')

        inst._fields[self.key]._struct.value = value
class StaticUIntField(ReadOnlyField):
    """Read-only, unbuffered unsigned 32bit integer field"""
    buffer_type = ctypes.c_uint32
    type_signature = 'I'
class StaticInt64Field(ReadOnlyField):
    """Read-only, unbuffered signed 64bit integer field"""
    buffer_type = ctypes.c_int64
    type_signature = 'q'
class StaticUInt64Field(ReadOnlyField):
    """Read-only, unbuffered unsigned 64bit integer field"""
    buffer_type = ctypes.c_uint64
    type_signature = 'Q'
class StaticTextField(ReadOnlyField):
    """Read-only, unbuffered UTF-8 encoded string field"""
    initial = ''
    buffer_type = ctypes.c_char * 256
    type_signature = '256s'

Project Versions