summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README4
-rw-r--r--glucometerutils/drivers/otultraeasy.py455
-rw-r--r--setup.py2
-rw-r--r--test/test_otultraeasy.py17
4 files changed, 201 insertions, 277 deletions
diff --git a/README b/README
index a5dcd32..947348c 100644
--- a/README
+++ b/README
@@ -35,8 +35,8 @@ supported.
| Manufacturer | Model Name | Driver | Dependencies |
| --- | --- | --- | --- |
| LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] |
-| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [pyserial] |
-| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [pyserial] |
+| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [construct] [pyserial] |
+| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [construct] [pyserial] |
| LifeScan | OneTouch Verio (USB) | `otverio2015` | [python-scsi] |
| LifeScan | OneTouch Select Plus | `otverio2015` | [python-scsi] |
| Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ |
diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py
index 0538dac..579c07c 100644
--- a/glucometerutils/drivers/otultraeasy.py
+++ b/glucometerutils/drivers/otultraeasy.py
@@ -18,305 +18,246 @@ __email__ = 'flameeyes@flameeyes.eu'
__copyright__ = 'Copyright © 2014-2017, Diego Elio Pettenò'
__license__ = 'MIT'
-import array
+import binascii
import datetime
import logging
-import re
-import struct
-import time
+
+import construct
from glucometerutils import common
-from glucometerutils import exceptions
from glucometerutils.support import lifescan
from glucometerutils.support import serial
-_STX = 0x02
-_ETX = 0x03
-
-_IDX_STX = 0
-_IDX_LENGTH = 1
-_IDX_CONTROL = 2
-_IDX_DATA = 3
-_IDX_ETX = -3
-_IDX_CHECKSUM = -2
-
-_BIT_SENT_COUNTER = 0x01
-_BIT_EXPECT_RECEIVE = 0x02
-_BIT_ACK = 0x04
-_BIT_DISCONNECT = 0x08
-_BIT_MORE = 0x10
-
-_READ_SERIAL_NUMBER = b'\x05\x0B\x02\x00\x00\x00\x00\x84\x6A\xE8\x73\x00'
-_READ_VERSION = b'\x05\x0D\x02'
-_READ_GLUCOSE_UNIT = b'\x05\x09\x02\x09\x00\x00\x00\x00'
-_DELETE_RECORDS = b'\x05\x1A'
-_READ_DATETIME = b'\x05\x20\x02\x00\x00\x00\x00'
-_WRITE_DATETIME = b'\x05\x20\x01'
-_READ_RECORD = b'\x05\x1F'
_INVALID_RECORD = 501
-_STRUCT_TIMESTAMP = struct.Struct('<I')
-_STRUCT_RECORDID = struct.Struct('<H')
-
-
-class UnsetPacketError(LookupError):
- pass
-
-
-def _convert_timestamp(timestamp_bytes):
- timestamp, = _STRUCT_TIMESTAMP.unpack(timestamp_bytes)
-
- return datetime.datetime.fromtimestamp(timestamp)
-
-
-class _Packet(object):
- _STRUCT = struct.Struct('<H')
-
- def __init__(self):
- self.cmd = array.array('B')
-
- def read_from(self, serial):
- self.cmd.extend(serial.read(3))
-
- if self.cmd[_IDX_STX] != _STX:
- raise lifescan.MalformedCommand(
- 'at position %s expected %02x, received %02x' % (
- _IDX_STX, _STX, self.cmd[_IDX_STX]))
-
- # the length includes prelude and appendix, which are six bytes total.
- if self.length > 6:
- self.cmd.extend(serial.read(self.length - 6))
-
- self.cmd.extend(serial.read(3))
-
- if self.cmd[_IDX_ETX] != _ETX:
- raise lifescan.MalformedCommand(
- 'at position %s expected %02x, received %02x' % (
- _IDX_ETX, _ETX, self.cmd[_IDX_ETX]))
-
- def build_command(self, cmd_bytes):
- self.cmd.append(_STX)
- self.cmd.append(6 + len(cmd_bytes))
- self.cmd.append(0x00) # link control
- self.cmd.extend(cmd_bytes)
- self.cmd.extend([_ETX, 0x00, 0x00])
-
- @property
- def length(self):
- if not self.cmd:
- return None
-
- return self.cmd[_IDX_LENGTH]
-
- def __is_in_control(self, bitmask):
- if not self.cmd:
- return None
-
- return bool(self.cmd[_IDX_CONTROL] & bitmask)
-
- def __set_in_control(self, bitmask, value):
- if not self.cmd:
- return None
-
- if value:
- self.cmd[_IDX_CONTROL] |= bitmask
- else:
- self.cmd[_IDX_CONTROL] &= (~bitmask) & 0xFF
-
- return value
-
- @property
- def sent_counter(self):
- return self.__is_in_control(_BIT_SENT_COUNTER)
-
- @sent_counter.setter
- def sent_counter(self, value):
- self.__set_in_control(_BIT_SENT_COUNTER, value)
-
- @property
- def expect_receive(self):
- return self.__is_in_control(_BIT_EXPECT_RECEIVE)
-
- @expect_receive.setter
- def expect_receive(self, value):
- self.__set_in_control(_BIT_EXPECT_RECEIVE, value)
-
- @property
- def checksum(self):
- return lifescan.crc_ccitt(self.cmd[:_IDX_CHECKSUM].tobytes())
-
- @property
- def acknowledge(self):
- return self.__is_in_control(_BIT_ACK)
-
- @acknowledge.setter
- def acknowledge(self, value):
- self.__set_in_control(_BIT_ACK, value)
-
- @property
- def disconnect(self):
- return self.__is_in_control(_BIT_DISCONNECT)
-
- @disconnect.setter
- def disconnect(self, value):
- self.__set_in_control(_BIT_DISCONNECT, value)
-
- @property
- def more(self):
- return self.__is_in_control(_BIT_MORE)
-
- @more.setter
- def more(self, value):
- self.__set_in_control(_BIT_MORE, value)
-
- def validate_checksum(self):
- expected_checksum = self.checksum
- received_checksum = self._STRUCT.unpack(self.cmd[_IDX_CHECKSUM:])[0]
- if received_checksum != expected_checksum:
- raise exceptions.InvalidChecksum(expected_checksum, received_checksum)
-
- def update_checksum(self):
- self._STRUCT.pack_into(self.cmd, _IDX_CHECKSUM, self.checksum)
-
- def tobytes(self):
- return self.cmd.tobytes()
-
- @property
- def data(self):
- return self.cmd[_IDX_DATA:_IDX_ETX]
-
+_EPOCH = datetime.datetime.utcfromtimestamp(0)
+
+def datetime_to_timestamp(date):
+ delta = date - _EPOCH
+ return int(delta.total_seconds())
+
+
+_PACKET = construct.Struct(
+ construct.RawCopy(
+ construct.Embedded(
+ construct.Struct(
+ construct.Const(b'\x02'), # stx
+ 'length' / construct.Rebuild(
+ construct.Byte, lambda ctx: len(ctx.message) + 6),
+ construct.EmbeddedBitStruct(
+ construct.Padding(3),
+ 'more' / construct.Default(construct.Flag, False),
+ 'disconnect' / construct.Flag,
+ 'acknowledge' / construct.Flag,
+ 'expect_receive' / construct.Flag,
+ 'sequence_number' / construct.Flag,
+ ),
+ 'message' / construct.Bytes(length=lambda ctx: ctx.length - 6),
+ construct.Const(b'\x03'), # etx
+ ),
+ ),
+ ),
+ 'checksum' / construct.Checksum(
+ construct.Int16ul, lifescan.crc_ccitt, construct.this.data),
+)
+
+_COMMAND_SUCCESS = construct.Const(b'\x05\x06')
+_TIMESTAMP_ADAPTER = construct.ExprAdapter(
+ construct.Int32ul,
+ encoder=lambda obj, ctx: datetime_to_timestamp(obj),
+ decoder=lambda obj, ctx: datetime.datetime.fromtimestamp(obj))
+
+_VERSION_REQUEST = construct.Const(b'\x05\x0d\x02')
+
+_VERSION_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'version' / construct.PascalString(construct.Byte, encoding='ascii'),
+)
+
+_SERIAL_NUMBER_REQUEST = construct.Const(
+ b'\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00')
+
+_SERIAL_NUMBER_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'serial_number' / construct.GreedyString(encoding='ascii'),
+)
+
+_DATETIME_REQUEST = construct.Struct(
+ construct.Const(b'\x05\x20'), # 0x20 is the datetime
+ 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02),
+ 'timestamp' / construct.Default(_TIMESTAMP_ADAPTER, _EPOCH),
+)
+
+_DATETIME_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'timestamp' / _TIMESTAMP_ADAPTER,
+)
+
+_GLUCOSE_UNIT_REQUEST = construct.Const(
+ b'\x05\x09\x02\x09\x00\x00\x00\x00')
+
+_GLUCOSE_MAPPING = {
+ common.Unit.MG_DL: 0x00,
+ common.Unit.MMOL_L: 0x01,
+}
+
+_GLUCOSE_UNIT_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'unit' / construct.SymmetricMapping(
+ construct.Byte, _GLUCOSE_MAPPING),
+ construct.Padding(3),
+)
+
+_ZERO_LOG_REQUEST = construct.Const(b'\x05\x1A')
+
+_READING_COUNT_RESPONSE = construct.Struct(
+ construct.Const(b'\x05\x0f'),
+ 'count' / construct.Int16ul,
+)
+
+_READ_RECORD_REQUEST = construct.Struct(
+ construct.Const(b'\x05\x1f'),
+ 'record_id' / construct.Int16ul,
+)
+
+_READING_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'timestamp' / _TIMESTAMP_ADAPTER,
+ 'value' / construct.Int32ul,
+)
class Device(serial.SerialDevice):
- BAUDRATE = 9600
- DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable.
-
- def __init__(self, device):
- super(Device, self).__init__(device)
-
- self.sent_counter_ = False
- self.expect_receive_ = False
-
- def connect(self):
- self._send_command('', disconnect=True)
-
- def disconnect(self):
- self.connect()
-
- def _read_response(self):
- response = _Packet()
-
- response.read_from(self.serial_)
-
- if not response.disconnect and response.sent_counter != self.expect_receive_:
- raise lifescan.MalformedCommand(
- 'at position 2[0b] expected %02x, received %02x' % (
- self.expect_receive_, response.expect_receive))
+ BAUDRATE = 9600
+ DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable.
+ TIMEOUT = 0.5
- if not response.acknowledge:
- self.expect_receive_ = not self.expect_receive_
+ def __init__(self, device):
+ super(Device, self).__init__(device)
- response.validate_checksum()
+ self.sent_counter_ = False
+ self.expect_receive_ = False
+ self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
- if not response.acknowledge:
- self._send_command('', acknowledge=True)
+ def connect(self):
+ self._send_packet(b'', disconnect=True)
+ self._read_ack()
- return response
+ def disconnect(self):
+ self.connect()
- def _send_command(self, cmd_bytes, acknowledge=False, disconnect=False):
- cmd = _Packet()
+ def _send_packet(self, message, acknowledge=False, disconnect=False):
+ pkt = _PACKET.build(
+ {'value': {
+ 'message': message,
+ 'sequence_number': self.sent_counter_,
+ 'expect_receive': self.expect_receive_,
+ 'acknowledge': acknowledge,
+ 'disconnect': disconnect,
+ }})
+ logging.debug('sending packet: %s', binascii.hexlify(pkt))
- # set the proper expectations
- cmd.build_command(cmd_bytes)
- cmd.sent_counter = self.sent_counter_
- cmd.expect_receive = self.expect_receive_
- cmd.acknowledge = acknowledge
- cmd.disconnect = disconnect
+ self.serial_.write(pkt)
+ self.serial_.flush()
- cmd.update_checksum()
+ def _read_packet(self):
+ raw_pkt = self.buffered_reader_.parse_stream(self.serial_)
+ logging.debug('received packet: %r', raw_pkt)
- self.serial_.write(cmd.tobytes())
- self.serial_.flush()
+ # discard the checksum and copy
+ pkt = raw_pkt.value
- if not acknowledge:
- self.sent_counter_ = not self.sent_counter_
- result = self._read_response()
- return result
+ if not pkt.disconnect and pkt.sequence_number != self.expect_receive_:
+ raise lifescan.MalformedCommand(
+ 'at position 2[0b] expected %02x, received %02x' % (
+ self.expect_receive_, pkt.sequence_count))
- def get_meter_info(self):
- return common.MeterInfo(
- 'OneTouch Ultra Easy glucometer',
- serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ return pkt
- def get_version(self):
- result = self._send_command(_READ_VERSION)
+ def _send_ack(self):
+ self._send_packet(b'', acknowledge=True, disconnect=False)
- response = self._read_response()
+ def _read_ack(self):
+ pkt = self._read_packet()
+ assert pkt.acknowledge
- return response.data[3:].tobytes().decode('ascii')
+ def _send_request(self, request_format, *args):
+ request = request_format.build(*args)
+ self._send_packet(request, acknowledge=False, disconnect=False)
- def get_serial_number(self):
- result = self._send_command(_READ_SERIAL_NUMBER)
+ self.sent_counter_ = not self.sent_counter_
+ self._read_ack()
- response = self._read_response()
+ def _read_response(self, response_format):
+ pkt = self._read_packet()
+ assert not pkt.acknowledge
- return response.data[2:].tobytes().decode('ascii')
+ self.expect_receive_ = not self.expect_receive_
+ self._send_ack()
- def get_datetime(self):
- result = self._send_command(_READ_DATETIME)
- response = self._read_response()
+ return response_format.parse(pkt.message)
- return _convert_timestamp(response.data[2:6])
+ def get_meter_info(self):
+ return common.MeterInfo(
+ 'OneTouch Ultra Easy glucometer',
+ serial_number=self.get_serial_number(),
+ version_info=(
+ 'Software version: ' + self.get_version(),),
+ native_unit=self.get_glucose_unit())
- def set_datetime(self, date=datetime.datetime.now()):
- epoch = datetime.datetime.utcfromtimestamp(0)
- delta = date - epoch
- timestamp = int(delta.total_seconds())
+ def get_version(self):
+ self._send_request(_VERSION_REQUEST, None)
- timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp)
+ response = self._read_response(_VERSION_RESPONSE)
- result = self._send_command(_WRITE_DATETIME + timestamp_bytes)
+ return response.version
- response = self._read_response()
- return _convert_timestamp(response.data[2:6])
+ def get_serial_number(self):
+ self._send_request(_SERIAL_NUMBER_REQUEST, None)
- def zero_log(self):
- result = self._send_command(_DELETE_RECORDS)
- response = self._read_response()
+ response = self._read_response(_SERIAL_NUMBER_RESPONSE)
+ return response.serial_number
- if response.data.tobytes() != b'\x05\x06':
- raise exceptions.InvalidResponse(response.data)
+ def get_datetime(self):
+ self._send_request(
+ _DATETIME_REQUEST, {'request_type': 'read'})
+ response = self._read_response(_DATETIME_RESPONSE)
+ return response.timestamp
- def get_glucose_unit(self):
- result = self._send_command(_READ_GLUCOSE_UNIT)
- response = self._read_response()
+ def set_datetime(self, date=datetime.datetime.now()):
+ self._send_request(_DATETIME_REQUEST, {
+ 'request_type': 'write',
+ 'timestamp': date,
+ })
- if response.data[2] == 0:
- return common.Unit.MG_DL
- elif response.data[2] == 1:
- return common.Unit.MMOL_L
- else:
- raise lifescan.MalformedCommand(
- 'at position PM1 invalid value %02x for unit' % response.data[2])
+ response = self._read_response(_DATETIME_RESPONSE)
+ return response.timestamp
- def _get_reading(self, record_id):
- id_bytes = _STRUCT_RECORDID.pack(record_id)
+ def zero_log(self):
+ self._send_request(_ZERO_LOG_REQUEST, None)
+ self._read_response(_COMMAND_SUCCESS)
- result = self._send_command(_READ_RECORD + id_bytes)
- return self._read_response()
+ def get_glucose_unit(self):
+ self._send_request(_GLUCOSE_UNIT_REQUEST, None)
+ response = self._read_response(_GLUCOSE_UNIT_RESPONSE)
- def get_readings(self):
- count_response = self._get_reading(_INVALID_RECORD)
+ return response.unit
- record_count, = _STRUCT_RECORDID.unpack_from(count_response.data, 2)
+ def _get_reading(self, record_id):
+ self._send_request(
+ _READ_RECORD_REQUEST, {'record_id': record_id})
+ return self._read_response(_READING_RESPONSE)
- for record_id in range(record_count):
- record_response = self._get_reading(record_id)
+ def get_readings(self):
+ self._send_request(
+ _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD})
+ count_response = self._read_response(_READING_COUNT_RESPONSE)
- timestamp = _convert_timestamp(record_response.data[2:6])
- value, = _STRUCT_TIMESTAMP.unpack_from(record_response.data, 6)
+ for record_id in range(count_response.count):
+ self._send_request(
+ _READ_RECORD_REQUEST, {'record_id': record_id})
+ reading = self._read_response(_READING_RESPONSE)
- yield common.GlucoseReading(timestamp, float(value))
+ yield common.GlucoseReading(
+ reading.timestamp,
+ float(reading.value))
diff --git a/setup.py b/setup.py
index a9b7c18..3be4b94 100644
--- a/setup.py
+++ b/setup.py
@@ -49,7 +49,7 @@ setup(
# These are all the drivers' dependencies. Optional dependencies are
# listed as mandatory for the feature.
'otultra2': ['pyserial'],
- 'otultraeasy': ['pyserial'],
+ 'otultraeasy': ['construct', 'pyserial'],
'otverio2015': ['python-scsi'],
'fsinsulinx': ['construct', 'hidapi'],
'fslibre': ['construct', 'hidapi'],
diff --git a/test/test_otultraeasy.py b/test/test_otultraeasy.py
index a1d4c02..52a98f1 100644
--- a/test/test_otultraeasy.py
+++ b/test/test_otultraeasy.py
@@ -38,23 +38,6 @@ class TestOTUltraMini(unittest.TestCase):
0x62C2,
lifescan.crc_ccitt(cmd_array))
- def test_packet_update_checksum(self):
- packet = otultraeasy._Packet()
-
- packet.build_command('')
- packet.disconnect = True
-
- packet.update_checksum()
- self.assertEqual(
- b'\x02\x06\x08\x03\xC2\x62',
- packet.tobytes())
-
- packet.validate_checksum()
- packet.disconnect = False
-
- with self.assertRaises(exceptions.InvalidChecksum):
- packet.validate_checksum()
-
if __name__ == '__main__':
unittest.main()