diff options
-rw-r--r-- | README | 4 | ||||
-rw-r--r-- | glucometerutils/drivers/otultraeasy.py | 455 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | test/test_otultraeasy.py | 17 |
4 files changed, 201 insertions, 277 deletions
@@ -35,8 +35,8 @@ supported. | Manufacturer | Model Name | Driver | Dependencies | | --- | --- | --- | --- | | LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] | -| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [pyserial] | -| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [pyserial] | +| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [construct] [pyserial] | +| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [construct] [pyserial] | | LifeScan | OneTouch Verio (USB) | `otverio2015` | [python-scsi] | | LifeScan | OneTouch Select Plus | `otverio2015` | [python-scsi] | | Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ | diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index 0538dac..579c07c 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -18,305 +18,246 @@ __email__ = 'flameeyes@flameeyes.eu' __copyright__ = 'Copyright © 2014-2017, Diego Elio Pettenò' __license__ = 'MIT' -import array +import binascii import datetime import logging -import re -import struct -import time + +import construct from glucometerutils import common -from glucometerutils import exceptions from glucometerutils.support import lifescan from glucometerutils.support import serial -_STX = 0x02 -_ETX = 0x03 - -_IDX_STX = 0 -_IDX_LENGTH = 1 -_IDX_CONTROL = 2 -_IDX_DATA = 3 -_IDX_ETX = -3 -_IDX_CHECKSUM = -2 - -_BIT_SENT_COUNTER = 0x01 -_BIT_EXPECT_RECEIVE = 0x02 -_BIT_ACK = 0x04 -_BIT_DISCONNECT = 0x08 -_BIT_MORE = 0x10 - -_READ_SERIAL_NUMBER = b'\x05\x0B\x02\x00\x00\x00\x00\x84\x6A\xE8\x73\x00' -_READ_VERSION = b'\x05\x0D\x02' -_READ_GLUCOSE_UNIT = b'\x05\x09\x02\x09\x00\x00\x00\x00' -_DELETE_RECORDS = b'\x05\x1A' -_READ_DATETIME = b'\x05\x20\x02\x00\x00\x00\x00' -_WRITE_DATETIME = b'\x05\x20\x01' -_READ_RECORD = b'\x05\x1F' _INVALID_RECORD = 501 -_STRUCT_TIMESTAMP = struct.Struct('<I') -_STRUCT_RECORDID = struct.Struct('<H') - - -class UnsetPacketError(LookupError): - pass - - -def _convert_timestamp(timestamp_bytes): - timestamp, = _STRUCT_TIMESTAMP.unpack(timestamp_bytes) - - return datetime.datetime.fromtimestamp(timestamp) - - -class _Packet(object): - _STRUCT = struct.Struct('<H') - - def __init__(self): - self.cmd = array.array('B') - - def read_from(self, serial): - self.cmd.extend(serial.read(3)) - - if self.cmd[_IDX_STX] != _STX: - raise lifescan.MalformedCommand( - 'at position %s expected %02x, received %02x' % ( - _IDX_STX, _STX, self.cmd[_IDX_STX])) - - # the length includes prelude and appendix, which are six bytes total. - if self.length > 6: - self.cmd.extend(serial.read(self.length - 6)) - - self.cmd.extend(serial.read(3)) - - if self.cmd[_IDX_ETX] != _ETX: - raise lifescan.MalformedCommand( - 'at position %s expected %02x, received %02x' % ( - _IDX_ETX, _ETX, self.cmd[_IDX_ETX])) - - def build_command(self, cmd_bytes): - self.cmd.append(_STX) - self.cmd.append(6 + len(cmd_bytes)) - self.cmd.append(0x00) # link control - self.cmd.extend(cmd_bytes) - self.cmd.extend([_ETX, 0x00, 0x00]) - - @property - def length(self): - if not self.cmd: - return None - - return self.cmd[_IDX_LENGTH] - - def __is_in_control(self, bitmask): - if not self.cmd: - return None - - return bool(self.cmd[_IDX_CONTROL] & bitmask) - - def __set_in_control(self, bitmask, value): - if not self.cmd: - return None - - if value: - self.cmd[_IDX_CONTROL] |= bitmask - else: - self.cmd[_IDX_CONTROL] &= (~bitmask) & 0xFF - - return value - - @property - def sent_counter(self): - return self.__is_in_control(_BIT_SENT_COUNTER) - - @sent_counter.setter - def sent_counter(self, value): - self.__set_in_control(_BIT_SENT_COUNTER, value) - - @property - def expect_receive(self): - return self.__is_in_control(_BIT_EXPECT_RECEIVE) - - @expect_receive.setter - def expect_receive(self, value): - self.__set_in_control(_BIT_EXPECT_RECEIVE, value) - - @property - def checksum(self): - return lifescan.crc_ccitt(self.cmd[:_IDX_CHECKSUM].tobytes()) - - @property - def acknowledge(self): - return self.__is_in_control(_BIT_ACK) - - @acknowledge.setter - def acknowledge(self, value): - self.__set_in_control(_BIT_ACK, value) - - @property - def disconnect(self): - return self.__is_in_control(_BIT_DISCONNECT) - - @disconnect.setter - def disconnect(self, value): - self.__set_in_control(_BIT_DISCONNECT, value) - - @property - def more(self): - return self.__is_in_control(_BIT_MORE) - - @more.setter - def more(self, value): - self.__set_in_control(_BIT_MORE, value) - - def validate_checksum(self): - expected_checksum = self.checksum - received_checksum = self._STRUCT.unpack(self.cmd[_IDX_CHECKSUM:])[0] - if received_checksum != expected_checksum: - raise exceptions.InvalidChecksum(expected_checksum, received_checksum) - - def update_checksum(self): - self._STRUCT.pack_into(self.cmd, _IDX_CHECKSUM, self.checksum) - - def tobytes(self): - return self.cmd.tobytes() - - @property - def data(self): - return self.cmd[_IDX_DATA:_IDX_ETX] - +_EPOCH = datetime.datetime.utcfromtimestamp(0) + +def datetime_to_timestamp(date): + delta = date - _EPOCH + return int(delta.total_seconds()) + + +_PACKET = construct.Struct( + construct.RawCopy( + construct.Embedded( + construct.Struct( + construct.Const(b'\x02'), # stx + 'length' / construct.Rebuild( + construct.Byte, lambda ctx: len(ctx.message) + 6), + construct.EmbeddedBitStruct( + construct.Padding(3), + 'more' / construct.Default(construct.Flag, False), + 'disconnect' / construct.Flag, + 'acknowledge' / construct.Flag, + 'expect_receive' / construct.Flag, + 'sequence_number' / construct.Flag, + ), + 'message' / construct.Bytes(length=lambda ctx: ctx.length - 6), + construct.Const(b'\x03'), # etx + ), + ), + ), + 'checksum' / construct.Checksum( + construct.Int16ul, lifescan.crc_ccitt, construct.this.data), +) + +_COMMAND_SUCCESS = construct.Const(b'\x05\x06') +_TIMESTAMP_ADAPTER = construct.ExprAdapter( + construct.Int32ul, + encoder=lambda obj, ctx: datetime_to_timestamp(obj), + decoder=lambda obj, ctx: datetime.datetime.fromtimestamp(obj)) + +_VERSION_REQUEST = construct.Const(b'\x05\x0d\x02') + +_VERSION_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'version' / construct.PascalString(construct.Byte, encoding='ascii'), +) + +_SERIAL_NUMBER_REQUEST = construct.Const( + b'\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00') + +_SERIAL_NUMBER_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'serial_number' / construct.GreedyString(encoding='ascii'), +) + +_DATETIME_REQUEST = construct.Struct( + construct.Const(b'\x05\x20'), # 0x20 is the datetime + 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02), + 'timestamp' / construct.Default(_TIMESTAMP_ADAPTER, _EPOCH), +) + +_DATETIME_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'timestamp' / _TIMESTAMP_ADAPTER, +) + +_GLUCOSE_UNIT_REQUEST = construct.Const( + b'\x05\x09\x02\x09\x00\x00\x00\x00') + +_GLUCOSE_MAPPING = { + common.Unit.MG_DL: 0x00, + common.Unit.MMOL_L: 0x01, +} + +_GLUCOSE_UNIT_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'unit' / construct.SymmetricMapping( + construct.Byte, _GLUCOSE_MAPPING), + construct.Padding(3), +) + +_ZERO_LOG_REQUEST = construct.Const(b'\x05\x1A') + +_READING_COUNT_RESPONSE = construct.Struct( + construct.Const(b'\x05\x0f'), + 'count' / construct.Int16ul, +) + +_READ_RECORD_REQUEST = construct.Struct( + construct.Const(b'\x05\x1f'), + 'record_id' / construct.Int16ul, +) + +_READING_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'timestamp' / _TIMESTAMP_ADAPTER, + 'value' / construct.Int32ul, +) class Device(serial.SerialDevice): - BAUDRATE = 9600 - DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. - - def __init__(self, device): - super(Device, self).__init__(device) - - self.sent_counter_ = False - self.expect_receive_ = False - - def connect(self): - self._send_command('', disconnect=True) - - def disconnect(self): - self.connect() - - def _read_response(self): - response = _Packet() - - response.read_from(self.serial_) - - if not response.disconnect and response.sent_counter != self.expect_receive_: - raise lifescan.MalformedCommand( - 'at position 2[0b] expected %02x, received %02x' % ( - self.expect_receive_, response.expect_receive)) + BAUDRATE = 9600 + DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. + TIMEOUT = 0.5 - if not response.acknowledge: - self.expect_receive_ = not self.expect_receive_ + def __init__(self, device): + super(Device, self).__init__(device) - response.validate_checksum() + self.sent_counter_ = False + self.expect_receive_ = False + self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - if not response.acknowledge: - self._send_command('', acknowledge=True) + def connect(self): + self._send_packet(b'', disconnect=True) + self._read_ack() - return response + def disconnect(self): + self.connect() - def _send_command(self, cmd_bytes, acknowledge=False, disconnect=False): - cmd = _Packet() + def _send_packet(self, message, acknowledge=False, disconnect=False): + pkt = _PACKET.build( + {'value': { + 'message': message, + 'sequence_number': self.sent_counter_, + 'expect_receive': self.expect_receive_, + 'acknowledge': acknowledge, + 'disconnect': disconnect, + }}) + logging.debug('sending packet: %s', binascii.hexlify(pkt)) - # set the proper expectations - cmd.build_command(cmd_bytes) - cmd.sent_counter = self.sent_counter_ - cmd.expect_receive = self.expect_receive_ - cmd.acknowledge = acknowledge - cmd.disconnect = disconnect + self.serial_.write(pkt) + self.serial_.flush() - cmd.update_checksum() + def _read_packet(self): + raw_pkt = self.buffered_reader_.parse_stream(self.serial_) + logging.debug('received packet: %r', raw_pkt) - self.serial_.write(cmd.tobytes()) - self.serial_.flush() + # discard the checksum and copy + pkt = raw_pkt.value - if not acknowledge: - self.sent_counter_ = not self.sent_counter_ - result = self._read_response() - return result + if not pkt.disconnect and pkt.sequence_number != self.expect_receive_: + raise lifescan.MalformedCommand( + 'at position 2[0b] expected %02x, received %02x' % ( + self.expect_receive_, pkt.sequence_count)) - def get_meter_info(self): - return common.MeterInfo( - 'OneTouch Ultra Easy glucometer', - serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + return pkt - def get_version(self): - result = self._send_command(_READ_VERSION) + def _send_ack(self): + self._send_packet(b'', acknowledge=True, disconnect=False) - response = self._read_response() + def _read_ack(self): + pkt = self._read_packet() + assert pkt.acknowledge - return response.data[3:].tobytes().decode('ascii') + def _send_request(self, request_format, *args): + request = request_format.build(*args) + self._send_packet(request, acknowledge=False, disconnect=False) - def get_serial_number(self): - result = self._send_command(_READ_SERIAL_NUMBER) + self.sent_counter_ = not self.sent_counter_ + self._read_ack() - response = self._read_response() + def _read_response(self, response_format): + pkt = self._read_packet() + assert not pkt.acknowledge - return response.data[2:].tobytes().decode('ascii') + self.expect_receive_ = not self.expect_receive_ + self._send_ack() - def get_datetime(self): - result = self._send_command(_READ_DATETIME) - response = self._read_response() + return response_format.parse(pkt.message) - return _convert_timestamp(response.data[2:6]) + def get_meter_info(self): + return common.MeterInfo( + 'OneTouch Ultra Easy glucometer', + serial_number=self.get_serial_number(), + version_info=( + 'Software version: ' + self.get_version(),), + native_unit=self.get_glucose_unit()) - def set_datetime(self, date=datetime.datetime.now()): - epoch = datetime.datetime.utcfromtimestamp(0) - delta = date - epoch - timestamp = int(delta.total_seconds()) + def get_version(self): + self._send_request(_VERSION_REQUEST, None) - timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp) + response = self._read_response(_VERSION_RESPONSE) - result = self._send_command(_WRITE_DATETIME + timestamp_bytes) + return response.version - response = self._read_response() - return _convert_timestamp(response.data[2:6]) + def get_serial_number(self): + self._send_request(_SERIAL_NUMBER_REQUEST, None) - def zero_log(self): - result = self._send_command(_DELETE_RECORDS) - response = self._read_response() + response = self._read_response(_SERIAL_NUMBER_RESPONSE) + return response.serial_number - if response.data.tobytes() != b'\x05\x06': - raise exceptions.InvalidResponse(response.data) + def get_datetime(self): + self._send_request( + _DATETIME_REQUEST, {'request_type': 'read'}) + response = self._read_response(_DATETIME_RESPONSE) + return response.timestamp - def get_glucose_unit(self): - result = self._send_command(_READ_GLUCOSE_UNIT) - response = self._read_response() + def set_datetime(self, date=datetime.datetime.now()): + self._send_request(_DATETIME_REQUEST, { + 'request_type': 'write', + 'timestamp': date, + }) - if response.data[2] == 0: - return common.Unit.MG_DL - elif response.data[2] == 1: - return common.Unit.MMOL_L - else: - raise lifescan.MalformedCommand( - 'at position PM1 invalid value %02x for unit' % response.data[2]) + response = self._read_response(_DATETIME_RESPONSE) + return response.timestamp - def _get_reading(self, record_id): - id_bytes = _STRUCT_RECORDID.pack(record_id) + def zero_log(self): + self._send_request(_ZERO_LOG_REQUEST, None) + self._read_response(_COMMAND_SUCCESS) - result = self._send_command(_READ_RECORD + id_bytes) - return self._read_response() + def get_glucose_unit(self): + self._send_request(_GLUCOSE_UNIT_REQUEST, None) + response = self._read_response(_GLUCOSE_UNIT_RESPONSE) - def get_readings(self): - count_response = self._get_reading(_INVALID_RECORD) + return response.unit - record_count, = _STRUCT_RECORDID.unpack_from(count_response.data, 2) + def _get_reading(self, record_id): + self._send_request( + _READ_RECORD_REQUEST, {'record_id': record_id}) + return self._read_response(_READING_RESPONSE) - for record_id in range(record_count): - record_response = self._get_reading(record_id) + def get_readings(self): + self._send_request( + _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD}) + count_response = self._read_response(_READING_COUNT_RESPONSE) - timestamp = _convert_timestamp(record_response.data[2:6]) - value, = _STRUCT_TIMESTAMP.unpack_from(record_response.data, 6) + for record_id in range(count_response.count): + self._send_request( + _READ_RECORD_REQUEST, {'record_id': record_id}) + reading = self._read_response(_READING_RESPONSE) - yield common.GlucoseReading(timestamp, float(value)) + yield common.GlucoseReading( + reading.timestamp, + float(reading.value)) @@ -49,7 +49,7 @@ setup( # These are all the drivers' dependencies. Optional dependencies are # listed as mandatory for the feature. 'otultra2': ['pyserial'], - 'otultraeasy': ['pyserial'], + 'otultraeasy': ['construct', 'pyserial'], 'otverio2015': ['python-scsi'], 'fsinsulinx': ['construct', 'hidapi'], 'fslibre': ['construct', 'hidapi'], diff --git a/test/test_otultraeasy.py b/test/test_otultraeasy.py index a1d4c02..52a98f1 100644 --- a/test/test_otultraeasy.py +++ b/test/test_otultraeasy.py @@ -38,23 +38,6 @@ class TestOTUltraMini(unittest.TestCase): 0x62C2, lifescan.crc_ccitt(cmd_array)) - def test_packet_update_checksum(self): - packet = otultraeasy._Packet() - - packet.build_command('') - packet.disconnect = True - - packet.update_checksum() - self.assertEqual( - b'\x02\x06\x08\x03\xC2\x62', - packet.tobytes()) - - packet.validate_checksum() - packet.disconnect = False - - with self.assertRaises(exceptions.InvalidChecksum): - packet.validate_checksum() - if __name__ == '__main__': unittest.main() |