From 7d153624c3a3543f0b8f906144e35105a5d48801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Fri, 27 Mar 2020 15:13:50 +0000 Subject: The big typing cleanup. Now that Python 3.7 is the minimum Python version, typing can be done inline, which makes it easier for mypy to know the files to check. Indeed, all the files are now typechecked, which uncovered a few minor bugs and mistakes here and there. --- glucometerutils/common.py | 11 +++- glucometerutils/drivers/accuchek_reports.py | 33 ++++++---- glucometerutils/drivers/contourusb.py | 21 +++--- glucometerutils/drivers/fsinsulinx.py | 11 ++-- glucometerutils/drivers/fslibre.py | 39 ++++++----- glucometerutils/drivers/fsoptium.py | 29 +++++---- glucometerutils/drivers/fsprecisionneo.py | 69 ++++++++++---------- glucometerutils/drivers/otultra2.py | 36 ++++++----- glucometerutils/drivers/otultraeasy.py | 54 ++++++++++------ glucometerutils/drivers/otverio2015.py | 42 +++++++----- glucometerutils/drivers/otverioiq.py | 37 ++++++----- glucometerutils/drivers/sdcodefree.py | 33 +++++----- glucometerutils/drivers/td4277.py | 46 +++++++------ glucometerutils/exceptions.py | 35 +++++----- glucometerutils/support/construct_extras.py | 6 +- glucometerutils/support/driver_base.py | 55 +++++++++------- glucometerutils/support/freestyle.py | 75 +++++++++++----------- glucometerutils/support/hiddevice.py | 16 +++-- glucometerutils/support/lifescan.py | 9 ++- .../support/lifescan_binary_protocol.py | 5 +- glucometerutils/support/serial.py | 12 ++-- glucometerutils/tests/test_common.py | 38 +++++++++-- pyproject.toml | 2 +- 23 files changed, 409 insertions(+), 305 deletions(-) diff --git a/glucometerutils/common.py b/glucometerutils/common.py index ddb2607..bf944c8 100644 --- a/glucometerutils/common.py +++ b/glucometerutils/common.py @@ -6,7 +6,7 @@ import datetime import enum import textwrap -from typing import Any, Dict, Optional, Sequence +from typing import Any, Dict, Optional, Sequence, Union import attr @@ -91,6 +91,10 @@ class KetoneReading: timestamp: datetime.datetime value: float comment: str = "" + measure_method: MeasurementMethod = attr.ib( + default=MeasurementMethod.BLOOD_SAMPLE, + validator=attr.validators.in_({MeasurementMethod.BLOOD_SAMPLE}), + ) extra_data: Dict[str, Any] = attr.Factory(dict) def as_csv(self, unit: Unit) -> str: @@ -100,7 +104,7 @@ class KetoneReading: return '"%s","%.2f","%s","%s"' % ( self.timestamp, self.value, - MeasurementMethod.BLOOD_SAMPLE.value, + self.measure_method.value, self.comment, ) @@ -123,6 +127,9 @@ class TimeAdjustment: ) +AnyReading = Union[GlucoseReading, KetoneReading, TimeAdjustment] + + @attr.s(auto_attribs=True) class MeterInfo: """General information about the meter. diff --git a/glucometerutils/drivers/accuchek_reports.py b/glucometerutils/drivers/accuchek_reports.py index e6dc2ea..de29064 100644 --- a/glucometerutils/drivers/accuchek_reports.py +++ b/glucometerutils/drivers/accuchek_reports.py @@ -18,6 +18,7 @@ import csv import datetime import glob import os +from typing import Dict, Generator, NoReturn, Optional from glucometerutils import common, exceptions from glucometerutils.support import driver_base @@ -46,7 +47,7 @@ _DATETIME_FORMAT = " ".join((_DATE_FORMAT, _TIME_FORMAT)) class Device(driver_base.GlucometerDriver): - def __init__(self, device): + def __init__(self, device: Optional[str]) -> None: if not device or not os.path.isdir(device): raise exceptions.CommandLineError( "--device parameter is required, should point to mount path " @@ -62,7 +63,7 @@ class Device(driver_base.GlucometerDriver): self.report_file = report_files[0] - def _get_records_reader(self): + def _get_records_reader(self) -> csv.DictReader: self.report.seek(0) # Skip the first two lines next(self.report) @@ -72,51 +73,55 @@ class Device(driver_base.GlucometerDriver): self.report, delimiter=";", skipinitialspace=True, quoting=csv.QUOTE_NONE ) - def connect(self): + def connect(self) -> None: self.report = open(self.report_file, "r", newline="\r\n", encoding="utf-8") - def disconnect(self): + def disconnect(self) -> None: self.report.close() - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: return common.MeterInfo( f"{self.get_model()} glucometer", serial_number=self.get_serial_number(), native_unit=self.get_glucose_unit(), ) - def get_model(self): + def get_model(self) -> str: # $device/MODEL/Reports/*.csv return os.path.basename(os.path.dirname(os.path.dirname(self.report_file))) - def get_serial_number(self): + def get_serial_number(self) -> str: self.report.seek(0) # ignore the first line. next(self.report) # The second line of the CSV is serial-no;report-date;report-time;;;;;;; return next(self.report).split(";")[0] - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: # Get the first record available and parse that. record = next(self._get_records_reader()) return _UNIT_MAP[record[_UNIT_CSV_KEY]] - def get_datetime(self): + def get_datetime(self) -> NoReturn: raise NotImplementedError - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> NoReturn: raise NotImplementedError - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError - def _extract_datetime(self, record): # pylint: disable=no-self-use + def _extract_datetime( + self, record: Dict[str, str] + ) -> datetime.datetime: # pylint: disable=no-self-use # Date and time are in separate column, but we want to parse them # together. date_and_time = " ".join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY])) return datetime.datetime.strptime(date_and_time, _DATETIME_FORMAT) - def _extract_meal(self, record): # pylint: disable=no-self-use + def _extract_meal( + self, record: Dict[str, str] + ) -> common.Meal: # pylint: disable=no-self-use if record[_AFTER_MEAL_CSV_KEY] and record[_BEFORE_MEAL_CSV_KEY]: raise exceptions.InvalidResponse("Reading cannot be before and after meal.") elif record[_AFTER_MEAL_CSV_KEY]: @@ -126,7 +131,7 @@ class Device(driver_base.GlucometerDriver): else: return common.Meal.NONE - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: for record in self._get_records_reader(): if record[_RESULT_CSV_KEY] is None: continue diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py index 095f920..8696b04 100644 --- a/glucometerutils/drivers/contourusb.py +++ b/glucometerutils/drivers/contourusb.py @@ -19,12 +19,13 @@ http://protocols.ascensia.com/Programming-Guide.aspx """ import datetime +from typing import Dict, Generator, NoReturn, Optional from glucometerutils import common from glucometerutils.support import contourusb -def _extract_timestamp(parsed_record, prefix=""): +def _extract_timestamp(parsed_record: Dict[str, str]): """Extract the timestamp from a parsed record. This leverages the fact that all the reading records have the same base structure. @@ -42,12 +43,12 @@ def _extract_timestamp(parsed_record, prefix=""): class Device(contourusb.ContourHidDevice): - """Glucometer driver for FreeStyle Libre devices.""" + """Glucometer driver for Contour devices.""" - def __init__(self, device): - self._hid_session = contourusb.ContourHidSession((0x1A79, 0x6002), device) + def __init__(self, device: Optional[str]) -> None: + super().__init__((0x1A79, 0x6002), device) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: self._get_info_record() return common.MeterInfo( "Contour USB", @@ -56,13 +57,13 @@ class Device(contourusb.ContourHidDevice): native_unit=self.get_glucose_unit(), ) - def get_glucose_unit(self): # pylint: disable=no-self-use + def get_glucose_unit(self) -> common.Unit: if self._get_glucose_unit() == "0": return common.Unit.MG_DL else: return common.Unit.MMOL_L - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: """ Get reading dump from download data mode(all readings stored) This meter supports only blood samples @@ -75,11 +76,11 @@ class Device(contourusb.ContourHidDevice): measure_method=common.MeasurementMethod.BLOOD_SAMPLE, ) - def get_serial_number(self): + def get_serial_number(self) -> NoReturn: raise NotImplementedError - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> NoReturn: raise NotImplementedError - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError diff --git a/glucometerutils/drivers/fsinsulinx.py b/glucometerutils/drivers/fsinsulinx.py index a3e54fb..e984719 100644 --- a/glucometerutils/drivers/fsinsulinx.py +++ b/glucometerutils/drivers/fsinsulinx.py @@ -18,6 +18,7 @@ Xavier Claessens. import collections import datetime +from typing import Generator, NoReturn, Optional from glucometerutils import common from glucometerutils.support import freestyle @@ -51,10 +52,10 @@ _InsulinxReading = collections.namedtuple( class Device(freestyle.FreeStyleHidDevice): """Glucometer driver for FreeStyle InsuLinux devices.""" - def __init__(self, device_path): + def __init__(self, device_path: Optional[str]) -> None: super().__init__(0x3460, device_path) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: """Return the device information in structured form.""" return common.MeterInfo( "FreeStyle InsuLinx", @@ -63,11 +64,11 @@ class Device(freestyle.FreeStyleHidDevice): native_unit=self.get_glucose_unit(), ) - def get_glucose_unit(self): # pylint: disable=no-self-use + def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use """Returns the glucose unit of the device.""" return common.Unit.MG_DL - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: """Iterate through the reading records in the device.""" for record in self._session.query_multirecord(b"$result?"): if not record or record[0] != _TYPE_GLUCOSE_READING: @@ -87,5 +88,5 @@ class Device(freestyle.FreeStyleHidDevice): yield common.GlucoseReading(timestamp, raw_reading.value) - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py index 2e37ec1..558dcf4 100644 --- a/glucometerutils/drivers/fslibre.py +++ b/glucometerutils/drivers/fslibre.py @@ -20,6 +20,7 @@ https://protocols.glucometers.tech/abbott/freestyle-libre """ import datetime +from typing import Dict, Generator, Mapping, Optional, Sequence, Tuple, Type from glucometerutils import common from glucometerutils.support import freestyle @@ -70,7 +71,9 @@ _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP = ( _ARRESULT_RAPID_INSULIN_ENTRY_MAP = ((43, "double-rapid-acting-insulin"),) -def _parse_record(record, entry_map): +def _parse_record( + record: Sequence[str], entry_map: Sequence[Tuple[int, str]] +) -> Dict[str, int]: """Parses a list of string fields into a dictionary of integers.""" if not record: @@ -82,7 +85,9 @@ def _parse_record(record, entry_map): return {} -def _extract_timestamp(parsed_record, prefix=""): +def _extract_timestamp( + parsed_record: Mapping[str, int], prefix: str = "" +) -> datetime.datetime: """Extract the timestamp from a parsed record. This leverages the fact that all the records have the same base structure. @@ -98,7 +103,7 @@ def _extract_timestamp(parsed_record, prefix=""): ) -def _parse_arresult(record): +def _parse_arresult(record: Sequence[str]) -> Optional[common.AnyReading]: """Takes an array of string fields as input and parses it into a Reading.""" parsed_record = _parse_record(record, _BASE_ENTRY_MAP) @@ -126,9 +131,9 @@ def _parse_arresult(record): return None comment_parts = [] - measure_method = None - cls = None - value = None + measure_method: Optional[common.MeasurementMethod] = None + cls: Optional[Type[common.AnyReading]] = None + value: Optional[float] = None if parsed_record["reading-type"] == 2: comment_parts.append("(Scan)") @@ -145,7 +150,10 @@ def _parse_arresult(record): measure_method = common.MeasurementMethod.BLOOD_SAMPLE cls = common.KetoneReading # automatically convert the raw value in mmol/L - value = freestyle.convert_ketone_unit(parsed_record["value"]) + raw_value = parsed_record["value"] + if raw_value is None: + raise ValueError(f"Invalid Ketone value: {parsed_record!r}") + value = freestyle.convert_ketone_unit(raw_value) else: # unknown reading return None @@ -183,7 +191,7 @@ def _parse_arresult(record): else: comment_parts.append("Rapid-acting insulin") - return cls( + reading = cls( _extract_timestamp(parsed_record), value, comment="; ".join(comment_parts), @@ -191,14 +199,16 @@ def _parse_arresult(record): extra_data={"device_id": parsed_record["device_id"]}, ) + return reading + class Device(freestyle.FreeStyleHidDevice): """Glucometer driver for FreeStyle Libre devices.""" - def __init__(self, device_path): + def __init__(self, device_path: Optional[str]) -> None: super().__init__(0x3650, device_path) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: """Return the device information in structured form.""" return common.MeterInfo( "FreeStyle Libre", @@ -208,18 +218,17 @@ class Device(freestyle.FreeStyleHidDevice): patient_name=self.get_patient_name(), ) - def get_serial_number(self): + def get_serial_number(self) -> str: """Overridden function as the command is not compatible.""" return self._session.send_text_command(b"$sn?").rstrip("\r\n") - def get_glucose_unit(self): # pylint: disable=no-self-use + def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use """Returns the glucose unit of the device.""" # TODO(Flameeyes): figure out how to identify the actual unit on the # device. return common.Unit.MG_DL - def get_readings(self): - + def get_readings(self) -> Generator[common.AnyReading, None, None]: # First of all get the usually longer list of sensor readings, and # convert them to Readings objects. for record in self._session.query_multirecord(b"$history?"): @@ -244,5 +253,5 @@ class Device(freestyle.FreeStyleHidDevice): if reading: yield reading - def zero_log(self): + def zero_log(self) -> None: self._session.send_text_command(b"$resetpatient") diff --git a/glucometerutils/drivers/fsoptium.py b/glucometerutils/drivers/fsoptium.py index 77244af..cafd539 100644 --- a/glucometerutils/drivers/fsoptium.py +++ b/glucometerutils/drivers/fsoptium.py @@ -19,6 +19,7 @@ https://protocols.glucometers.tech/abbott/freestyle-optium import datetime import logging import re +from typing import Generator, NoReturn, Sequence from glucometerutils import common, exceptions from glucometerutils.support import driver_base, serial @@ -64,8 +65,8 @@ _MONTH_MATCHES = { } -def _parse_clock(datestr): - """Convert the date/time string used by the the device into a datetime. +def _parse_clock(datestr: str) -> datetime.datetime: + """Convert the date/time string used by the device into a datetime. Args: datestr: a string as returned by the device during information handling. @@ -88,7 +89,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 19200 DEFAULT_CABLE_ID = "1a61:3420" - def _send_command(self, command): + def _send_command(self, command: str) -> Sequence[str]: cmd_bytes = bytes(f"$%s\r\n" % command, "ascii") logging.debug("Sending command: %r", cmd_bytes) @@ -104,14 +105,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): decoded_response = [line.decode("ascii").rstrip("\r\n") for line in response] return decoded_response - def connect(self): + def connect(self) -> None: self._send_command("xmem") # ignore output this time self._fetch_device_information() - def disconnect(self): # pylint: disable=no-self-use + def disconnect(self) -> None: # pylint: disable=no-self-use return - def _fetch_device_information(self): + def _fetch_device_information(self) -> None: data = self._send_command("colq") for line in data: @@ -134,7 +135,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): # back the commands and not replying to them. raise exceptions.ConnectionFailed() - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: """Fetch and parses the device information. Returns: @@ -147,7 +148,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_version(self): + def get_version(self) -> str: """Returns an identifier of the firmware version of the glucometer. Returns: @@ -155,7 +156,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): """ return self.device_version_ - def get_serial_number(self): + def get_serial_number(self) -> str: """Retrieve the serial number of the device. Returns: @@ -163,7 +164,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): """ return self.device_serialno_ - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: """Returns a constant representing the unit displayed by the meter. Returns: @@ -172,7 +173,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): """ return self.device_glucose_unit_ - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: """Returns the current date and time for the glucometer. Returns: @@ -188,7 +189,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise exceptions.InvalidResponse("\n".join(data)) - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: data = self._send_command(date.strftime("tim,%m,%d,%y,%H,%M")) parsed_data = "".join(data) @@ -197,10 +198,10 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return self.get_datetime() - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: """Iterates over the reading values stored in the glucometer. Args: diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py index 3009c7b..d6c6975 100644 --- a/glucometerutils/drivers/fsprecisionneo.py +++ b/glucometerutils/drivers/fsprecisionneo.py @@ -21,8 +21,9 @@ https://protocols.glucometers.tech/abbott/freestyle-precision-neo """ -import collections +import dataclasses import datetime +from typing import Generator, NoReturn, Optional, Sequence, Type from glucometerutils import common from glucometerutils.support import freestyle @@ -31,35 +32,40 @@ from glucometerutils.support import freestyle _TYPE_GLUCOSE_READING = "7" _TYPE_KETONE_READING = "9" -_NeoReading = collections.namedtuple( - "_NeoReading", - ( - "type", # 7 = blood glucose, 9 = blood ketone - "id", - "month", - "day", - "year", # year is two-digits - "hour", - "minute", - "unknown2", - "value", - # Extra trailing and so-far-unused fields; so discard them: - # * for blood glucose: 10 unknown trailing fields - # 'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7', - # 'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12', - # * for blood ketone: 2 unknown trailing fields - # 'unknown3', 'unknown4', - ), -) + +@dataclasses.dataclass +class _NeoReading: + type: int # 7 = blood glucose, 9 = blood ketone + id: int + month: int + day: int + year: int # year is two-digits + hour: int + minute: int + unknown: int + value: float + # Extra trailing and so-far-unused fields; so discard them: + # * for blood glucose: 10 unknown trailing fields + # 'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7', + # 'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12', + # * for blood ketone: 2 unknown trailing fields + # 'unknown3', 'unknown4', + + def __init__(self, record: Sequence[str]) -> None: + for idx, field in enumerate(dataclasses.fields(self)): + if record[idx] == "HI": + setattr(self, field.name, float("inf")) + else: + setattr(self, field.name, int(record[idx])) class Device(freestyle.FreeStyleHidDevice): """Glucometer driver for FreeStyle Precision Neo devices.""" - def __init__(self, device_path): + def __init__(self, device_path: Optional[str]): super().__init__(0x3850, device_path) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: """Return the device information in structured form.""" return common.MeterInfo( "FreeStyle Precision Neo", @@ -69,14 +75,14 @@ class Device(freestyle.FreeStyleHidDevice): patient_name=self.get_patient_name(), ) - def get_glucose_unit(self): # pylint: disable=no-self-use + def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use """Returns the glucose unit of the device.""" return common.Unit.MG_DL - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: """Iterate through the reading records in the device.""" for record in self._session.query_multirecord(b"$result?"): - cls = None + cls: Optional[Type[common.AnyReading]] = None if record and record[0] == _TYPE_GLUCOSE_READING: cls = common.GlucoseReading elif record and record[0] == _TYPE_KETONE_READING: @@ -84,14 +90,9 @@ class Device(freestyle.FreeStyleHidDevice): else: continue - # Build a _reading object by parsing each of the entries in the raw + # Build a _NeoReading object by parsing each of the entries in the raw # record - values = [] - for value in record: - if value == "HI": - value = float("inf") - values.append(int(value)) - raw_reading = _NeoReading._make(values[: len(_NeoReading._fields)]) + raw_reading = _NeoReading(record) timestamp = datetime.datetime( raw_reading.year + 2000, @@ -108,5 +109,5 @@ class Device(freestyle.FreeStyleHidDevice): yield cls(timestamp, value) - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError diff --git a/glucometerutils/drivers/otultra2.py b/glucometerutils/drivers/otultra2.py index 5e90b87..c1af4a0 100644 --- a/glucometerutils/drivers/otultra2.py +++ b/glucometerutils/drivers/otultra2.py @@ -15,6 +15,7 @@ Expected device path: /dev/ttyUSB0 or similar serial port device. import datetime import re +from typing import Generator from glucometerutils import common, exceptions from glucometerutils.support import driver_base, lifescan, serial @@ -51,7 +52,7 @@ _DUMP_LINE_RE = re.compile( _RESPONSE_MATCH = re.compile(r"^(.+) ([0-9A-F]{4})\r$") -def _calculate_checksum(bytestring): +def _calculate_checksum(bytestring: bytes) -> int: """Calculate the checksum used by OneTouch Ultra and Ultra2 devices Args: @@ -71,7 +72,7 @@ def _calculate_checksum(bytestring): return checksum -def _validate_and_strip_checksum(line): +def _validate_and_strip_checksum(line: str) -> str: """Verify the simple 16-bit checksum and remove it from the line. Args: @@ -104,7 +105,7 @@ _DATETIME_RE = re.compile( ) -def _parse_datetime(response): +def _parse_datetime(response: str) -> datetime.datetime: """Convert a response with date and time from the meter into a datetime. Args: @@ -132,23 +133,23 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 9600 DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. - def connect(self): # pylint: disable=no-self-use + def connect(self) -> None: # pylint: disable=no-self-use return - def disconnect(self): # pylint: disable=no-self-use + def disconnect(self) -> None: # pylint: disable=no-self-use return - def _send_command(self, cmd): + def _send_command(self, cmd: str) -> None: """Send command interface. Args: cmd: command and parameters to send (without newline) """ - cmdstring = bytes("\x11\r" + cmd + "\r", "ascii") + cmdstring = bytes(f"\x11\r{cmd}\r", "ascii") self.serial_.write(cmdstring) self.serial_.flush() - def _send_oneliner_command(self, cmd): + def _send_oneliner_command(self, cmd: str) -> str: """Send command and read a one-line response. Args: @@ -163,7 +164,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): line = self.serial_.readline().decode("ascii") return _validate_and_strip_checksum(line) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: """Fetch and parses the device information. Returns: @@ -176,7 +177,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_version(self): + def get_version(self) -> str: """Returns an identifier of the firmware version of the glucometer. Returns: @@ -192,7 +193,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): _SERIAL_NUMBER_RE = re.compile('^@ "([A-Z0-9]{9})"$') - def get_serial_number(self): + def get_serial_number(self) -> str: """Retrieve the serial number of the device. Returns: @@ -219,7 +220,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return serial_number - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: """Returns the current date and time for the glucometer. Returns: @@ -228,13 +229,13 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): response = self._send_oneliner_command("DMF") return _parse_datetime(response[2:]) - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: response = self._send_oneliner_command( "DMT" + date.strftime("%m/%d/%y %H:%M:%S") ) return _parse_datetime(response[2:]) - def zero_log(self): + def zero_log(self) -> None: """Zeros out the data log of the device. This function will clear the memory of the device deleting all the @@ -246,7 +247,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): _GLUCOSE_UNIT_RE = re.compile(r'^SU\?,"(MG/DL |MMOL/L)"') - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: """Returns a constant representing the unit displayed by the meter. Returns: @@ -264,6 +265,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): response = self._send_oneliner_command("DMSU?") match = self._GLUCOSE_UNIT_RE.match(response) + if match is None: + raise exceptions.InvalidGlucoseUnit(response) + unit = match.group(1) if unit == "MG/DL ": @@ -274,7 +278,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise exceptions.InvalidGlucoseUnit(response) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: """Iterates over the reading values stored in the glucometer. Args: diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index 41e2acd..d5eb3c9 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -18,6 +18,7 @@ Expected device path: /dev/ttyUSB0 or similar serial port device. import binascii import datetime import logging +from typing import Any, Dict, Generator, Optional import construct @@ -92,7 +93,13 @@ _READING_RESPONSE = construct.Struct( ) -def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect): +def _make_packet( + message: bytes, + sequence_number: int, + expect_receive: bool, + acknowledge: bool, + disconnect: bool, +): return _PACKET.build( { "data": { @@ -115,24 +122,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. TIMEOUT = 0.5 - def __init__(self, device): - super(Device, self).__init__(device) + def __init__(self, device: Optional[str]) -> None: + super().__init__(device) self.sent_counter_ = False self.expect_receive_ = False self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - def connect(self): + def connect(self) -> None: try: self._send_packet(b"", disconnect=True) self._read_ack() except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) - def disconnect(self): + def disconnect(self) -> None: self.connect() - def _send_packet(self, message, acknowledge=False, disconnect=False): + def _send_packet( + self, message: bytes, acknowledge: bool = False, disconnect: bool = False + ) -> None: pkt = _make_packet( message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect ) @@ -141,7 +150,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): self.serial_.write(pkt) self.serial_.flush() - def _read_packet(self): + def _read_packet(self) -> construct.Container: raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data logging.debug("received packet: %r", raw_pkt) @@ -157,14 +166,19 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return pkt - def _send_ack(self): + def _send_ack(self) -> None: self._send_packet(b"", acknowledge=True, disconnect=False) - def _read_ack(self): + def _read_ack(self) -> None: pkt = self._read_packet() assert pkt.link_control.acknowledge - def _send_request(self, request_format, request_obj, response_format): + def _send_request( + self, + request_format: construct.Struct, + request_obj: Optional[Dict[str, Any]], + response_format: construct.Struct, + ) -> construct.Container: try: request = request_format.build(request_obj) self._send_packet(request, acknowledge=False, disconnect=False) @@ -182,7 +196,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: return common.MeterInfo( "OneTouch Ultra Easy glucometer", serial_number=self.get_serial_number(), @@ -190,26 +204,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_version(self): + def get_version(self) -> str: response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version - def get_serial_number(self): + def get_serial_number(self) -> str: response = self._send_request( _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE ) return response.serial_number - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: response = self._send_request( _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE ) return response.timestamp - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: response = self._send_request( _DATETIME_REQUEST, {"request_type": "write", "timestamp": date}, @@ -217,17 +231,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): ) return response.timestamp - def zero_log(self): + def zero_log(self) -> None: self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: response = self._send_request( _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE ) return response.unit - def _get_reading_count(self): + def _get_reading_count(self) -> int: response = self._send_request( _READ_RECORD_REQUEST, {"record_id": _INVALID_RECORD}, @@ -235,14 +249,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): ) return response.count - def _get_reading(self, record_id): + def _get_reading(self, record_id: int) -> common.GlucoseReading: response = self._send_request( _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE ) return common.GlucoseReading(response.timestamp, float(response.value)) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: record_count = self._get_reading_count() for record_id in range(record_count): yield self._get_reading(record_id) diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py index 3882d70..4e9138a 100644 --- a/glucometerutils/drivers/otverio2015.py +++ b/glucometerutils/drivers/otverio2015.py @@ -23,7 +23,9 @@ https://protocols.glucometers.tech/lifescan/onetouch-verio-2015 """ import binascii +import datetime import logging +from typing import Any, Dict, Generator, Optional import construct from pyscsi.pyscsi.scsi import SCSI @@ -105,19 +107,21 @@ _READ_RECORD_RESPONSE = construct.Struct( class Device(driver_base.GlucometerDriver): - def __init__(self, device): + def __init__(self, device: Optional[str]) -> None: if not device: raise exceptions.CommandLineError( "--device parameter is required, should point to the disk " "device representing the meter." ) + super().__init__(device) + self.device_name_ = device self.scsi_device_ = SCSIDevice(device, readwrite=True) self.scsi_ = SCSI(self.scsi_device_) self.scsi_.blocksize = _REGISTER_SIZE - def connect(self): + def connect(self) -> None: inq = self.scsi_.inquiry() logging.debug("Device connected: %r", inq.result) vendor = inq.result["t10_vendor_identification"][:32] @@ -126,14 +130,20 @@ class Device(driver_base.GlucometerDriver): f"Device {self.device_name_} is not a LifeScan glucometer." ) - def disconnect(self): # pylint: disable=no-self-use + def disconnect(self) -> None: # pylint: disable=no-self-use return - def _send_request(self, lba, request_format, request_obj, response_format): + def _send_request( + self, + lba: int, + request_format: construct.Struct, + request_obj: Optional[Dict[str, Any]], + response_format: construct.Struct, + ) -> construct.Container: """Send a request to the meter, and read its response. Args: - lba: (int) the address of the block register to use, known + lba: the address of the block register to use, known valid addresses are 3, 4 and 5. request_format: a construct format identifier of the request to send request_obj: the object to format with the provided identifier @@ -168,14 +178,14 @@ class Device(driver_base.GlucometerDriver): except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) - def _query_string(self, selector): + def _query_string(self, selector: str) -> str: response = self._send_request( 3, _QUERY_REQUEST, {"selector": selector}, _QUERY_RESPONSE ) return response.value - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: model = self._query_string("model") return common.MeterInfo( f"OneTouch {model} glucometer", @@ -184,39 +194,39 @@ class Device(driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_serial_number(self): + def get_serial_number(self) -> str: return self._query_string("serial") - def get_version(self): + def get_version(self) -> str: return self._query_string("software") - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: response = self._send_request(3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) return response.timestamp - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: self._send_request(3, _WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS) # The device does not return the new datetime, so confirm by calling # READ RTC again. return self.get_datetime() - def zero_log(self): + def zero_log(self) -> None: self._send_request(3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: response = self._send_request( 4, _READ_PARAMETER_REQUEST, {"selector": "unit"}, _READ_UNIT_RESPONSE ) return response.unit - def _get_reading_count(self): + def _get_reading_count(self) -> int: response = self._send_request( 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE ) return response.count - def _get_reading(self, record_id): + def _get_reading(self, record_id: int) -> common.GlucoseReading: response = self._send_request( 3, _READ_RECORD_REQUEST, {"record_id": record_id}, _READ_RECORD_RESPONSE ) @@ -224,7 +234,7 @@ class Device(driver_base.GlucometerDriver): response.timestamp, float(response.value), meal=response.meal ) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: record_count = self._get_reading_count() for record_id in range(record_count): yield self._get_reading(record_id) diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py index 69f88de..b4a4a6c 100644 --- a/glucometerutils/drivers/otverioiq.py +++ b/glucometerutils/drivers/otverioiq.py @@ -16,7 +16,9 @@ auto-detected. """ import binascii +import datetime import logging +from typing import Any, Dict, Generator, Optional import construct @@ -101,18 +103,18 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): DEFAULT_CABLE_ID = "10c4:85a7" # Specific ID for embedded cp210x TIMEOUT = 0.5 - def __init__(self, device): - super(Device, self).__init__(device) + def __init__(self, device: Optional[str]) -> None: + super().__init__(device) self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - def _send_packet(self, message): + def _send_packet(self, message: bytes) -> None: pkt = _PACKET.build({"data": {"value": {"message": message}}}) logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) self.serial_.flush() - def _read_packet(self): + def _read_packet(self) -> construct.Container: raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data logging.debug("received packet: %r", raw_pkt) @@ -121,7 +123,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return pkt - def _send_request(self, request_format, request_obj, response_format): + def _send_request( + self, + request_format: construct.struct, + request_obj: Optional[Dict[str, Any]], + response_format: construct.Struct, + ) -> construct.Container: try: request = request_format.build(request_obj) self._send_packet(request) @@ -132,7 +139,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: return common.MeterInfo( "OneTouch Verio IQ glucometer", serial_number=self.get_serial_number(), @@ -140,47 +147,47 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_version(self): + def get_version(self) -> str: response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version - def get_serial_number(self): + def get_serial_number(self) -> str: response = self._send_request( _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE ) return response.serial_number - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: response = self._send_request(_READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) return response.timestamp - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: self._send_request(_WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS) # The device does not return the new datetime, so confirm by calling # READ RTC again. return self.get_datetime() - def zero_log(self): + def zero_log(self) -> None: self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: response = self._send_request( _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE ) return response.unit - def _get_reading_count(self): + def _get_reading_count(self) -> int: response = self._send_request( _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE ) return response.count - def _get_reading(self, record_id): + def _get_reading(self, record_id: int) -> Optional[common.GlucoseReading]: response = self._send_request( _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE ) @@ -193,7 +200,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): response.timestamp, float(response.value), meal=response.meal ) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: record_count = self._get_reading_count() for record_id in range(record_count): reading = self._get_reading(record_id) diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py index 08a3afc..5d1d42c 100644 --- a/glucometerutils/drivers/sdcodefree.py +++ b/glucometerutils/drivers/sdcodefree.py @@ -21,6 +21,7 @@ import enum import functools import logging import operator +from typing import Generator, NoReturn import construct @@ -28,7 +29,7 @@ from glucometerutils import common, exceptions from glucometerutils.support import driver_base, serial -def xor_checksum(msg): +def xor_checksum(msg: bytes) -> int: return functools.reduce(operator.xor, msg) @@ -87,12 +88,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable. TIMEOUT = 300 # We need to wait for data from the device. - def read_message(self): + def read_message(self) -> bytes: pkt = _PACKET.parse_stream(self.serial_) logging.debug("received packet: %r", pkt) return pkt.message - def wait_and_ready(self): + def wait_and_ready(self) -> int: challenge = b"\0" while challenge == b"\0": @@ -126,37 +127,37 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return first_message.count - def send_message(self, message): + def send_message(self, message: bytes) -> None: pkt = _PACKET.build({"message": message, "direction": Direction.Out}) logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) - def connect(self): # pylint: disable=no-self-use + def connect(self) -> None: # pylint: disable=no-self-use print("Please connect and turn on the device.") - def disconnect(self): + def disconnect(self) -> None: self.send_message(_DISCONNECT_MESSAGE) response = self.read_message() if response != _DISCONNECTED_MESSAGE: - raise exceptions.InvalidResponse(response=response) + raise exceptions.InvalidResponse(response=repr(response)) - def get_meter_info(self): # pylint: disable=no-self-use + def get_meter_info(self) -> common.MeterInfo: # pylint: disable=no-self-use return common.MeterInfo("SD CodeFree glucometer") - def get_version(self): # pylint: disable=no-self-use + def get_version(self) -> NoReturn: # pylint: disable=no-self-use raise NotImplementedError - def get_serial_number(self): # pylint: disable=no-self-use + def get_serial_number(self) -> NoReturn: # pylint: disable=no-self-use raise NotImplementedError - def get_glucose_unit(self): # pylint: disable=no-self-use + def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use # Device does not provide information on glucose unit. return common.Unit.MG_DL - def get_datetime(self): # pylint: disable=no-self-use + def get_datetime(self) -> NoReturn: # pylint: disable=no-self-use raise NotImplementedError - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: setdatecmd = date.strftime("ADATE%Y%m%d%H%M").encode("ascii") # Ignore the readings count. @@ -165,17 +166,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): self.send_message(setdatecmd) response = self.read_message() if response != _DATE_SET_MESSAGE: - raise exceptions.InvalidResponse(response=response) + raise exceptions.InvalidResponse(response=repr(response)) # The date we return should only include up to minute, unfortunately. return datetime.datetime( date.year, date.month, date.day, date.hour, date.minute ) - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: count = self.wait_and_ready() for _ in range(count): diff --git a/glucometerutils/drivers/td4277.py b/glucometerutils/drivers/td4277.py index 31901db..be8a180 100644 --- a/glucometerutils/drivers/td4277.py +++ b/glucometerutils/drivers/td4277.py @@ -18,6 +18,7 @@ import enum import functools import logging import operator +from typing import Generator, NoReturn, Optional, Tuple import construct @@ -49,7 +50,7 @@ _PACKET = construct.Struct( / construct.Checksum(construct.Byte, byte_checksum, construct.this.data.data), ) -_EMPTY_MESSAGE = 0 +_EMPTY_MESSAGE = b"\x00\x00\x00\x00" _CONNECT_REQUEST = 0x22 _VALID_CONNECT_RESPONSE = {0x22, 0x24, 0x54} @@ -100,7 +101,9 @@ _READING_VALUE_STRUCT = construct.Struct( ) -def _make_packet(command, message, direction=Direction.Out): +def _make_packet( + command: int, message: bytes, direction: Direction = Direction.Out +) -> bytes: return _PACKET.build( { "data": { @@ -114,7 +117,7 @@ def _make_packet(command, message, direction=Direction.Out): ) -def _parse_datetime(message): +def _parse_datetime(message: bytes) -> datetime.datetime: date = _DATETIME_STRUCT.parse(message) # We can't parse the day properly with a single pass of Construct # unfortunately. @@ -124,7 +127,7 @@ def _parse_datetime(message): ) -def _select_record(record_id): +def _select_record(record_id: int) -> bytes: return _READING_SELECTION_STRUCT.build({"record_id": record_id}) @@ -133,11 +136,16 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 19200 TIMEOUT = 0.5 - def __init__(self, device): - super(Device, self).__init__("cp2110://" + device) + def __init__(self, device: Optional[str]): + super().__init__(f"cp2110://{device}") self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - def _send_command(self, command, message=_EMPTY_MESSAGE, validate_response=True): + def _send_command( + self, + command: int, + message: bytes = _EMPTY_MESSAGE, + validate_response: bool = True, + ) -> Tuple[int, bytes]: pkt = _make_packet(command, message) logging.debug("sending packet: %s", binascii.hexlify(pkt)) @@ -151,7 +159,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return response.data.value.command, response.data.value.message - def connect(self): + def connect(self) -> None: response_command, message = self._send_command( _CONNECT_REQUEST, validate_response=False ) @@ -168,24 +176,24 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): f"Invalid model identified: {model_message!r}" ) - def disconnect(self): + def disconnect(self) -> None: pass - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: return common.MeterInfo("TaiDoc TD-4277 glucometer") - def get_version(self): # pylint: disable=no-self-use + def get_version(self) -> NoReturn: # pylint: disable=no-self-use raise NotImplementedError - def get_serial_number(self): # pylint: disable=no-self-use + def get_serial_number(self) -> NoReturn: # pylint: disable=no-self-use raise NotImplementedError - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: _, message = self._send_command(_GET_DATETIME) return _parse_datetime(message) - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: assert date.year >= 2000 day_struct = _DAY_BITSTRUCT.build( @@ -202,12 +210,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return _parse_datetime(message) - def _get_reading_count(self): + def _get_reading_count(self) -> int: _, message = self._send_command(_GET_READING_COUNT) return _READING_COUNT_STRUCT.parse(message).count - def _get_reading(self, record_id): + def _get_reading(self, record_id: int) -> common.GlucoseReading: _, reading_date_message = self._send_command( _GET_READING_DATETIME, _select_record(record_id) ) @@ -222,14 +230,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): reading_date, reading_value.value, meal=reading_value.meal ) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: record_count = self._get_reading_count() for record_id in range(record_count): yield self._get_reading(record_id) - def zero_log(self): + def zero_log(self) -> None: self._send_command(_CLEAR_MEMORY) - def get_glucose_unit(self): + def get_glucose_unit(self) -> NoReturn: """Maybe this could be implemented by someone who knows the device""" raise NotImplementedError diff --git a/glucometerutils/exceptions.py b/glucometerutils/exceptions.py index 1f72308..d56dbe6 100644 --- a/glucometerutils/exceptions.py +++ b/glucometerutils/exceptions.py @@ -3,6 +3,8 @@ # SPDX-License-Identifier: MIT """Common exceptions for glucometerutils.""" +from typing import Any, Optional + class Error(Exception): """Base class for the errors.""" @@ -15,42 +17,43 @@ class CommandLineError(Error): class ConnectionFailed(Error): """It was not possible to connect to the meter.""" - def __init__(self, message="Unable to connect to the meter."): - super(ConnectionFailed, self).__init__(message) + def __init__(self, message: str = "Unable to connect to the meter.") -> None: + super().__init__(message) class CommandError(Error): """It was not possible to send a command to the device.""" - def __init__(self, message="Unable to send command to device."): - super(CommandError, self).__init__(message) + def __init__(self, message: str = "Unable to send command to device.") -> None: + super().__init__(message) class InvalidResponse(Error): """The response received from the meter was not understood""" - def __init__(self, response): - super(InvalidResponse, self).__init__(f"Invalid response received:\n{response}") + def __init__(self, response: str) -> None: + super().__init__(f"Invalid response received:\n{response}") class InvalidChecksum(InvalidResponse): - def __init__(self, wire, calculated): - super(InvalidChecksum, self).__init__( - f"Response checksum not matching: {wire:08x} (wire) != {calculated:08x} (calculated)" - ) + def __init__(self, wire: int, calculated: Optional[int]) -> None: + if calculated is not None: + message = f"Response checksum not matching: {wire:08x} (wire) != {calculated:08x} (calculated)" + else: + message = f"Unable to calculate checksum. Expected {wire:08x}." + + super().__init__(message) class InvalidGlucoseUnit(Error): """Unable to parse the given glucose unit""" - def __init__(self, unit): - super(InvalidGlucoseUnit, self).__init__( - f"Invalid glucose unit received:\n{unit}" - ) + def __init__(self, unit: Any) -> None: + super().__init__(f"Invalid glucose unit received:\n{unit}") class InvalidDateTime(Error): """The device has an invalid date/time setting.""" - def __init__(self): - super(InvalidDateTime, self).__init__("Invalid date and time for device") + def __init__(self) -> None: + super().__init__("Invalid date and time for device") diff --git a/glucometerutils/support/construct_extras.py b/glucometerutils/support/construct_extras.py index b44ee84..4ed8a7a 100644 --- a/glucometerutils/support/construct_extras.py +++ b/glucometerutils/support/construct_extras.py @@ -18,15 +18,15 @@ class Timestamp(construct.Adapter): __slots__ = ["epoch"] - def __init__(self, subcon, epoch=0): + def __init__(self, subcon, epoch: int = 0) -> None: super(Timestamp, self).__init__(subcon) self.epoch = epoch - def _encode(self, obj, context, path): + def _encode(self, obj: datetime.datetime, context, path) -> int: assert isinstance(obj, datetime.datetime) epoch_date = datetime.datetime.utcfromtimestamp(self.epoch) delta = obj - epoch_date return int(delta.total_seconds()) - def _decode(self, obj, context, path): + def _decode(self, obj: int, context, path) -> datetime.datetime: return datetime.datetime.utcfromtimestamp(obj + self.epoch) diff --git a/glucometerutils/support/driver_base.py b/glucometerutils/support/driver_base.py index d3e45b7..2bd4264 100644 --- a/glucometerutils/support/driver_base.py +++ b/glucometerutils/support/driver_base.py @@ -1,38 +1,45 @@ -from abc import ABC, abstractmethod -from datetime import datetime -from typing import Optional, Text +# -*- coding: utf-8 -*- +# +# SPDX-License-Identifier: MIT +import abc +import datetime +from typing import Generator, Optional, Text -class GlucometerDriver(ABC): - def __init__(self, device_path): - # type: (Optional[Text]) -> None +from glucometerutils import common + + +class GlucometerDriver(abc.ABC): + def __init__(self, device_path: Optional[Text]) -> None: pass - def connect(self): + def connect(self) -> None: pass - def disconnect(self): + def disconnect(self) -> None: pass - @abstractmethod - def get_meter_info(self): + @abc.abstractmethod + def get_meter_info(self) -> common.MeterInfo: """Return the device information in structured form.""" pass - @abstractmethod - def get_serial_number(self): + @abc.abstractmethod + def get_serial_number(self) -> str: pass - @abstractmethod - def get_glucose_unit(self): + @abc.abstractmethod + def get_glucose_unit(self) -> common.Unit: """Returns the glucose unit of the device.""" pass - @abstractmethod - def get_datetime(self): + @abc.abstractmethod + def get_datetime(self) -> datetime.datetime: pass - def set_datetime(self, date=None): + def set_datetime( + self, date: Optional[datetime.datetime] = None + ) -> datetime.datetime: """Sets the date and time of the glucometer. Args: @@ -43,17 +50,17 @@ class GlucometerDriver(ABC): A datetime object built according to the returned response. """ if not date: - date = datetime.now() + date = datetime.datetime.now() return self._set_device_datetime(date) - @abstractmethod - def _set_device_datetime(self, date): + @abc.abstractmethod + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: pass - @abstractmethod - def zero_log(self): + @abc.abstractmethod + def zero_log(self) -> None: pass - @abstractmethod - def get_readings(self): + @abc.abstractmethod + def get_readings(self) -> Generator[common.AnyReading, None, None]: pass diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py index c77282b..245fde7 100644 --- a/glucometerutils/support/freestyle.py +++ b/glucometerutils/support/freestyle.py @@ -12,7 +12,7 @@ import csv import datetime import logging import re -from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple +from typing import AnyStr, Callable, Iterator, List, Optional, Tuple import construct @@ -45,9 +45,10 @@ _ALWAYS_UNENCRYPTED_MESSAGES = ( ) -def _create_matcher(message_type, content): - # type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool] - def _matcher(message): +def _create_matcher( + message_type: int, content: Optional[bytes] +) -> Callable[[Tuple[int, bytes]], bool]: + def _matcher(message: Tuple[int, bytes]) -> bool: return message[0] == message_type and (content is None or content == message[1]) return _matcher @@ -91,8 +92,7 @@ _MULTIRECORDS_FORMAT = re.compile( ) -def _verify_checksum(message, expected_checksum_hex): - # type: (AnyStr, AnyStr) -> None +def _verify_checksum(message: AnyStr, expected_checksum_hex: AnyStr) -> None: """Calculate the simple checksum of the message and compare with expected. Args: @@ -116,7 +116,7 @@ def _verify_checksum(message, expected_checksum_hex): raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum) -def convert_ketone_unit(raw_value): +def convert_ketone_unit(raw_value: float) -> float: """Convert raw ketone value as read in the device to its value in mmol/L. As per https://protocols.glucometers.tech/abbott/freestyle-libre this is @@ -132,9 +132,12 @@ ABBOTT_VENDOR_ID = 0x1A61 class FreeStyleHidSession: def __init__( - self, product_id, device_path, text_message_type, text_reply_message_type - ): - # type: (int, Optional[Text], int, int) -> None + self, + product_id: int, + device_path: Optional[str], + text_message_type: int, + text_reply_message_type: int, + ) -> None: self._hid_session = hiddevice.HidSession( (ABBOTT_VENDOR_ID, product_id), device_path @@ -151,13 +154,12 @@ class FreeStyleHidSession: f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}" ) - def send_command(self, message_type, command, encrypted=False): - # type: (int, bytes, bool) -> None + def send_command(self, message_type: int, command: bytes, encrypted: bool = False): """Send a raw command to the device. Args: - message_type: (int) The first byte sent with the report to the device. - command: (bytes) The command to send out the device. + message_type: The first byte sent with the report to the device. + command: The command to send out the device. """ if encrypted: assert message_type not in _ALWAYS_UNENCRYPTED_MESSAGES @@ -172,8 +174,7 @@ class FreeStyleHidSession: logging.debug("Sending packet: %r", usb_packet) self._hid_session.write(usb_packet) - def read_response(self, encrypted=False): - # type: (bool) -> Tuple[int, bytes] + def read_response(self, encrypted: bool = False) -> Tuple[int, bytes]: """Read the response from the device and extracts it.""" usb_packet = self._hid_session.read() @@ -211,8 +212,7 @@ class FreeStyleHidSession: return message - def send_text_command(self, command): - # type: (bytes) -> Text + def send_text_command(self, command: bytes) -> str: """Send a command to the device that expects a text reply.""" self.send_command(self._text_message_type, command) @@ -237,13 +237,13 @@ class FreeStyleHidSession: match = _TEXT_REPLY_FORMAT.search(full_content) if not match: - raise exceptions.InvalidResponse(full_content) + raise exceptions.InvalidResponse(repr(full_content)) message = match.group("message") _verify_checksum(message, match.group("checksum")) if match.group("status") != b"OK": - raise exceptions.InvalidResponse(message or "Command failed") + raise exceptions.InvalidResponse(repr(message) or "Command failed") # If there is anything in the response that is not ASCII-safe, this is # probably in the patient name. The Windows utility does not seem to @@ -251,8 +251,7 @@ class FreeStyleHidSession: # unknown codepoint. return message.decode("ascii", "replace") - def query_multirecord(self, command): - # type: (bytes) -> Iterator[List[Text]] + def query_multirecord(self, command: bytes) -> Iterator[List[str]]: """Queries for, and returns, "multirecords" results. Multirecords are used for querying events, readings, history and similar @@ -298,43 +297,44 @@ class FreeStyleHidDevice(driver_base.GlucometerDriver): though. """ - def __init__(self, product_id, device_path, text_cmd=0x60, text_reply_cmd=0x60): - # type: (int, Optional[Text], int, int) -> None + def __init__( + self, + product_id: int, + device_path: Optional[str], + text_cmd: int = 0x60, + text_reply_cmd: int = 0x60, + ) -> None: super().__init__(device_path) self._session = FreeStyleHidSession( product_id, device_path, text_cmd, text_reply_cmd ) - def connect(self): + def connect(self) -> None: """Open connection to the device, starting the knocking sequence.""" self._session.connect() - def disconnect(self): + def disconnect(self) -> None: """Disconnect the device, nothing to be done.""" pass # Some of the commands are also shared across devices that use this HID # protocol, but not many. Only provide here those that do seep to change # between them. - def _get_version(self): - # type: () -> Text + def _get_version(self) -> str: """Return the software version of the device.""" return self._session.send_text_command(b"$swver?").rstrip("\r\n") - def get_serial_number(self): - # type: () -> Text + def get_serial_number(self) -> str: """Returns the serial number of the device.""" return self._session.send_text_command(b"$serlnum?").rstrip("\r\n") - def get_patient_name(self): - # type: () -> Optional[Text] + def get_patient_name(self) -> Optional[str]: patient_name = self._session.send_text_command(b"$ptname?").rstrip("\r\n") if not patient_name: return None return patient_name - def set_patient_name(self, name): - # type: (Text) -> None + def set_patient_name(self, name: str) -> None: try: encoded_name = name.encode("ascii") except UnicodeDecodeError: @@ -342,8 +342,7 @@ class FreeStyleHidDevice(driver_base.GlucometerDriver): self._session.send_text_command(b"$ptname," + encoded_name) - def get_datetime(self): - # type: () -> datetime.datetime + def get_datetime(self) -> datetime.datetime: """Gets the date and time as reported by the device. This is one of the few commands that appear common to many of the @@ -364,9 +363,7 @@ class FreeStyleHidDevice(driver_base.GlucometerDriver): except ValueError: raise exceptions.InvalidDateTime() - def _set_device_datetime(self, date): - # type: (datetime.datetime) -> datetime.datetime - + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: # The format used by the FreeStyle devices is not composable based on # standard strftime() (namely it includes no leading zeros), so we need # to build it manually. diff --git a/glucometerutils/support/hiddevice.py b/glucometerutils/support/hiddevice.py index 41ad17c..124b2e6 100644 --- a/glucometerutils/support/hiddevice.py +++ b/glucometerutils/support/hiddevice.py @@ -6,7 +6,7 @@ import logging import os -from typing import BinaryIO, Optional, Text, Tuple +from typing import BinaryIO, Optional, Tuple from glucometerutils import exceptions @@ -18,8 +18,12 @@ class HidSession: methods abstracting the HID library. """ - def __init__(self, usb_id, device, timeout_ms=0): - # type: (Optional[Tuple[int, int]], Optional[Text], int) -> None + def __init__( + self, + usb_id: Optional[Tuple[int, int]], + device: Optional[str], + timeout_ms: int = 0, + ) -> None: """Construct a new session object. Args: @@ -66,8 +70,7 @@ class HidSession: message=f"Unable to connect to meter: {e}." ) - def write(self, report): - # type: (bytes) -> None + def write(self, report: bytes) -> None: """Writes a report to the HID handle.""" if self.handle_: @@ -78,8 +81,7 @@ class HidSession: if written < 0: raise exceptions.CommandError() - def read(self, size=64): - # type: (int) -> bytes + def read(self, size: int = 64) -> bytes: """Read a report from the HID handle. This is important as it handles the one incompatible interface between diff --git a/glucometerutils/support/lifescan.py b/glucometerutils/support/lifescan.py index 1c329c6..20869f9 100644 --- a/glucometerutils/support/lifescan.py +++ b/glucometerutils/support/lifescan.py @@ -9,7 +9,7 @@ from glucometerutils import exceptions class MissingChecksum(exceptions.InvalidResponse): """The response misses the expected 4-digits checksum.""" - def __init__(self, response): + def __init__(self, response: str): super(MissingChecksum, self).__init__( f"Response is missing checksum: {response}" ) @@ -18,19 +18,18 @@ class MissingChecksum(exceptions.InvalidResponse): class InvalidSerialNumber(exceptions.Error): """The serial number is not as expected.""" - def __init__(self, serial_number): + def __init__(self, serial_number: str): super(InvalidSerialNumber, self).__init__( f"Serial number {serial_number} is invalid." ) class MalformedCommand(exceptions.InvalidResponse): - def __init__(self, message): + def __init__(self, message: str): super(MalformedCommand, self).__init__(f"Malformed command: {message}") -def crc_ccitt(data): - # type: (bytes) -> int +def crc_ccitt(data: bytes) -> int: """Calculate the CRC-16-CCITT with LifeScan's common seed. Args: diff --git a/glucometerutils/support/lifescan_binary_protocol.py b/glucometerutils/support/lifescan_binary_protocol.py index 441226e..1cef4d9 100644 --- a/glucometerutils/support/lifescan_binary_protocol.py +++ b/glucometerutils/support/lifescan_binary_protocol.py @@ -24,8 +24,9 @@ _LINK_CONTROL = construct.BitStruct( ) -def LifeScanPacket(include_link_control): # pylint: disable=invalid-name - # type: (bool) -> construct.Struct +def LifeScanPacket( + include_link_control: bool, +) -> construct.Struct: # pylint: disable=invalid-name if include_link_control: link_control_construct = _LINK_CONTROL else: diff --git a/glucometerutils/support/serial.py b/glucometerutils/support/serial.py index d9e80ea..6a2d142 100644 --- a/glucometerutils/support/serial.py +++ b/glucometerutils/support/serial.py @@ -5,9 +5,10 @@ """ import logging -from typing import Optional, Text +from typing import Optional import serial + from glucometerutils import exceptions @@ -37,13 +38,12 @@ class SerialDevice: """ - BAUDRATE = None # type: int - DEFAULT_CABLE_ID = None # type: Text + BAUDRATE: Optional[int] = None + DEFAULT_CABLE_ID: Optional[str] = None - TIMEOUT = 1 # type: float + TIMEOUT: float = 1 - def __init__(self, device): - # type: (Optional[Text]) -> None + def __init__(self, device: Optional[str]) -> None: assert self.BAUDRATE is not None if not device and self.DEFAULT_CABLE_ID: diff --git a/glucometerutils/tests/test_common.py b/glucometerutils/tests/test_common.py index 3e733e3..2a3a23d 100644 --- a/glucometerutils/tests/test_common.py +++ b/glucometerutils/tests/test_common.py @@ -6,10 +6,13 @@ # pylint: disable=protected-access,missing-docstring import datetime +import unittest from absl.testing import parameterized from glucometerutils import common +TEST_DATETIME = datetime.datetime(2018, 1, 1, 0, 30, 45) + class TestGlucoseConversion(parameterized.TestCase): def test_convert_to_mmol(self): @@ -43,11 +46,8 @@ class TestGlucoseConversion(parameterized.TestCase): class TestGlucoseReading(parameterized.TestCase): - - TEST_DATETIME = datetime.datetime(2018, 1, 1, 0, 30, 45) - def test_minimal(self): - reading = common.GlucoseReading(self.TEST_DATETIME, 100) + reading = common.GlucoseReading(TEST_DATETIME, 100) self.assertEqual( reading.as_csv(common.Unit.MG_DL), '"2018-01-01 00:30:45","100.00","","blood sample",""', @@ -57,7 +57,7 @@ class TestGlucoseReading(parameterized.TestCase): ("_mgdl", common.Unit.MG_DL, 100), ("_mmoll", common.Unit.MMOL_L, 5.56) ) def test_value(self, unit, expected_value): - reading = common.GlucoseReading(self.TEST_DATETIME, 100) + reading = common.GlucoseReading(TEST_DATETIME, 100) self.assertAlmostEqual(reading.get_value_as(unit), expected_value, places=2) @parameterized.named_parameters( @@ -98,10 +98,36 @@ class TestGlucoseReading(parameterized.TestCase): ), ) def test_csv(self, kwargs_dict, expected_csv): - reading = common.GlucoseReading(self.TEST_DATETIME, 100, **kwargs_dict) + reading = common.GlucoseReading(TEST_DATETIME, 100, **kwargs_dict) self.assertEqual(reading.as_csv(common.Unit.MG_DL), expected_csv) +class TestKetoneReading(unittest.TestCase): + def test_measure_method(self): + """Raise an exception if an invalid measurement method is provided. + + We allow measure_method as a parameter for compatibility with the other + Readings, but we don't want anything _but_ the BLOOD_SAMPLE method. + """ + with self.subTest("No measure_method parameter."): + self.assertIsNotNone(common.KetoneReading(TEST_DATETIME, 100)) + + with self.subTest("measure_method=MeasurementMethod.BLOOD_SAMPLE is valid"): + self.assertIsNotNone( + common.KetoneReading( + TEST_DATETIME, + 100, + measure_method=common.MeasurementMethod.BLOOD_SAMPLE, + ) + ) + + with self.subTest("measure_method=MeasurementMethod.TIME raises ValueError"): + with self.assertRaises(ValueError): + common.KetoneReading( + TEST_DATETIME, 100, measure_method=common.MeasurementMethod.TIME + ) + + class TestMeterInfo(parameterized.TestCase): @parameterized.named_parameters( ("_no_serial_number", {}, "Serial Number: N/A\n"), diff --git a/pyproject.toml b/pyproject.toml index 3ab8a28..4e2ae7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,6 @@ multi_line_output = 3 include_trailing_comma = true known_first_party = ['glucometerutils'] -known_third_party = ['construct', 'hidapi', 'pyserial', 'pyscsi'] +known_third_party = ['construct', 'hidapi', 'pyscsi', 'serial'] [tool.setuptools_scm] -- cgit v1.2.3