summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--glucometerutils/drivers/sdcodefree.py273
1 files changed, 138 insertions, 135 deletions
diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py
index e7ce753..c89d894 100644
--- a/glucometerutils/drivers/sdcodefree.py
+++ b/glucometerutils/drivers/sdcodefree.py
@@ -3,7 +3,7 @@
__author__ = 'Diego Elio Pettenò'
__email__ = 'flameeyes@flameeyes.eu'
-__copyright__ = 'Copyright © 2016, Diego Elio Pettenò'
+__copyright__ = 'Copyright © 2017, Diego Elio Pettenò'
__license__ = 'MIT'
import array
@@ -19,8 +19,8 @@ import serial
from glucometerutils import common
from glucometerutils import exceptions
-_STX = 0x53 # Not really 'STX'
-_ETX = 0xAA # Not really 'ETX'
+_STX = 0x53 # Not really 'STX'
+_ETX = 0xAA # Not really 'ETX'
_DIR_IN = 0x20
_DIR_OUT = 0x10
@@ -46,145 +46,148 @@ _STRUCT_READINGS_COUNT = struct.Struct('>H')
_FETCH_PACKET = b'\x10\x60'
_ReadingRecord = collections.namedtuple(
- '_ReadingRecord',
- ('unknown1', 'unknown2', 'year', 'month', 'day', 'hour', 'minute',
- 'value', 'meal_flag'))
+ '_ReadingRecord',
+ ('unknown1', 'unknown2', 'year', 'month', 'day', 'hour', 'minute',
+ 'value', 'meal_flag'))
_STRUCT_READING = struct.Struct('>BBBBBBBHB')
_MEAL_FLAG = {
- 0x00: common.NO_MEAL,
- 0x10: common.BEFORE_MEAL,
- 0x20: common.AFTER_MEAL
+ 0x00: common.NO_MEAL,
+ 0x10: common.BEFORE_MEAL,
+ 0x20: common.AFTER_MEAL
}
def parse_reading(msgdata):
- return _ReadingRecord(*_STRUCT_READING.unpack_from(msgdata))
+ return _ReadingRecord(*_STRUCT_READING.unpack_from(msgdata))
def xor_checksum(msg):
- return functools.reduce(operator.xor, msg)
+ return functools.reduce(operator.xor, msg)
class Device(object):
- def __init__(self, device):
- self.serial_ = serial.Serial(
- port=device, baudrate=38400, bytesize=serial.EIGHTBITS,
- parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
- timeout=300, xonxoff=False, rtscts=False, dsrdtr=False,
- writeTimeout=None)
-
- def read_packet(self):
- preamble = self.serial_.read(3)
- if len(preamble) != 3:
- raise exceptione.InvalidResponse(
- response='Expected 3 bytes, received %d' % len(preamble))
- if preamble[0:_IDX_LENGTH] != _RECV_PREAMBLE:
- raise exceptions.InvalidResponse(
- response='Unexpected preamble %r' % pramble[0:_IDX_LENGTH])
-
- msglen = preamble[_IDX_LENGTH]
- message = self.serial_.read(msglen)
- if len(message) != msglen:
- raise exception.InvalidResponse(
- response='Expected %d bytes, received %d' % (msglen, len(message)))
- if message[_IDX_ETX] != _ETX:
- raise exception.InvalidResponse(
- response='Unexpected end-of-transmission byte: %02x' % message[_IDX_ETX])
-
- # Calculate the checksum up until before the checksum itself.
- msgdata = message[:_IDX_CHECKSUM]
-
- cksum = xor_checksum(msgdata)
- if cksum != message[_IDX_CHECKSUM]:
- raise exception.InvalidChecksum(message[_IDX_CHECKSUM], cksum)
-
- return msgdata
-
- def wait_and_ready(self):
- challenge = self.serial_.read(1)
-
- # The first packet read may have a prefixed zero, it might be a bug in the
- # cp210x driver or device, but discard it if found.
- if challenge == b'\0':
- challege = self.serial_.read(1)
- if challenge != b'\x53':
- raise exceptions.ConnectionFailed(
- message='Unexpected starting bytes %r' % challenge)
-
- challenge += self.serial_.read(6)
-
- if challenge != _CHALLENGE_PACKET_FULL:
- raise exceptions.ConnectionFailed(
- message='Unexpected challenge %r' % challenge)
-
- self.send_packet(_RESPONSE_PACKET)
-
- # The first packet only contains the counter of how many readings are
- # available.
- first_packet = self.read_packet()
-
- count = _STRUCT_READINGS_COUNT.unpack_from(first_packet, 1)
-
- return count[0]
-
- def send_packet(self, msgdata):
- packet = array.array('B')
- packet.extend((_STX, _DIR_OUT, len(msgdata)+2))
- packet.extend(msgdata)
- packet.extend((xor_checksum(msgdata), _ETX))
- self.serial_.write(packet.tobytes())
-
- def connect(self):
- print("Please connect and turn on the device.")
-
- def disconnect(self):
- self.send_packet(_DISCONNECT_PACKET)
- response = self.read_packet()
- if response != _DISCONNECTED_PACKET:
- raise exceptions.InvalidResponse(response=response)
-
- def get_meter_info(self):
- return common.MeterInfo('SD CodeFree glucometer')
-
- def get_version(self):
- raise NotImplementedError
-
- def get_serial_number(self):
- raise NotImplementedError
-
- def get_glucose_unit(self):
- # Device does not provide information on glucose unit.
- return common.UNIT_MGDL
-
- def get_datetime(self):
- raise NotImplementedError
-
- def set_datetime(self, date=datetime.datetime.now()):
- setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii')
-
- # Ignore the readings count.
- self.wait_and_ready()
-
- self.send_packet(setdatecmd)
- response = self.read_packet()
- if response != _DATE_SET_PACKET:
- raise exceptions.InvalidResponse(response=response)
-
- # The date we return should only include up to minute, unfortunately.
- return datetime.datetime(date.year, date.month, date.day,
- date.hour, date.minute)
-
- def zero_log(self):
- raise NotmplementedError
-
- def get_readings(self):
- count = self.wait_and_ready()
-
- for _ in range(count):
- self.send_packet(_FETCH_PACKET)
- rpkt = self.read_packet()
-
- r = parse_reading(rpkt)
- meal = _MEAL_FLAG[r.meal_flag]
-
- yield common.Reading(
- datetime.datetime(2000 + r.year, r.month, r.day, r.hour, r.minute),
- r.value, meal=meal)
+ def __init__(self, device):
+ self.serial_ = serial.Serial(
+ port=device, baudrate=38400, bytesize=serial.EIGHTBITS,
+ parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
+ timeout=300, xonxoff=False, rtscts=False, dsrdtr=False,
+ writeTimeout=None)
+
+ def read_packet(self):
+ preamble = self.serial_.read(3)
+ if len(preamble) != 3:
+ raise exceptione.InvalidResponse(
+ response='Expected 3 bytes, received %d' % len(preamble))
+ if preamble[0:_IDX_LENGTH] != _RECV_PREAMBLE:
+ raise exceptions.InvalidResponse(
+ response='Unexpected preamble %r' % pramble[0:_IDX_LENGTH])
+
+ msglen = preamble[_IDX_LENGTH]
+ message = self.serial_.read(msglen)
+ if len(message) != msglen:
+ raise exception.InvalidResponse(
+ response='Expected %d bytes, received %d' %
+ (msglen, len(message)))
+ if message[_IDX_ETX] != _ETX:
+ raise exception.InvalidResponse(
+ response='Unexpected end-of-transmission byte: %02x' %
+ message[_IDX_ETX])
+
+ # Calculate the checksum up until before the checksum itself.
+ msgdata = message[:_IDX_CHECKSUM]
+
+ cksum = xor_checksum(msgdata)
+ if cksum != message[_IDX_CHECKSUM]:
+ raise exception.InvalidChecksum(message[_IDX_CHECKSUM], cksum)
+
+ return msgdata
+
+ def wait_and_ready(self):
+ challenge = self.serial_.read(1)
+
+ # The first packet read may have a prefixed zero, it might be a bug in
+ # the cp210x driver or device, but discard it if found.
+ if challenge == b'\0':
+ challege = self.serial_.read(1)
+ if challenge != b'\x53':
+ raise exceptions.ConnectionFailed(
+ message='Unexpected starting bytes %r' % challenge)
+
+ challenge += self.serial_.read(6)
+
+ if challenge != _CHALLENGE_PACKET_FULL:
+ raise exceptions.ConnectionFailed(
+ message='Unexpected challenge %r' % challenge)
+
+ self.send_packet(_RESPONSE_PACKET)
+
+ # The first packet only contains the counter of how many readings are
+ # available.
+ first_packet = self.read_packet()
+
+ count = _STRUCT_READINGS_COUNT.unpack_from(first_packet, 1)
+
+ return count[0]
+
+ def send_packet(self, msgdata):
+ packet = array.array('B')
+ packet.extend((_STX, _DIR_OUT, len(msgdata)+2))
+ packet.extend(msgdata)
+ packet.extend((xor_checksum(msgdata), _ETX))
+ self.serial_.write(packet.tobytes())
+
+ def connect(self):
+ print("Please connect and turn on the device.")
+
+ def disconnect(self):
+ self.send_packet(_DISCONNECT_PACKET)
+ response = self.read_packet()
+ if response != _DISCONNECTED_PACKET:
+ raise exceptions.InvalidResponse(response=response)
+
+ def get_meter_info(self):
+ return common.MeterInfo('SD CodeFree glucometer')
+
+ def get_version(self):
+ raise NotImplementedError
+
+ def get_serial_number(self):
+ raise NotImplementedError
+
+ def get_glucose_unit(self):
+ # Device does not provide information on glucose unit.
+ return common.UNIT_MGDL
+
+ def get_datetime(self):
+ raise NotImplementedError
+
+ def set_datetime(self, date=datetime.datetime.now()):
+ setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii')
+
+ # Ignore the readings count.
+ self.wait_and_ready()
+
+ self.send_packet(setdatecmd)
+ response = self.read_packet()
+ if response != _DATE_SET_PACKET:
+ raise exceptions.InvalidResponse(response=response)
+
+ # The date we return should only include up to minute, unfortunately.
+ return datetime.datetime(date.year, date.month, date.day,
+ date.hour, date.minute)
+
+ def zero_log(self):
+ raise NotmplementedError
+
+ def get_readings(self):
+ count = self.wait_and_ready()
+
+ for _ in range(count):
+ self.send_packet(_FETCH_PACKET)
+ rpkt = self.read_packet()
+
+ r = parse_reading(rpkt)
+ meal = _MEAL_FLAG[r.meal_flag]
+
+ yield common.Reading(
+ datetime.datetime(
+ 2000 + r.year, r.month, r.day, r.hour, r.minute),
+ r.value, meal=meal)