Source code for decocare.history

#!/usr/bin/python
"""
This module provides some basic helper/formatting utilities,
specifically targeted at decoding ReadHistoryData data.

"""
import io
from binascii import hexlify

import lib
from records import *

_remote_ids = [
  bytearray([ 0x01, 0xe2, 0x40 ]),
  bytearray([ 0x03, 0x42, 0x2a ]),
  bytearray([ 0x0c, 0x89, 0x92 ]),
]

[docs]def decode_remote_id(msg): """ practice decoding some remote ids: | 0x27 | 0x01 0xe2 0x40 | 0x03 0x42 0x2a | 0x28 0x0c 0x89 | 0x92 0x00 0x00 0x00 >>> decode_remote_id(_remote_ids[0]) '123456' >>> decode_remote_id(_remote_ids[1]) '213546' >>> decode_remote_id(_remote_ids[2]) '821650' """ high = msg[ 0 ] * 256 * 256 middle = msg[ 1 ] * 256 low = msg[ 2 ] return str(high + middle + low)
[docs]class NoDelivery(KnownRecord): opcode = 0x06 head_length = 4 #class ResultTotals(KnownRecord):
[docs]class MResultTotals(InvalidRecord): """On 722 this seems like two records.""" opcode = 0x07 #head_length = 5 head_length = 5 date_length = 2 #body_length = 37 + 4 #body_length = 2 def __init__(self, head, larger=False): super(type(self), self).__init__(head, larger) if larger: self.body_length = 3
[docs] def parse_time(self): mid = unmask_m_midnight(self.date) try: self.datetime = date = datetime(*mid) return date except ValueError, e: print "ERROR", e, mid, lib.hexdump(self.date) pass return mid
[docs] def date_str(self): result = 'unknown' if self.datetime is not None: result = self.datetime.isoformat( ) else: if len(self.date) >=2: result = "{}".format(unmask_m_midnight(self.date)) return result
[docs]class ChangeBasalProfile_old_profile (KnownRecord): opcode = 0x08 # old/bad # body_length = 46 # XXX: on LeDanaScott's, 522, this seems right body_length = 145 def __init__(self, head, larger=False): super(type(self), self).__init__(head, larger) if larger: self.body_length = 145
[docs] def decode (self): self.parse_time( ) rates = [ ] i = 0 for x in range(47): start = x * 3 end = start + 3 (offset, rate, q) = self.body[start:end] if [offset, rate, q] == [ 0x00, 0x00, 0x00]: break try: rates.append(describe_rate(offset, rate, q)) except TypeError, e: remainder = [ offset, rate, q ] rates.append(remainder) return rates
[docs]def describe_rate (offset, rate, q): return (dict(offset=(30*1000*60)*offset, rate=rate*0.025))
[docs]class ChangeBasalProfile_new_profile (KnownRecord): opcode = 0x09 body_length = 145
[docs] def decode (self): self.parse_time( ) rates = [ ] i = 0 for x in range(47): start = x * 3 end = start + 3 (offset, rate, q) = self.body[start:end] if [offset, rate, q] == [ 0x00, 0x00, 0x00]: break rates.append(describe_rate(offset, rate, q)) return rates
[docs]class ClearAlarm(KnownRecord): opcode = 0x0C
[docs]class SelectBasalProfile(KnownRecord): opcode = 0x14
[docs]class ChangeTime(KnownRecord): opcode = 0x17
[docs]class NewTimeSet(KnownRecord): opcode = 0x18
[docs]class LowBattery(KnownRecord): opcode = 0x19
[docs]class Battery(KnownRecord): opcode = 0x1a
[docs]class PumpSuspend(KnownRecord): opcode = 0x1e
[docs]class PumpResume(KnownRecord): opcode = 0x1f
[docs]class Rewind(KnownRecord): opcode = 0x21
[docs]class EnableDisableRemote(KnownRecord): opcode = 0x26 body_length = 14
[docs]class ChangeRemoteID(KnownRecord): opcode = 0x27
[docs]class TempBasalDuration(KnownRecord): opcode = 0x16 _test_1 = bytearray([ ])
[docs] def decode(self): self.parse_time( ) basal = { 'duration (min)': self.head[1] * 30, } return basal
[docs]class TempBasal(KnownRecord): opcode = 0x33 body_length = 1 _test_1 = bytearray([ ])
[docs] def decode(self): self.parse_time( ) basal = { 'rate': self.head[1] / 40.0, } return basal
[docs]class LowReservoir(KnownRecord): """ >>> rec = LowReservoir( LowReservoir._test_1[:2] ) >>> decoded = rec.parse(LowReservoir._test_1) >>> print str(rec) LowReservoir 2012-12-07T11:02:43 head[2], body[0] op[0x34] >>> print pformat(decoded) {'amount': 20.0} """ opcode = 0x34 _test_1 = bytearray([ 0x34, 0xc8, 0xeb, 0x02, 0x0b, 0x07, 0x0c, ])
[docs] def decode(self): self.parse_time( ) reservoir = {'amount' : int(self.head[1]) / 10.0 } return reservoir
[docs]class ChangeUtility(KnownRecord): opcode = 0x63
[docs]class ChangeTimeDisplay(KnownRecord): opcode = 0x64
_confirmed = [ Bolus, Prime, NoDelivery, MResultTotals, ChangeBasalProfile_old_profile, ChangeBasalProfile_new_profile, ClearAlarm, SelectBasalProfile, TempBasalDuration, ChangeTime, NewTimeSet, LowBattery, Battery, PumpSuspend, PumpResume, CalBGForPH, Rewind, EnableDisableRemote, ChangeRemoteID, TempBasal, LowReservoir, BolusWizard, UnabsorbedInsulinBolus, ChangeUtility, ChangeTimeDisplay ] # _confirmed.append(DanaScott0x09)
[docs]class Ian69(KnownRecord): opcode = 0x69 body_length = 2
_confirmed.append(Ian69)
[docs]class Ian50(KnownRecord): opcode = 0x50 body_length = 34
_confirmed.append(Ian50)
[docs]class Ian54(KnownRecord): opcode = 0x54 body_length = 34 + 23 body_length = 57
_confirmed.append(Ian54)
[docs]class Ian0B(KnownRecord): opcode = 0x0B head_length = 3
_confirmed.append(Ian0B)
[docs]class Ian3F(KnownRecord): opcode = 0x3F body_length = 3
_confirmed.append(Ian3F)
[docs]class IanA8(KnownRecord): opcode = 0xA8 head_length = 10
_confirmed.append(IanA8)
[docs]class BasalProfileStart(KnownRecord): opcode = 0x7b body_length = 3
[docs] def decode (self): self.parse_time( ) if (len(self.body) % 3 == 0): return describe_rate(*self.body) else: return dict(raw=hexlify(self.body))
_confirmed.append(BasalProfileStart) # 123, 143
[docs]class OldBolusWizardChange (KnownRecord): opcode = 0x5a body_length = 117
_confirmed.append(OldBolusWizardChange)
[docs]class BigBolusWizardChange (KnownRecord): opcode = 0x5a body_length = 143
[docs]class old6c(InvalidRecord): opcode = 0x6c #head_length = 45 body_length = 38 # body_length = 34
_confirmed.append(old6c)
[docs]class Model522ResultTotals(KnownRecord): opcode = 0x6d head_length = 1 date_length = 2 body_length = 40
[docs] def parse_time(self): mid = unmask_m_midnight(self.date) try: self.datetime = date = datetime(*mid) return date except ValueError, e: print "ERROR", e, lib.hexdump(self.date) pass return mid
[docs] def date_str(self): result = 'unknown' if self.datetime is not None: result = self.datetime.isoformat( ) else: if len(self.date) >=2: result = "{}".format(unmask_m_midnight(self.date)) return result
[docs]def unmask_m_midnight(data): """ Extract date values from a series of bytes. Always returns tuple given a bytearray of at least 3 bytes. Returns 6-tuple of scalar values year, month, day, hours, minutes, seconds. """ data = data[:] seconds = 0 minutes = 0 hours = 0 day = parse_day(data[0]) high = data[0] >> 4 low = data[0] & 0x1F year_high = data[1] >> 4 # month = int(high) #+ year_high # month = parse_months( data[0], data[1] ) mhigh = (data[0] & 0xE0) >> 4 mlow = (data[1] & 0x80) >> 7 month = int(mhigh + mlow) day = int(low) + 1 year = parse_years(data[1]) return (year, month, day, hours, minutes, seconds)
_confirmed.append(Model522ResultTotals)
[docs]class Sara6E(Model522ResultTotals): """Seems specific to 722?""" opcode = 0x6e #head_length = 52 - 5 # body_length = 1 body_length = 48 #body_length = 0 def __init__(self, head, larger=False): super(type(self), self).__init__(head, larger) if larger: self.body_length = 48
_confirmed.append(Sara6E) _known = { } _variant = { } for x in _confirmed: _known[x.opcode] = x del x
[docs]def suggest(head, larger=False): """ Look in the known table of commands to find a suitable record type for this opcode. """ klass = _known.get(head[0], Base) record = klass(head, larger) return record
[docs]def parse_record(fd, head=bytearray( ), larger=False): """ Given a file-like object, and the head of a record, parse the rest of the record. Look up the type of record, read in just enough data to parse it, return the result. """ # head = bytearray(fd.read(2)) date = bytearray( ) body = bytearray( ) record = suggest(head, larger) remaining = record.head_length - len(head) if remaining > 0: head.extend(bytearray(fd.read(remaining))) if record.date_length > 0: date.extend(bytearray(fd.read(record.date_length))) if record.body_length > 0: body.extend(bytearray(fd.read(record.body_length))) record.parse( head + date + body ) # print str(record) # print record.pformat(prefix=str(record) ) return record
[docs]def describe( ): keys = _known.keys( ) out = [ ] for k in keys: out.append(_known[k].describe( )) return out
[docs]class PagedData (object): """ PagedData - context for parsing a page of cgm data. """ def __init__ (self, raw): data, crc = raw[0:1022], raw[1022:] computed = lib.CRC16CCITT.compute(bytearray(data)) if lib.BangInt(crc) != computed: assert lib.BangInt(crc) == computed, "CRC does not match page data" self.raw = raw self.clean(data)
[docs] def clean (self, data): data.reverse( ) self.data = self.eat_nulls(data) self.stream = io.BufferedReader(io.BytesIO(self.data))
[docs] def eat_nulls (self, data): i = 0 while data[i] == 0x00: i = i+1 return data[i:]
[docs]class HistoryPage (PagedData):
[docs] def clean (self, data): data.reverse( ) self.data = self.eat_nulls(data) self.data.reverse( ) # XXX: under some circumstances, zero is the correct value and # eat_nulls actually eats valid data. This ugly hack restores two # nulls back ot the end. self.data.append(0x00) self.data.append(0x00) self.stream = io.BufferedReader(io.BytesIO(self.data))
[docs] def decode (self, larger=False): records = [ ] skipped = [ ] for B in iter(lambda: bytearray(self.stream.read(2)), bytearray("")): if B == bytearray( [ 0x00, 0x00 ] ): break record = parse_record(self.stream, B, larger=larger ) if record.datetime: rec = dict(timestamp=record.datetime.isoformat( ), date=lib.epochize(record.datetime), _type=str(record.__class__.__name__), _description=str(record)) data = record.decode( ) if data is not None: rec.update(data) if skipped: rec.update(appended=skipped) skipped = [ ] records.append(rec) else: rec = dict(_type=str(record.__class__.__name__), _description=str(record)) data = record.decode( ) if data is not None: rec.update(data=data) skipped.append(rec) return records
if __name__ == '__main__': import doctest doctest.testmod( ) ##### # EOF