From 649a12b906f8310b46f0e0d88374446abd1632e1 Mon Sep 17 00:00:00 2001 From: Ben Date: Tue, 18 Feb 2020 18:14:56 +0100 Subject: factor out common driver methods --- glucometerutils/drivers/accuchek_reports.py | 6 ++-- glucometerutils/drivers/contourusb.py | 19 +++++----- glucometerutils/drivers/fsinsulinx.py | 4 +++ glucometerutils/drivers/fslibre.py | 3 +- glucometerutils/drivers/fsoptium.py | 21 +++-------- glucometerutils/drivers/fsprecisionneo.py | 2 ++ glucometerutils/drivers/otultra2.py | 20 ++--------- glucometerutils/drivers/otultraeasy.py | 14 +++----- glucometerutils/drivers/otverio2015.py | 12 +++---- glucometerutils/drivers/otverioiq.py | 19 ++-------- glucometerutils/drivers/sdcodefree.py | 10 +++--- glucometerutils/drivers/td4277.py | 14 +++++--- glucometerutils/support/driver_base.py | 55 +++++++++++++++++++++++++++++ glucometerutils/support/freestyle.py | 16 ++++----- 14 files changed, 114 insertions(+), 101 deletions(-) create mode 100644 glucometerutils/support/driver_base.py diff --git a/glucometerutils/drivers/accuchek_reports.py b/glucometerutils/drivers/accuchek_reports.py index 0b27196..06b69bc 100644 --- a/glucometerutils/drivers/accuchek_reports.py +++ b/glucometerutils/drivers/accuchek_reports.py @@ -21,6 +21,7 @@ import os from glucometerutils import common from glucometerutils import exceptions +from glucometerutils.support import driver_base _UNIT_MAP = { 'mmol/l': common.Unit.MMOL_L, @@ -44,7 +45,8 @@ _TIME_FORMAT = '%H:%M' _DATETIME_FORMAT = ' '.join((_DATE_FORMAT, _TIME_FORMAT)) -class Device: + +class Device(driver_base.GlucometerDriver): def __init__(self, device): if not device or not os.path.isdir(device): raise exceptions.CommandLineError( @@ -104,7 +106,7 @@ class Device: def get_datetime(self): raise NotImplementedError - def set_datetime(self, date=None): + def _set_device_datetime(self, date): raise NotImplementedError def zero_log(self): diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py index 6c333fb..397eb4f 100644 --- a/glucometerutils/drivers/contourusb.py +++ b/glucometerutils/drivers/contourusb.py @@ -21,7 +21,8 @@ http://protocols.ascensia.com/Programming-Guide.aspx import datetime from glucometerutils import common -from glucometerutils.support import contourusb +from glucometerutils.support import contourusb, driver_base + def _extract_timestamp(parsed_record, prefix=''): """Extract the timestamp from a parsed record. @@ -39,15 +40,13 @@ def _extract_timestamp(parsed_record, prefix=''): 0) -class Device(contourusb.ContourHidDevice): +class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver): """Glucometer driver for FreeStyle Libre devices.""" USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour USB_PRODUCT_ID = 0x6002 # type: int - def get_meter_info(self): - """Return the device information in structured form.""" self._get_info_record() return common.MeterInfo( 'Contour USB', @@ -57,14 +56,11 @@ class Device(contourusb.ContourHidDevice): native_unit= self.get_glucose_unit()) def get_glucose_unit(self): # pylint: disable=no-self-use - """Returns the glucose unit of the device.""" - if self._get_glucose_unit() == '0': return common.Unit.MG_DL else: return common.Unit.MMOL_L - def get_readings(self): """ Get reading dump from download data mode(all readings stored) @@ -77,5 +73,12 @@ class Device(contourusb.ContourHidDevice): comment=parsed_record['markers'], measure_method=common.MeasurementMethod.BLOOD_SAMPLE ) - + def get_serial_number(self): + raise NotImplementedError + + def _set_device_datetime(self, date): + raise NotImplementedError + + def zero_log(self): + raise NotImplementedError diff --git a/glucometerutils/drivers/fsinsulinx.py b/glucometerutils/drivers/fsinsulinx.py index 76707ec..f3cf043 100644 --- a/glucometerutils/drivers/fsinsulinx.py +++ b/glucometerutils/drivers/fsinsulinx.py @@ -71,3 +71,7 @@ class Device(freestyle.FreeStyleHidDevice): raw_reading.hour, raw_reading.minute) yield common.GlucoseReading(timestamp, raw_reading.value) + + def zero_log(self): + raise NotImplementedError + diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py index b07a7ca..f1ac525 100644 --- a/glucometerutils/drivers/fslibre.py +++ b/glucometerutils/drivers/fslibre.py @@ -124,7 +124,6 @@ def _parse_arresult(record): else: return None - # Check right away if we have rapid insulin if parsed_record['rapid-acting-flag']: parsed_record.update( @@ -194,7 +193,6 @@ def _parse_arresult(record): else: comment_parts.append('Rapid-acting insulin') - return cls( _extract_timestamp(parsed_record), value, @@ -203,6 +201,7 @@ def _parse_arresult(record): extra_data={'device_id': parsed_record['device_id']}, ) + class Device(freestyle.FreeStyleHidDevice): """Glucometer driver for FreeStyle Libre devices.""" diff --git a/glucometerutils/drivers/fsoptium.py b/glucometerutils/drivers/fsoptium.py index b1122f1..66b23ca 100644 --- a/glucometerutils/drivers/fsoptium.py +++ b/glucometerutils/drivers/fsoptium.py @@ -22,8 +22,7 @@ import re from glucometerutils import common from glucometerutils import exceptions -from glucometerutils.support import serial - +from glucometerutils.support import serial, driver_base _CLOCK_RE = re.compile( r'^Clock:\t(?P[A-Z][a-z]{2}) (?P[0-9]{2}) (?P[0-9]{4})\t' @@ -85,7 +84,7 @@ def _parse_clock(datestr): return datetime.datetime(year, month, day, hour, minute, second) -class Device(serial.SerialDevice): +class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 19200 DEFAULT_CABLE_ID = '1a61:3420' @@ -107,7 +106,7 @@ class Device(serial.SerialDevice): return decoded_response def connect(self): - self._send_command('xmem') # ignore output this time + self._send_command('xmem') # ignore output this time self._fetch_device_information() def disconnect(self): # pylint: disable=no-self-use @@ -190,19 +189,7 @@ class Device(serial.SerialDevice): raise exceptions.InvalidResponse('\n'.join(data)) - def set_datetime(self, date=None): - """Sets the date and time of the glucometer. - - Args: - date: The value to set the date/time of the glucometer to. If none is - given, the current date and time of the computer is used. - - Returns: - A datetime object built according to the returned response. - """ - if not date: - date = datetime.datetime.now() - + def _set_device_datetime(self, date): data = self._send_command(date.strftime('tim,%m,%d,%y,%H,%M')) parsed_data = ''.join(data) diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py index 6c975c3..58564e5 100644 --- a/glucometerutils/drivers/fsprecisionneo.py +++ b/glucometerutils/drivers/fsprecisionneo.py @@ -98,3 +98,5 @@ class Device(freestyle.FreeStyleHidDevice): yield cls(timestamp, value) + def zero_log(self): + raise NotImplementedError diff --git a/glucometerutils/drivers/otultra2.py b/glucometerutils/drivers/otultra2.py index bfab2b4..39be859 100644 --- a/glucometerutils/drivers/otultra2.py +++ b/glucometerutils/drivers/otultra2.py @@ -18,8 +18,7 @@ import re from glucometerutils import common from glucometerutils import exceptions -from glucometerutils.support import lifescan -from glucometerutils.support import serial +from glucometerutils.support import driver_base, lifescan, serial # The following two hashes are taken directly from LifeScan's documentation _MEAL_CODES = { @@ -129,7 +128,7 @@ def _parse_datetime(response): return datetime.datetime(2000 + year, month, day, hour, minute, second) -class Device(serial.SerialDevice): +class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 9600 DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. @@ -229,22 +228,9 @@ class Device(serial.SerialDevice): response = self._send_oneliner_command('DMF') return _parse_datetime(response[2:]) - def set_datetime(self, date=None): - """Sets the date and time of the glucometer. - - Args: - date: The value to set the date/time of the glucometer to. If none is - given, the current date and time of the computer is used. - - Returns: - A datetime object built according to the returned response. - """ - if not date: - date = datetime.datetime.now() - + def _set_device_datetime(self, date): response = self._send_oneliner_command( 'DMT' + date.strftime('%m/%d/%y %H:%M:%S')) - return _parse_datetime(response[2:]) def zero_log(self): diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index aee3adc..7f4934e 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -22,10 +22,7 @@ import logging import construct from glucometerutils import common -from glucometerutils.support import construct_extras -from glucometerutils.support import lifescan -from glucometerutils.support import lifescan_binary_protocol -from glucometerutils.support import serial +from glucometerutils.support import construct_extras, driver_base, lifescan, lifescan_binary_protocol, serial _PACKET = lifescan_binary_protocol.LifeScanPacket(True) @@ -102,7 +99,8 @@ def _make_packet( }, }}}) -class Device(serial.SerialDevice): + +class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 9600 DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. TIMEOUT = 0.5 @@ -204,16 +202,12 @@ class Device(serial.SerialDevice): return response.timestamp - def set_datetime(self, date=None): - if not date: - date = datetime.datetime.now() - + def _set_device_datetime(self, date): response = self._send_request( _DATETIME_REQUEST, { 'request_type': 'write', 'timestamp': date, }, _DATETIME_RESPONSE) - return response.timestamp def zero_log(self): diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py index f0dddd2..bde0af3 100644 --- a/glucometerutils/drivers/otverio2015.py +++ b/glucometerutils/drivers/otverio2015.py @@ -23,7 +23,6 @@ https://protocols.glucometers.tech/lifescan/onetouch-verio-2015 """ import binascii -import datetime import logging import construct @@ -32,8 +31,7 @@ from pyscsi.pyscsi.scsi_device import SCSIDevice from glucometerutils import common from glucometerutils import exceptions -from glucometerutils.support import lifescan -from glucometerutils.support import lifescan_binary_protocol +from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol # This device uses SCSI blocks as registers. _REGISTER_SIZE = 512 @@ -112,7 +110,8 @@ _READ_RECORD_RESPONSE = construct.Struct( construct.Padding(4), ) -class Device: + +class Device(driver_base.GlucometerDriver): def __init__(self, device): if not device: raise exceptions.CommandLineError( @@ -201,10 +200,7 @@ class Device: 3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) return response.timestamp - def set_datetime(self, date=None): - if not date: - date = datetime.datetime.now() - + def _set_device_datetime(self, date): self._send_request( 3, _WRITE_RTC_REQUEST, {'timestamp': date}, _COMMAND_SUCCESS) diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py index 63f8f1c..69bdac9 100644 --- a/glucometerutils/drivers/otverioiq.py +++ b/glucometerutils/drivers/otverioiq.py @@ -16,16 +16,12 @@ auto-detected. """ import binascii -import datetime import logging import construct from glucometerutils import common -from glucometerutils.support import construct_extras -from glucometerutils.support import lifescan -from glucometerutils.support import lifescan_binary_protocol -from glucometerutils.support import serial +from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol, serial _PACKET = lifescan_binary_protocol.LifeScanPacket(False) @@ -101,7 +97,7 @@ _READING_RESPONSE = construct.Struct( ) -class Device(serial.SerialDevice): +class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 38400 DEFAULT_CABLE_ID = '10c4:85a7' # Specific ID for embedded cp210x TIMEOUT = 0.5 @@ -110,12 +106,6 @@ class Device(serial.SerialDevice): super(Device, self).__init__(device) self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - def connect(self): - pass - - def disconnect(self): - pass - def _send_packet(self, message): pkt = _PACKET.build( {'data': {'value': { @@ -172,10 +162,7 @@ class Device(serial.SerialDevice): return response.timestamp - def set_datetime(self, date=None): - if not date: - date = datetime.datetime.now() - + def _set_device_datetime(self, date): self._send_request( _WRITE_RTC_REQUEST, { 'timestamp': date, diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py index b42c9d3..a6e2ce5 100644 --- a/glucometerutils/drivers/sdcodefree.py +++ b/glucometerutils/drivers/sdcodefree.py @@ -26,7 +26,8 @@ import construct from glucometerutils import common from glucometerutils import exceptions -from glucometerutils.support import serial +from glucometerutils.support import serial, driver_base + def xor_checksum(msg): return functools.reduce(operator.xor, msg) @@ -84,7 +85,7 @@ _READING = construct.Struct( ) -class Device(serial.SerialDevice): +class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 38400 DEFAULT_CABLE_ID = '10c4:ea60' # Generic cable. TIMEOUT = 300 # We need to wait for data from the device. @@ -160,10 +161,7 @@ class Device(serial.SerialDevice): def get_datetime(self): # pylint: disable=no-self-use raise NotImplementedError - def set_datetime(self, date=None): - if not date: - date = datetime.datetime.now() - + def _set_device_datetime(self, date): setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii') # Ignore the readings count. diff --git a/glucometerutils/drivers/td4277.py b/glucometerutils/drivers/td4277.py index ce5667c..4ab25ee 100644 --- a/glucometerutils/drivers/td4277.py +++ b/glucometerutils/drivers/td4277.py @@ -23,7 +23,8 @@ import construct from glucometerutils import common from glucometerutils import exceptions -from glucometerutils.support import serial +from glucometerutils.support import serial, driver_base + class Direction(enum.Enum): In = 0xa5 @@ -130,7 +131,8 @@ def _select_record(record_id): return _READING_SELECTION_STRUCT.build({'record_id': record_id}) -class Device(serial.SerialDevice): +class Device(serial.SerialDevice, driver_base.GlucometerDriver): + BAUDRATE = 19200 TIMEOUT = 0.5 @@ -186,9 +188,7 @@ class Device(serial.SerialDevice): return _parse_datetime(message) - def set_datetime(self, date=None): - if not date: - date = datetime.datetime.now() + def _set_device_datetime(self, date): assert date.year >= 2000 day_struct = _DAY_BITSTRUCT.build({ @@ -234,3 +234,7 @@ class Device(serial.SerialDevice): def zero_log(self): self._send_command(_CLEAR_MEMORY) + + def get_glucose_unit(self): + """Maybe this could be implemented by someone who knows the device""" + raise NotImplementedError diff --git a/glucometerutils/support/driver_base.py b/glucometerutils/support/driver_base.py new file mode 100644 index 0000000..b7b3d0f --- /dev/null +++ b/glucometerutils/support/driver_base.py @@ -0,0 +1,55 @@ +from abc import ABC, abstractmethod +from datetime import datetime + + +class GlucometerDriver(ABC): + + def connect(self): + pass + + def disconnect(self): + pass + + @abstractmethod + def get_meter_info(self): + """Return the device information in structured form.""" + pass + + @abstractmethod + def get_serial_number(self): + pass + + @abstractmethod + def get_glucose_unit(self): + """Returns the glucose unit of the device.""" + pass + + @abstractmethod + def get_datetime(self): + pass + + def set_datetime(self, date=None): + """Sets the date and time of the glucometer. + + Args: + date: The value to set the date/time of the glucometer to. If none is + given, the current date and time of the computer is used. + + Returns: + A datetime object built according to the returned response. + """ + if not date: + date = datetime.now() + return self._set_device_datetime(date) + + @abstractmethod + def _set_device_datetime(self, date): + pass + + @abstractmethod + def zero_log(self): + pass + + @abstractmethod + def get_readings(self): + pass diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py index ed8f7e9..1e8c097 100644 --- a/glucometerutils/support/freestyle.py +++ b/glucometerutils/support/freestyle.py @@ -12,12 +12,13 @@ import csv import datetime import logging import re +from abc import ABC from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple import construct from glucometerutils import exceptions -from glucometerutils.support import hiddevice +from glucometerutils.support import hiddevice, driver_base _INIT_COMMAND = 0x01 _INIT_RESPONSE = 0x71 @@ -103,6 +104,7 @@ def _verify_checksum(message, expected_checksum_hex): if expected_checksum != calculated_checksum: raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum) + def convert_ketone_unit(raw_value): """Convert raw ketone value as read in the device to its value in mmol/L. @@ -113,7 +115,8 @@ def convert_ketone_unit(raw_value): """ return raw_value / 18.0 -class FreeStyleHidDevice(hiddevice.HidDevice): + +class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC): """Base class implementing the FreeStyle HID common protocol. This class implements opening, initializing the connection and sending @@ -295,12 +298,9 @@ class FreeStyleHidDevice(hiddevice.HidDevice): except ValueError: raise exceptions.InvalidDateTime() - def set_datetime(self, date=None): + def _set_device_datetime(self, date): # type: (datetime.datetime) -> datetime.datetime - """Sets the date and time of the device.""" - if not date: - date = datetime.datetime.now() # The format used by the FreeStyle devices is not composable based on # standard strftime() (namely it includes no leading zeros), so we need # to build it manually. @@ -314,10 +314,6 @@ class FreeStyleHidDevice(hiddevice.HidDevice): return self.get_datetime() - def zero_log(self): - """Not implemented, Abbott devices don't allow resetting memory.""" - raise NotImplementedError - def _get_multirecord(self, command): # type: (bytes) -> Iterator[List[Text]] """Queries for, and returns, "multirecords" results. -- cgit v1.2.3