Source code for decocare.session
import logging
import time
log = logging.getLogger( ).getChild(__name__)
import commands
import lib
from errors import StickError, AckError, BadDeviceCommError
[docs]class Session(object):
def __init__(self, stick):
self.stick = stick
self.should_download = True
[docs] def init(self, skip_power_control=False):
stick = self.stick
log.info('test fetching product info %s' % stick)
log.info(pformat(stick.product_info( )))
log.info('get signal strength of %s' % stick)
signal = 0
while signal < 50:
signal = stick.signal_strength( )
log.info('we seem to have found a nice signal strength of: %s' % signal)
[docs] def end(self):
stick = self.stick
log.info(pformat(stick.usb_stats( )))
log.info(pformat(stick.radio_stats( )))
[docs] def execute(self, command):
self.command = command
for i in xrange(max(1, self.command.retries)):
log.info('execute attempt: %s' % (i + 1))
try:
self.expectedLength = self.command.bytesPerRecord * self.command.maxRecords
self.transfer( )
if self.should_download:
log.info('sleeping %s before download' % command.effectTime)
time.sleep(command.effectTime)
self.download( )
log.info('finished executing:%s' % command)
if command.done( ):
return command
except BadDeviceCommError, e:
log.critical("ERROR: %s" % e)
# self.clearBuffers( )
log.critical('this seems like a problem')
[docs] def download(self):
errors = [ ]
if self.expectedLength > 0:
log.info('proceeding with download')
data = self.stick.download( )
#self.command.data = data
self.command.respond(data)
return data
# remove
for i in xrange(3):
pass
try:
pass
except AckError, e:
time.sleep(.010)
log.error(e)
errors.append(e)
else:
log.info('no download required')
assert not errors, ("with errors:%s" %"\n".join( map(str, errors) ))
[docs] def transfer(self):
log.info('session transferring packet')
return self.stick.transmit_packet(self.command)
[docs]class Pump(Session):
def __init__(self, stick, serial='208850'):
super(type(self), self).__init__(stick)
self.serial = serial
log.info('setting up to talk with %s' % serial)
[docs] def power_control(self, minutes=None):
log.info('BEGIN POWER CONTROL')
print "PowerControl SERIAL", self.serial
response = self.query(commands.PowerControl, minutes=minutes)
power = self.command
log.info('manually download PowerControl serial %s' % self.serial)
data = self.stick.download( )
log.info("ENDING manual download:\n%s" % lib.hexdump(data))
return data
[docs] def read_model(self):
model = self.query(commands.ReadPumpModel)
self.model = model
return model
[docs] def read_history_0(self):
log.info("read HISTORY DATA")
comm = commands.ReadHistoryData(serial=self.serial, page=0)
self.execute(comm)
log.info('comm:READ history data page!!!:\n%s' % (lib.hexdump(comm.getData( ))))
[docs] def execute(self, command):
command.serial = self.serial
return super(type(self), self).execute(command)
[docs] def query(self, Command, **kwds):
command = Command(serial=self.serial, **kwds)
self.execute(command)
return command
if __name__ == '__main__':
import doctest
doctest.testmod( )
import sys
port = None
port = sys.argv[1:] and sys.argv[1] or False
serial_num = sys.argv[2:] and sys.argv[2] or False
if not port or not serial_num:
print "usage:\n%s <port> <serial>, eg /dev/ttyUSB0 208850" % sys.argv[0]
sys.exit(1)
import link
import stick
from pprint import pformat
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
log.info("howdy! I'm going to take a look at your pump.")
stick = stick.Stick(link.Link(port, timeout=.200))
stick.open( )
session = Pump(stick, serial_num)
log.info(pformat(stick.interface_stats( )))
session.power_control( )
log.info(pformat(stick.interface_stats( )))
log.info('PUMP MODEL: %s' % session.read_model( ))
log.info(pformat(stick.interface_stats( )))
log.info('PUMP READ PAGE 0: %s' % session.read_history_0( ))
log.info(pformat(stick.interface_stats( )))
log.info("howdy! all done looking at pump")
# stick.open( )
#####
# EOF