diff options
35 files changed, 1297 insertions, 1162 deletions
diff --git a/glucometerutils/common.py b/glucometerutils/common.py index e7be7ec..6576ebd 100644 --- a/glucometerutils/common.py +++ b/glucometerutils/common.py @@ -10,21 +10,24 @@ from typing import Optional, Sequence import attr + class Unit(enum.Enum): - MG_DL = 'mg/dL' - MMOL_L = 'mmol/L' + MG_DL = "mg/dL" + MMOL_L = "mmol/L" + # Constants for meal information class Meal(enum.Enum): - NONE = '' - BEFORE = 'Before Meal' - AFTER = 'After Meal' + NONE = "" + BEFORE = "Before Meal" + AFTER = "After Meal" + # Constants for measure method class MeasurementMethod(enum.Enum): - BLOOD_SAMPLE = 'blood sample' - CGM = 'CGM' # Continuous Glucose Monitoring - TIME = 'time' + BLOOD_SAMPLE = "blood sample" + CGM = "CGM" # Continuous Glucose Monitoring + TIME = "time" def convert_glucose_unit(value, from_unit, to_unit): @@ -50,19 +53,19 @@ def convert_glucose_unit(value, from_unit, to_unit): return round(value * 18.0, 0) + @attr.s class GlucoseReading: timestamp = attr.ib(type=datetime.datetime) value = attr.ib(type=float) - meal = attr.ib( - default=Meal.NONE, validator=attr.validators.in_(Meal), - type=Meal) - comment = attr.ib(default='', type=str) + meal = attr.ib(default=Meal.NONE, validator=attr.validators.in_(Meal), type=Meal) + comment = attr.ib(default="", type=str) measure_method = attr.ib( default=MeasurementMethod.BLOOD_SAMPLE, validator=attr.validators.in_(MeasurementMethod), - type=MeasurementMethod) + type=MeasurementMethod, + ) extra_data = attr.ib(factory=dict) def get_value_as(self, to_unit): @@ -78,15 +81,20 @@ class GlucoseReading: # type: (Unit) -> str """Returns the reading as a formatted comma-separated value string.""" return '"%s","%.2f","%s","%s","%s"' % ( - self.timestamp, self.get_value_as(unit), self.meal.value, - self.measure_method.value, self.comment) + self.timestamp, + self.get_value_as(unit), + self.meal.value, + self.measure_method.value, + self.comment, + ) + @attr.s class KetoneReading: timestamp = attr.ib(type=datetime.datetime) value = attr.ib(type=float) - comment = attr.ib(default='', type=str) + comment = attr.ib(default="", type=str) extra_data = attr.ib(factory=dict) def as_csv(self, unit): @@ -94,23 +102,28 @@ class KetoneReading: del unit # Unused for Ketone readings. return '"%s","%.2f","%s","%s"' % ( - self.timestamp, self.value, MeasurementMethod.BLOOD_SAMPLE.value, - self.comment) + self.timestamp, + self.value, + MeasurementMethod.BLOOD_SAMPLE.value, + self.comment, + ) + @attr.s class TimeAdjustment: timestamp = attr.ib() # type: datetime.datetime old_timestamp = attr.ib() # type: datetime.datetime measure_method = attr.ib( - default=MeasurementMethod.TIME, - validator=attr.validators.in_( - MeasurementMethod)) # type: MeasurementMethod + default=MeasurementMethod.TIME, validator=attr.validators.in_(MeasurementMethod) + ) # type: MeasurementMethod extra_data = attr.ib(factory=dict) def as_csv(self, unit): del unit return '"%s","","%s","%s"' % ( - self.timestamp, self.measure_method.value, self.old_timestamp + self.timestamp, + self.measure_method.value, + self.old_timestamp, ) @@ -126,32 +139,38 @@ class MeterInfo: the device. It can include hardware and software version. native_unit: One of the Unit values to identify the meter native unit. """ + model = attr.ib(type=str) - serial_number = attr.ib(default='N/A', type=str) + serial_number = attr.ib(default="N/A", type=str) version_info = attr.ib(default=(), type=Sequence[str]) native_unit = attr.ib( - default=Unit.MG_DL, validator=attr.validators.in_(Unit), - type=Unit) + default=Unit.MG_DL, validator=attr.validators.in_(Unit), type=Unit + ) patient_name = attr.ib(default=None, type=Optional[str]) def __str__(self): - version_information_string = 'N/A' + version_information_string = "N/A" if self.version_info: - version_information_string = '\n '.join( - self.version_info).strip() + version_information_string = "\n ".join(self.version_info).strip() - base_output = textwrap.dedent("""\ + base_output = textwrap.dedent( + """\ {model} Serial Number: {serial_number} Version Information: {version_information_string} Native Unit: {native_unit} - """).format(model=self.model, serial_number=self.serial_number, - version_information_string=version_information_string, - native_unit=self.native_unit.value) + """ + ).format( + model=self.model, + serial_number=self.serial_number, + version_information_string=version_information_string, + native_unit=self.native_unit.value, + ) if self.patient_name != None: - base_output += 'Patient Name: {patient_name}\n'.format( - patient_name=self.patient_name) + base_output += "Patient Name: {patient_name}\n".format( + patient_name=self.patient_name + ) return base_output diff --git a/glucometerutils/drivers/accuchek_reports.py b/glucometerutils/drivers/accuchek_reports.py index 06b69bc..c4d7527 100644 --- a/glucometerutils/drivers/accuchek_reports.py +++ b/glucometerutils/drivers/accuchek_reports.py @@ -19,45 +19,46 @@ import datetime import glob import os -from glucometerutils import common -from glucometerutils import exceptions +from glucometerutils import common, exceptions from glucometerutils.support import driver_base _UNIT_MAP = { - 'mmol/l': common.Unit.MMOL_L, - 'mg/dl': common.Unit.MG_DL, + "mmol/l": common.Unit.MMOL_L, + "mg/dl": common.Unit.MG_DL, } -_DATE_CSV_KEY = 'Date' -_TIME_CSV_KEY = 'Time' -_RESULT_CSV_KEY = 'Result' -_UNIT_CSV_KEY = 'Unit' -_TEMPWARNING_CSV_KEY = 'Temperature warning' # ignored -_OUTRANGE_CSV_KEY = 'Out of target range' # ignored -_OTHER_CSV_KEY = 'Other' # ignored -_BEFORE_MEAL_CSV_KEY = 'Before meal' -_AFTER_MEAL_CSV_KEY = 'After meal' +_DATE_CSV_KEY = "Date" +_TIME_CSV_KEY = "Time" +_RESULT_CSV_KEY = "Result" +_UNIT_CSV_KEY = "Unit" +_TEMPWARNING_CSV_KEY = "Temperature warning" # ignored +_OUTRANGE_CSV_KEY = "Out of target range" # ignored +_OTHER_CSV_KEY = "Other" # ignored +_BEFORE_MEAL_CSV_KEY = "Before meal" +_AFTER_MEAL_CSV_KEY = "After meal" # Control test has extra whitespace which is not ignored. -_CONTROL_CSV_KEY = 'Control test' + ' '*197 +_CONTROL_CSV_KEY = "Control test" + " " * 197 -_DATE_FORMAT = '%d.%m.%Y' -_TIME_FORMAT = '%H:%M' +_DATE_FORMAT = "%d.%m.%Y" +_TIME_FORMAT = "%H:%M" -_DATETIME_FORMAT = ' '.join((_DATE_FORMAT, _TIME_FORMAT)) +_DATETIME_FORMAT = " ".join((_DATE_FORMAT, _TIME_FORMAT)) class Device(driver_base.GlucometerDriver): def __init__(self, device): if not device or not os.path.isdir(device): raise exceptions.CommandLineError( - '--device parameter is required, should point to mount path ' - 'for the meter.') + "--device parameter is required, should point to mount path " + "for the meter." + ) - reports_path = os.path.join(device, '*', 'Reports', '*.csv') + reports_path = os.path.join(device, "*", "Reports", "*.csv") report_files = glob.glob(reports_path) if not report_files: raise exceptions.ConnectionFailed( - 'No report file found in path "%s".' % reports_path) + 'No report file found in path "%s".' % reports_path + ) self.report_file = report_files[0] @@ -68,35 +69,32 @@ class Device(driver_base.GlucometerDriver): next(self.report) return csv.DictReader( - self.report, - delimiter=';', - skipinitialspace=True, - quoting=csv.QUOTE_NONE) + self.report, delimiter=";", skipinitialspace=True, quoting=csv.QUOTE_NONE + ) def connect(self): - self.report = open( - self.report_file, 'r', newline='\r\n', encoding='utf-8') + self.report = open(self.report_file, "r", newline="\r\n", encoding="utf-8") def disconnect(self): self.report.close() def get_meter_info(self): return common.MeterInfo( - '%s glucometer' % self.get_model(), + "%s glucometer" % self.get_model(), serial_number=self.get_serial_number(), - native_unit=self.get_glucose_unit()) + native_unit=self.get_glucose_unit(), + ) def get_model(self): # $device/MODEL/Reports/*.csv - return os.path.basename( - os.path.dirname(os.path.dirname(self.report_file))) + return os.path.basename(os.path.dirname(os.path.dirname(self.report_file))) def get_serial_number(self): self.report.seek(0) # ignore the first line. next(self.report) # The second line of the CSV is serial-no;report-date;report-time;;;;;;; - return next(self.report).split(';')[0] + return next(self.report).split(";")[0] def get_glucose_unit(self): # Get the first record available and parse that. @@ -115,13 +113,12 @@ class Device(driver_base.GlucometerDriver): def _extract_datetime(self, record): # pylint: disable=no-self-use # Date and time are in separate column, but we want to parse them # together. - date_and_time = ' '.join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY])) + date_and_time = " ".join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY])) return datetime.datetime.strptime(date_and_time, _DATETIME_FORMAT) def _extract_meal(self, record): # pylint: disable=no-self-use if record[_AFTER_MEAL_CSV_KEY] and record[_BEFORE_MEAL_CSV_KEY]: - raise exceptions.InvalidResponse( - 'Reading cannot be before and after meal.') + raise exceptions.InvalidResponse("Reading cannot be before and after meal.") elif record[_AFTER_MEAL_CSV_KEY]: return common.Meal.AFTER elif record[_BEFORE_MEAL_CSV_KEY]: @@ -139,5 +136,7 @@ class Device(driver_base.GlucometerDriver): common.convert_glucose_unit( float(record[_RESULT_CSV_KEY]), _UNIT_MAP[record[_UNIT_CSV_KEY]], - common.Unit.MG_DL), - meal=self._extract_meal(record)) + common.Unit.MG_DL, + ), + meal=self._extract_meal(record), + ) diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py index 397eb4f..5c9ed11 100644 --- a/glucometerutils/drivers/contourusb.py +++ b/glucometerutils/drivers/contourusb.py @@ -24,43 +24,44 @@ from glucometerutils import common from glucometerutils.support import contourusb, driver_base -def _extract_timestamp(parsed_record, prefix=''): +def _extract_timestamp(parsed_record, prefix=""): """Extract the timestamp from a parsed record. This leverages the fact that all the reading records have the same base structure. """ - datetime_str = parsed_record['datetime'] + datetime_str = parsed_record["datetime"] return datetime.datetime( - int(datetime_str[0:4]), #year - int(datetime_str[4:6]), #month - int(datetime_str[6:8]), #day - int(datetime_str[8:10]), #hour - int(datetime_str[10:12]), #minute - 0) + int(datetime_str[0:4]), # year + int(datetime_str[4:6]), # month + int(datetime_str[6:8]), # day + int(datetime_str[8:10]), # hour + int(datetime_str[10:12]), # minute + 0, + ) class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver): """Glucometer driver for FreeStyle Libre devices.""" - USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour + USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour USB_PRODUCT_ID = 0x6002 # type: int def get_meter_info(self): self._get_info_record() return common.MeterInfo( - 'Contour USB', + "Contour USB", serial_number=self._get_serial_number(), - version_info=( - 'Meter versions: ' + self._get_version(),), - native_unit= self.get_glucose_unit()) + version_info=("Meter versions: " + self._get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_glucose_unit(self): # pylint: disable=no-self-use - if self._get_glucose_unit() == '0': + if self._get_glucose_unit() == "0": return common.Unit.MG_DL else: return common.Unit.MMOL_L - + def get_readings(self): """ Get reading dump from download data mode(all readings stored) @@ -69,10 +70,10 @@ class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver): for parsed_record in self._get_multirecord(): yield common.GlucoseReading( _extract_timestamp(parsed_record), - int(parsed_record['value']), - comment=parsed_record['markers'], - measure_method=common.MeasurementMethod.BLOOD_SAMPLE - ) + int(parsed_record["value"]), + comment=parsed_record["markers"], + measure_method=common.MeasurementMethod.BLOOD_SAMPLE, + ) def get_serial_number(self): raise NotImplementedError diff --git a/glucometerutils/drivers/fsinsulinx.py b/glucometerutils/drivers/fsinsulinx.py index f3cf043..5465b3a 100644 --- a/glucometerutils/drivers/fsinsulinx.py +++ b/glucometerutils/drivers/fsinsulinx.py @@ -22,20 +22,30 @@ import datetime from glucometerutils import common from glucometerutils.support import freestyle - # The type is a string because it precedes the parsing of the object. -_TYPE_GLUCOSE_READING = '0' - -_InsulinxReading = collections.namedtuple('_InsulinxReading', ( - 'type', # 0 = blood glucose - 'id', - 'month', 'day', 'year', # year is two-digits - 'hour', 'minute', - 'unknown1', 'unknown2', 'unknown3', - 'unknown4', 'unknown5', 'unknown6', - 'value', - 'unknown7', 'unknown8', -)) +_TYPE_GLUCOSE_READING = "0" + +_InsulinxReading = collections.namedtuple( + "_InsulinxReading", + ( + "type", # 0 = blood glucose + "id", + "month", + "day", + "year", # year is two-digits + "hour", + "minute", + "unknown1", + "unknown2", + "unknown3", + "unknown4", + "unknown5", + "unknown6", + "value", + "unknown7", + "unknown8", + ), +) class Device(freestyle.FreeStyleHidDevice): @@ -46,11 +56,11 @@ class Device(freestyle.FreeStyleHidDevice): def get_meter_info(self): """Return the device information in structured form.""" return common.MeterInfo( - 'FreeStyle InsuLinx', + "FreeStyle InsuLinx", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self._get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self._get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_glucose_unit(self): # pylint: disable=no-self-use """Returns the glucose unit of the device.""" @@ -58,7 +68,7 @@ class Device(freestyle.FreeStyleHidDevice): def get_readings(self): """Iterate through the reading records in the device.""" - for record in self._get_multirecord(b'$result?'): + for record in self._get_multirecord(b"$result?"): if not record or record[0] != _TYPE_GLUCOSE_READING: continue @@ -67,11 +77,14 @@ class Device(freestyle.FreeStyleHidDevice): raw_reading = _InsulinxReading._make([int(v) for v in record]) timestamp = datetime.datetime( - raw_reading.year + 2000, raw_reading.month, raw_reading.day, - raw_reading.hour, raw_reading.minute) + raw_reading.year + 2000, + raw_reading.month, + raw_reading.day, + raw_reading.hour, + raw_reading.minute, + ) yield common.GlucoseReading(timestamp, raw_reading.value) def zero_log(self): raise NotImplementedError - diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py index f1ac525..29e821a 100644 --- a/glucometerutils/drivers/fslibre.py +++ b/glucometerutils/drivers/fslibre.py @@ -27,52 +27,47 @@ from glucometerutils.support import freestyle # Fields of the records returned by both $history and $arresult? # Tuple of pairs of idx and field name _BASE_ENTRY_MAP = ( - (0, 'device_id'), - (1, 'type'), - (2, 'month'), - (3, 'day'), - (4, 'year'), # 2-digits - (5, 'hour'), - (6, 'minute'), - (7, 'second'), + (0, "device_id"), + (1, "type"), + (2, "month"), + (3, "day"), + (4, "year"), # 2-digits + (5, "hour"), + (6, "minute"), + (7, "second"), ) # Fields of the records returned by $history? -_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + ( - (13, 'value'), - (15, 'errors'), -) +_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + ((13, "value"), (15, "errors"),) # Fields of the results returned by $arresult? where type = 2 _ARRESULT_TYPE2_ENTRY_MAP = ( - (9, 'reading-type'), # 0 = glucose blood strip, - # 1 = ketone blood strip, - # 2 = glucose sensor - (12, 'value'), - (15, 'sport-flag'), - (16, 'medication-flag'), - (17, 'rapid-acting-flag'), # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP - (18, 'long-acting-flag'), - (19, 'custom-comments-bitfield'), - (23, 'double-long-acting-insulin'), - (25, 'food-flag'), - (26, 'food-carbs-grams'), - (28, 'errors'), + (9, "reading-type"), # 0 = glucose blood strip, + # 1 = ketone blood strip, + # 2 = glucose sensor + (12, "value"), + (15, "sport-flag"), + (16, "medication-flag"), + (17, "rapid-acting-flag"), # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP + (18, "long-acting-flag"), + (19, "custom-comments-bitfield"), + (23, "double-long-acting-insulin"), + (25, "food-flag"), + (26, "food-carbs-grams"), + (28, "errors"), ) _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP = ( - (9, 'old_month'), - (10, 'old_day'), - (11, 'old_year'), - (12, 'old_hour'), - (13, 'old_minute'), - (14, 'old_second'), + (9, "old_month"), + (10, "old_day"), + (11, "old_year"), + (12, "old_hour"), + (13, "old_minute"), + (14, "old_second"), ) # Fields only valid when rapid-acting-flag is "1" -_ARRESULT_RAPID_INSULIN_ENTRY_MAP = ( - (43, 'double-rapid-acting-insulin'), -) +_ARRESULT_RAPID_INSULIN_ENTRY_MAP = ((43, "double-rapid-acting-insulin"),) def _parse_record(record, entry_map): @@ -82,26 +77,25 @@ def _parse_record(record, entry_map): return {} try: - return { - key: int(record[idx]) for idx, key in entry_map - } + return {key: int(record[idx]) for idx, key in entry_map} except IndexError: return {} -def _extract_timestamp(parsed_record, prefix=''): +def _extract_timestamp(parsed_record, prefix=""): """Extract the timestamp from a parsed record. This leverages the fact that all the records have the same base structure. """ return datetime.datetime( - parsed_record[prefix + 'year'] + 2000, - parsed_record[prefix + 'month'], - parsed_record[prefix + 'day'], - parsed_record[prefix + 'hour'], - parsed_record[prefix + 'minute'], - parsed_record[prefix + 'second']) + parsed_record[prefix + "year"] + 2000, + parsed_record[prefix + "month"], + parsed_record[prefix + "day"], + parsed_record[prefix + "hour"], + parsed_record[prefix + "minute"], + parsed_record[prefix + "second"], + ) def _parse_arresult(record): @@ -112,24 +106,23 @@ def _parse_arresult(record): # There are other record types, but we don't currently need to expose these. if not parsed_record: return None - elif parsed_record['type'] == 2: + elif parsed_record["type"] == 2: parsed_record.update(_parse_record(record, _ARRESULT_TYPE2_ENTRY_MAP)) - elif parsed_record['type'] == 5: + elif parsed_record["type"] == 5: parsed_record.update(_parse_record(record, _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP)) return common.TimeAdjustment( _extract_timestamp(parsed_record), - _extract_timestamp(parsed_record, 'old_'), - extra_data={'device_id': parsed_record['device_id']}, + _extract_timestamp(parsed_record, "old_"), + extra_data={"device_id": parsed_record["device_id"]}, ) else: return None # Check right away if we have rapid insulin - if parsed_record['rapid-acting-flag']: - parsed_record.update( - _parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP)) + if parsed_record["rapid-acting-flag"]: + parsed_record.update(_parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP)) - if parsed_record['errors']: + if parsed_record["errors"]: return None comment_parts = [] @@ -137,68 +130,69 @@ def _parse_arresult(record): cls = None value = None - if parsed_record['reading-type'] == 2: - comment_parts.append('(Scan)') + if parsed_record["reading-type"] == 2: + comment_parts.append("(Scan)") measure_method = common.MeasurementMethod.CGM cls = common.GlucoseReading - value = parsed_record['value'] - elif parsed_record['reading-type'] == 0: - comment_parts.append('(Blood)') + value = parsed_record["value"] + elif parsed_record["reading-type"] == 0: + comment_parts.append("(Blood)") measure_method = common.MeasurementMethod.BLOOD_SAMPLE cls = common.GlucoseReading - value = parsed_record['value'] - elif parsed_record['reading-type'] == 1: - comment_parts.append('(Ketone)') + value = parsed_record["value"] + elif parsed_record["reading-type"] == 1: + comment_parts.append("(Ketone)") measure_method = common.MeasurementMethod.BLOOD_SAMPLE cls = common.KetoneReading # automatically convert the raw value in mmol/L - value = freestyle.convert_ketone_unit(parsed_record['value']) + value = freestyle.convert_ketone_unit(parsed_record["value"]) else: # unknown reading return None custom_comments = record[29:35] for comment_index in range(6): - if parsed_record['custom-comments-bitfield'] & (1 << comment_index): + if parsed_record["custom-comments-bitfield"] & (1 << comment_index): comment_parts.append(custom_comments[comment_index][1:-1]) - if parsed_record['sport-flag']: - comment_parts.append('Sport') + if parsed_record["sport-flag"]: + comment_parts.append("Sport") - if parsed_record['medication-flag']: - comment_parts.append('Medication') + if parsed_record["medication-flag"]: + comment_parts.append("Medication") - if parsed_record['food-flag']: - if parsed_record['food-carbs-grams']: - comment_parts.append( - 'Food (%d g)' % parsed_record['food-carbs-grams']) + if parsed_record["food-flag"]: + if parsed_record["food-carbs-grams"]: + comment_parts.append("Food (%d g)" % parsed_record["food-carbs-grams"]) else: - comment_parts.append('Food') + comment_parts.append("Food") - if parsed_record['long-acting-flag']: - if parsed_record['double-long-acting-insulin']: + if parsed_record["long-acting-flag"]: + if parsed_record["double-long-acting-insulin"]: comment_parts.append( - 'Long-acting insulin (%.1f)' % - (parsed_record['double-long-acting-insulin']/2.)) + "Long-acting insulin (%.1f)" + % (parsed_record["double-long-acting-insulin"] / 2.0) + ) else: - comment_parts.append('Long-acting insulin') + comment_parts.append("Long-acting insulin") - if parsed_record['rapid-acting-flag']: + if parsed_record["rapid-acting-flag"]: # provide default value, as this record does not always exist # (even if rapid-acting-flag is set) - if parsed_record.get('double-rapid-acting-insulin', 0): + if parsed_record.get("double-rapid-acting-insulin", 0): comment_parts.append( - 'Rapid-acting insulin (%.1f)' % - (parsed_record['double-rapid-acting-insulin']/2.)) + "Rapid-acting insulin (%.1f)" + % (parsed_record["double-rapid-acting-insulin"] / 2.0) + ) else: - comment_parts.append('Rapid-acting insulin') + comment_parts.append("Rapid-acting insulin") return cls( _extract_timestamp(parsed_record), value, - comment='; '.join(comment_parts), + comment="; ".join(comment_parts), measure_method=measure_method, - extra_data={'device_id': parsed_record['device_id']}, + extra_data={"device_id": parsed_record["device_id"]}, ) @@ -210,16 +204,16 @@ class Device(freestyle.FreeStyleHidDevice): def get_meter_info(self): """Return the device information in structured form.""" return common.MeterInfo( - 'FreeStyle Libre', + "FreeStyle Libre", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self._get_version(),), + version_info=("Software version: " + self._get_version(),), native_unit=self.get_glucose_unit(), - patient_name=self.get_patient_name()) + patient_name=self.get_patient_name(), + ) def get_serial_number(self): """Overridden function as the command is not compatible.""" - return self._send_text_command(b'$sn?').rstrip('\r\n') + return self._send_text_command(b"$sn?").rstrip("\r\n") def get_glucose_unit(self): # pylint: disable=no-self-use """Returns the glucose unit of the device.""" @@ -231,27 +225,27 @@ class Device(freestyle.FreeStyleHidDevice): # First of all get the usually longer list of sensor readings, and # convert them to Readings objects. - for record in self._get_multirecord(b'$history?'): + for record in self._get_multirecord(b"$history?"): parsed_record = _parse_record(record, _HISTORY_ENTRY_MAP) - if not parsed_record or parsed_record['errors'] != 0: + if not parsed_record or parsed_record["errors"] != 0: # The reading is considered invalid, so ignore it. continue yield common.GlucoseReading( _extract_timestamp(parsed_record), - parsed_record['value'], - comment='(Sensor)', + parsed_record["value"], + comment="(Sensor)", measure_method=common.MeasurementMethod.CGM, - extra_data={'device_id': parsed_record['device_id']}, + extra_data={"device_id": parsed_record["device_id"]}, ) # Then get the results of explicit scans and blood tests (and other # events). - for record in self._get_multirecord(b'$arresult?'): + for record in self._get_multirecord(b"$arresult?"): reading = _parse_arresult(record) if reading: yield reading def zero_log(self): - self._send_text_command(b'$resetpatient') + self._send_text_command(b"$resetpatient") diff --git a/glucometerutils/drivers/fsoptium.py b/glucometerutils/drivers/fsoptium.py index 66b23ca..5c3971e 100644 --- a/glucometerutils/drivers/fsoptium.py +++ b/glucometerutils/drivers/fsoptium.py @@ -20,13 +20,13 @@ import datetime import logging import re -from glucometerutils import common -from glucometerutils import exceptions -from glucometerutils.support import serial, driver_base +from glucometerutils import common, exceptions +from glucometerutils.support import driver_base, serial _CLOCK_RE = re.compile( - r'^Clock:\t(?P<month>[A-Z][a-z]{2}) (?P<day>[0-9]{2}) (?P<year>[0-9]{4})\t' - r'(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$') + r"^Clock:\t(?P<month>[A-Z][a-z]{2}) (?P<day>[0-9]{2}) (?P<year>[0-9]{4})\t" + r"(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$" +) # The reading can be HI (padded to three-characters by a space) if the value was # over what the meter was supposed to read. Unlike the "Clock:" line, the months @@ -34,33 +34,33 @@ _CLOCK_RE = re.compile( # characters, so accept a space or 'e'/'y' at the end of the month name. Also, # the time does *not* include seconds. _READING_RE = re.compile( - r'^(?P<reading>HI |[0-9]{3}) ' - r'(?P<month>[A-Z][a-z]{2})[ ey] ' - r'(?P<day>[0-9]{2}) ' - r'(?P<year>[0-9]{4}) ' - r'(?P<time>[0-9]{2}:[0-9]{2}) ' - r'(?P<type>[GK]) 0x00$') + r"^(?P<reading>HI |[0-9]{3}) " + r"(?P<month>[A-Z][a-z]{2})[ ey] " + r"(?P<day>[0-9]{2}) " + r"(?P<year>[0-9]{4}) " + r"(?P<time>[0-9]{2}:[0-9]{2}) " + r"(?P<type>[GK]) 0x00$" +) -_CHECKSUM_RE = re.compile( - r'^(?P<checksum>0x[0-9A-F]{4}) END$') +_CHECKSUM_RE = re.compile(r"^(?P<checksum>0x[0-9A-F]{4}) END$") # There are two date format used by the device. One uses three-letters month # names, and that's easy enough. The other uses three-letters month names, # except for (at least) July. So ignore the fourth character. # explicit mapping. Note that the mapping *requires* a trailing whitespace. _MONTH_MATCHES = { - 'Jan': 1, - 'Feb': 2, - 'Mar': 3, - 'Apr': 4, - 'May': 5, - 'Jun': 6, - 'Jul': 7, - 'Aug': 8, - 'Sep': 9, - 'Oct': 10, - 'Nov': 11, - 'Dec': 12 + "Jan": 1, + "Feb": 2, + "Mar": 3, + "Apr": 4, + "May": 5, + "Jun": 6, + "Jul": 7, + "Aug": 8, + "Sep": 9, + "Oct": 10, + "Nov": 11, + "Dec": 12, } @@ -75,60 +75,59 @@ def _parse_clock(datestr): raise exceptions.InvalidResponse(datestr) # int() parses numbers in decimal, so we don't have to worry about '08' - day = int(match.group('day')) - month = _MONTH_MATCHES[match.group('month')] - year = int(match.group('year')) + day = int(match.group("day")) + month = _MONTH_MATCHES[match.group("month")] + year = int(match.group("year")) - hour, minute, second = (int (x) for x in match.group('time').split(':')) + hour, minute, second = (int(x) for x in match.group("time").split(":")) return datetime.datetime(year, month, day, hour, minute, second) class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 19200 - DEFAULT_CABLE_ID = '1a61:3420' + DEFAULT_CABLE_ID = "1a61:3420" def _send_command(self, command): - cmd_bytes = bytes('$%s\r\n' % command, 'ascii') - logging.debug('Sending command: %r', cmd_bytes) + cmd_bytes = bytes("$%s\r\n" % command, "ascii") + logging.debug("Sending command: %r", cmd_bytes) self.serial_.write(cmd_bytes) self.serial_.flush() response = self.serial_.readlines() - logging.debug('Received response: %r', response) + logging.debug("Received response: %r", response) # We always want to decode the output, and remove stray \r\n. Any # failure in decoding means the output is invalid anyway. - decoded_response = [line.decode('ascii').rstrip('\r\n') - for line in response] + decoded_response = [line.decode("ascii").rstrip("\r\n") for line in response] return decoded_response def connect(self): - self._send_command('xmem') # ignore output this time + self._send_command("xmem") # ignore output this time self._fetch_device_information() def disconnect(self): # pylint: disable=no-self-use return def _fetch_device_information(self): - data = self._send_command('colq') + data = self._send_command("colq") for line in data: - parsed_line = line.split('\t') + parsed_line = line.split("\t") - if parsed_line[0] == 'S/N:': + if parsed_line[0] == "S/N:": self.device_serialno_ = parsed_line[1] - elif parsed_line[0] == 'Ver:': + elif parsed_line[0] == "Ver:": self.device_version_ = parsed_line[1] - if parsed_line[2] == 'MMOL': + if parsed_line[2] == "MMOL": self.device_glucose_unit_ = common.Unit.MMOL_L else: # I only have a mmol/l device, so I can't be sure. self.device_glucose_unit_ = common.Unit.MG_DL # There are more entries: Clock, Market, ROM and Usage, but we don't # care for those here. - elif parsed_line[0] == 'CMD OK': + elif parsed_line[0] == "CMD OK": return # I have not figured out why this happens, but sometimes it's echoing @@ -142,11 +141,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): A common.MeterInfo object. """ return common.MeterInfo( - 'Freestyle Optium glucometer', + "Freestyle Optium glucometer", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_version(self): """Returns an identifier of the firmware version of the glucometer. @@ -179,21 +178,21 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): Returns: A datetime object built according to the returned response. """ - data = self._send_command('colq') + data = self._send_command("colq") for line in data: - if not line.startswith('Clock:'): + if not line.startswith("Clock:"): continue return _parse_clock(line) - raise exceptions.InvalidResponse('\n'.join(data)) + raise exceptions.InvalidResponse("\n".join(data)) def _set_device_datetime(self, date): - data = self._send_command(date.strftime('tim,%m,%d,%y,%H,%M')) + data = self._send_command(date.strftime("tim,%m,%d,%y,%H,%M")) - parsed_data = ''.join(data) - if parsed_data != 'CMD OK': + parsed_data = "".join(data) + if parsed_data != "CMD OK": raise exceptions.InvalidResponse(parsed_data) return self.get_datetime() @@ -216,50 +215,47 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): expected. """ - data = self._send_command('xmem') + data = self._send_command("xmem") # The first line is empty, the second is the serial number, the third # the version, the fourth the current time, and the fifth the record # count.. The last line has a checksum and the end. count = int(data[4]) if count != (len(data) - 6): - raise exceptions.InvalidResponse('\n'.join(data)) + raise exceptions.InvalidResponse("\n".join(data)) # Extract the checksum from the last line. checksum_match = _CHECKSUM_RE.match(data[-1]) if not checksum_match: - raise exceptions.InvalidResponse('\n'.join(data)) + raise exceptions.InvalidResponse("\n".join(data)) - expected_checksum = int(checksum_match.group('checksum'), 16) + expected_checksum = int(checksum_match.group("checksum"), 16) # exclude the last line in the checksum calculation, as that's the # checksum itself. The final \r\n is added separately. - calculated_checksum = sum( - ord(c) for c in '\r\n'.join(data[:-1])) + 0xd + 0xa + calculated_checksum = sum(ord(c) for c in "\r\n".join(data[:-1])) + 0xD + 0xA if expected_checksum != calculated_checksum: - raise exceptions.InvalidChecksum( - expected_checksum, calculated_checksum) + raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum) for line in data[5:-1]: match = _READING_RE.match(line) if not match: raise exceptions.InvalidResponse(line) - if match.group('type') != 'G': - logging.warning( - 'Non-glucose readings are not supported, ignoring.') + if match.group("type") != "G": + logging.warning("Non-glucose readings are not supported, ignoring.") continue - if match.group('reading') == 'HI ': + if match.group("reading") == "HI ": value = float("inf") else: - value = float(match.group('reading')) + value = float(match.group("reading")) - day = int(match.group('day')) - month = _MONTH_MATCHES[match.group('month')] - year = int(match.group('year')) + day = int(match.group("day")) + month = _MONTH_MATCHES[match.group("month")] + year = int(match.group("year")) - hour, minute = map(int, match.group('time').split(':')) + hour, minute = map(int, match.group("time").split(":")) timestamp = datetime.datetime(year, month, day, hour, minute) diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py index 58564e5..909fed8 100644 --- a/glucometerutils/drivers/fsprecisionneo.py +++ b/glucometerutils/drivers/fsprecisionneo.py @@ -27,25 +27,30 @@ import datetime from glucometerutils import common from glucometerutils.support import freestyle - # The type is a string because it precedes the parsing of the object. -_TYPE_GLUCOSE_READING = '7' -_TYPE_KETONE_READING = '9' - -_NeoReading = collections.namedtuple('_NeoReading', ( - 'type', # 7 = blood glucose, 9 = blood ketone - 'id', - 'month', 'day', 'year', # year is two-digits - 'hour', 'minute', - 'unknown2', - 'value', - # Extra trailing and so-far-unused fields; so discard them: - # * for blood glucose: 10 unknown trailing fields - #'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7', - #'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12', - # * for blood ketone: 2 unknown trailing fields - #'unknown3', 'unknown4', -)) +_TYPE_GLUCOSE_READING = "7" +_TYPE_KETONE_READING = "9" + +_NeoReading = collections.namedtuple( + "_NeoReading", + ( + "type", # 7 = blood glucose, 9 = blood ketone + "id", + "month", + "day", + "year", # year is two-digits + "hour", + "minute", + "unknown2", + "value", + # Extra trailing and so-far-unused fields; so discard them: + # * for blood glucose: 10 unknown trailing fields + #'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7', + #'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12', + # * for blood ketone: 2 unknown trailing fields + #'unknown3', 'unknown4', + ), +) class Device(freestyle.FreeStyleHidDevice): @@ -56,12 +61,12 @@ class Device(freestyle.FreeStyleHidDevice): def get_meter_info(self): """Return the device information in structured form.""" return common.MeterInfo( - 'FreeStyle Precision Neo', + "FreeStyle Precision Neo", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self._get_version(),), + version_info=("Software version: " + self._get_version(),), native_unit=self.get_glucose_unit(), - patient_name=self.get_patient_name()) + patient_name=self.get_patient_name(), + ) def get_glucose_unit(self): # pylint: disable=no-self-use """Returns the glucose unit of the device.""" @@ -69,7 +74,7 @@ class Device(freestyle.FreeStyleHidDevice): def get_readings(self): """Iterate through the reading records in the device.""" - for record in self._get_multirecord(b'$result?'): + for record in self._get_multirecord(b"$result?"): cls = None if record and record[0] == _TYPE_GLUCOSE_READING: cls = common.GlucoseReading @@ -85,11 +90,15 @@ class Device(freestyle.FreeStyleHidDevice): if value == "HI": value = float("inf") values.append(int(value)) - raw_reading = _NeoReading._make(values[:len(_NeoReading._fields)]) + raw_reading = _NeoReading._make(values[: len(_NeoReading._fields)]) timestamp = datetime.datetime( - raw_reading.year + 2000, raw_reading.month, raw_reading.day, - raw_reading.hour, raw_reading.minute) + raw_reading.year + 2000, + raw_reading.month, + raw_reading.day, + raw_reading.hour, + raw_reading.minute, + ) if record and record[0] == _TYPE_KETONE_READING: value = freestyle.convert_ketone_unit(raw_reading.value) diff --git a/glucometerutils/drivers/otultra2.py b/glucometerutils/drivers/otultra2.py index 39be859..5e90b87 100644 --- a/glucometerutils/drivers/otultra2.py +++ b/glucometerutils/drivers/otultra2.py @@ -16,40 +16,40 @@ Expected device path: /dev/ttyUSB0 or similar serial port device. import datetime import re -from glucometerutils import common -from glucometerutils import exceptions +from glucometerutils import common, exceptions from glucometerutils.support import driver_base, lifescan, serial # The following two hashes are taken directly from LifeScan's documentation _MEAL_CODES = { - 'N': common.Meal.NONE, - 'B': common.Meal.BEFORE, - 'A': common.Meal.AFTER, + "N": common.Meal.NONE, + "B": common.Meal.BEFORE, + "A": common.Meal.AFTER, } _COMMENT_CODES = { - '00': '', # would be 'No Comment' - '01': 'Not Enough Food', - '02': 'Too Much Food', - '03': 'Mild Exercise', - '04': 'Hard Exercise', - '05': 'Medication', - '06': 'Stress', - '07': 'Illness', - '08': 'Feel Hypo', - '09': 'Menses', - '10': 'Vacation', - '11': 'Other', + "00": "", # would be 'No Comment' + "01": "Not Enough Food", + "02": "Too Much Food", + "03": "Mild Exercise", + "04": "Hard Exercise", + "05": "Medication", + "06": "Stress", + "07": "Illness", + "08": "Feel Hypo", + "09": "Menses", + "10": "Vacation", + "11": "Other", } -_DUMP_HEADER_RE = re.compile( - r'P ([0-9]{3}),"[0-9A-Z]{9}","(?:MG/DL |MMOL/L)"') +_DUMP_HEADER_RE = re.compile(r'P ([0-9]{3}),"[0-9A-Z]{9}","(?:MG/DL |MMOL/L)"') _DUMP_LINE_RE = re.compile( r'P (?P<datetime>"[A-Z]{3}","[0-9/]{8}","[0-9:]{8} "),' r'"(?P<control>[C ]) (?P<value>[0-9]{3})(?P<parityerror>[\? ])",' - r'"(?P<meal>[NBA])","(?P<comment>0[0-9]|1[01])", 00') + r'"(?P<meal>[NBA])","(?P<comment>0[0-9]|1[01])", 00' +) + +_RESPONSE_MATCH = re.compile(r"^(.+) ([0-9A-F]{4})\r$") -_RESPONSE_MATCH = re.compile(r'^(.+) ([0-9A-F]{4})\r$') def _calculate_checksum(bytestring): """Calculate the checksum used by OneTouch Ultra and Ultra2 devices @@ -66,10 +66,11 @@ def _calculate_checksum(bytestring): checksum = 0 for byte in bytestring: - checksum = (checksum + byte) & 0xffff + checksum = (checksum + byte) & 0xFFFF return checksum + def _validate_and_strip_checksum(line): """Verify the simple 16-bit checksum and remove it from the line. @@ -88,20 +89,19 @@ def _validate_and_strip_checksum(line): try: checksum_given = int(checksum_string, 16) - checksum_calculated = _calculate_checksum( - bytes(response, 'ascii')) + checksum_calculated = _calculate_checksum(bytes(response, "ascii")) if checksum_given != checksum_calculated: - raise exceptions.InvalidChecksum(checksum_given, - checksum_calculated) + raise exceptions.InvalidChecksum(checksum_given, checksum_calculated) except ValueError: raise exceptions.InvalidChecksum(checksum_given, None) return response + _DATETIME_RE = re.compile( - r'^"[A-Z]{3}",' - r'"([0-9]{2}/[0-9]{2}/[0-9]{2})","([0-9]{2}:[0-9]{2}:[0-9]{2}) "$') + r'^"[A-Z]{3}",' r'"([0-9]{2}/[0-9]{2}/[0-9]{2})","([0-9]{2}:[0-9]{2}:[0-9]{2}) "$' +) def _parse_datetime(response): @@ -121,8 +121,8 @@ def _parse_datetime(response): raise exceptions.InvalidResponse(response) date, time = match.groups() - month, day, year = map(int, date.split('/')) - hour, minute, second = map(int, time.split(':')) + month, day, year = map(int, date.split("/")) + hour, minute, second = map(int, time.split(":")) # Yes, OneTouch2's firmware is not Y2K safe. return datetime.datetime(2000 + year, month, day, hour, minute, second) @@ -130,7 +130,7 @@ def _parse_datetime(response): class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 9600 - DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. + DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. def connect(self): # pylint: disable=no-self-use return @@ -144,7 +144,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): Args: cmd: command and parameters to send (without newline) """ - cmdstring = bytes('\x11\r' + cmd + '\r', 'ascii') + cmdstring = bytes("\x11\r" + cmd + "\r", "ascii") self.serial_.write(cmdstring) self.serial_.flush() @@ -160,7 +160,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): """ self._send_command(cmd) - line = self.serial_.readline().decode('ascii') + line = self.serial_.readline().decode("ascii") return _validate_and_strip_checksum(line) def get_meter_info(self): @@ -170,11 +170,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): A common.MeterInfo object. """ return common.MeterInfo( - 'OneTouch Ultra 2 glucometer', + "OneTouch Ultra 2 glucometer", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_version(self): """Returns an identifier of the firmware version of the glucometer. @@ -183,9 +183,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): The software version returned by the glucometer, such as "P02.00.00 30/08/06". """ - response = self._send_oneliner_command('DM?') + response = self._send_oneliner_command("DM?") - if response[0] != '?': + if response[0] != "?": raise exceptions.InvalidResponse(response) return response[1:] @@ -204,7 +204,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): InvalidSerialNumber: if the returned serial number does not match the OneTouch2 device as per specs. """ - response = self._send_oneliner_command('DM@') + response = self._send_oneliner_command("DM@") match = self._SERIAL_NUMBER_RE.match(response) if not match: @@ -214,7 +214,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): # 'Y' at the far right of the serial number is the indication of a # OneTouch Ultra2 device, as per specs. - if serial_number[-1] != 'Y': + if serial_number[-1] != "Y": raise lifescan.InvalidSerialNumber(serial_number) return serial_number @@ -225,12 +225,13 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): Returns: A datetime object built according to the returned response. """ - response = self._send_oneliner_command('DMF') + response = self._send_oneliner_command("DMF") return _parse_datetime(response[2:]) def _set_device_datetime(self, date): response = self._send_oneliner_command( - 'DMT' + date.strftime('%m/%d/%y %H:%M:%S')) + "DMT" + date.strftime("%m/%d/%y %H:%M:%S") + ) return _parse_datetime(response[2:]) def zero_log(self): @@ -239,8 +240,8 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): This function will clear the memory of the device deleting all the readings in an irrecoverable way. """ - response = self._send_oneliner_command('DMZ') - if response != 'Z': + response = self._send_oneliner_command("DMZ") + if response != "Z": raise exceptions.InvalidResponse(response) _GLUCOSE_UNIT_RE = re.compile(r'^SU\?,"(MG/DL |MMOL/L)"') @@ -260,15 +261,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): unit used for display. This is not settable by the user in all modern meters. """ - response = self._send_oneliner_command('DMSU?') + response = self._send_oneliner_command("DMSU?") match = self._GLUCOSE_UNIT_RE.match(response) unit = match.group(1) - if unit == 'MG/DL ': + if unit == "MG/DL ": return common.Unit.MG_DL - if unit == 'MMOL/L': + if unit == "MMOL/L": return common.Unit.MMOL_L raise exceptions.InvalidGlucoseUnit(response) @@ -287,10 +288,10 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): expected. """ - self._send_command('DMP') + self._send_command("DMP") data = self.serial_.readlines() - header = data.pop(0).decode('ascii') + header = data.pop(0).decode("ascii") match = _DUMP_HEADER_RE.match(header) if not match: raise exceptions.InvalidResponse(header) @@ -299,7 +300,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): assert count == len(data) for line in data: - line = _validate_and_strip_checksum(line.decode('ascii')) + line = _validate_and_strip_checksum(line.decode("ascii")) match = _DUMP_LINE_RE.match(line) if not match: @@ -307,11 +308,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): line_data = match.groupdict() - date = _parse_datetime(line_data['datetime']) - meal = _MEAL_CODES[line_data['meal']] - comment = _COMMENT_CODES[line_data['comment']] + date = _parse_datetime(line_data["datetime"]) + meal = _MEAL_CODES[line_data["meal"]] + comment = _COMMENT_CODES[line_data["comment"]] # OneTouch2 always returns the data in mg/dL even if the glucometer # is set to mmol/L, so there is no conversion required. yield common.GlucoseReading( - date, float(line_data['value']), meal=meal, comment=comment) + date, float(line_data["value"]), meal=meal, comment=comment + ) diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index 7f4934e..0d1e7a9 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -22,87 +22,97 @@ import logging import construct from glucometerutils import common -from glucometerutils.support import construct_extras, driver_base, lifescan, lifescan_binary_protocol, serial +from glucometerutils.support import ( + construct_extras, + driver_base, + lifescan, + lifescan_binary_protocol, + serial, +) _PACKET = lifescan_binary_protocol.LifeScanPacket(True) _INVALID_RECORD = 501 -_COMMAND_SUCCESS = construct.Const(b'\x05\x06') +_COMMAND_SUCCESS = construct.Const(b"\x05\x06") -_VERSION_REQUEST = construct.Const(b'\x05\x0d\x02') +_VERSION_REQUEST = construct.Const(b"\x05\x0d\x02") _VERSION_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'version' / construct.PascalString(construct.Byte, encoding='ascii'), + "version" / construct.PascalString(construct.Byte, encoding="ascii"), ) _SERIAL_NUMBER_REQUEST = construct.Const( - b'\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00') + b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00" +) _SERIAL_NUMBER_RESPONSE = construct.Struct( - _COMMAND_SUCCESS, - 'serial_number' / construct.GreedyString(encoding='ascii'), + _COMMAND_SUCCESS, "serial_number" / construct.GreedyString(encoding="ascii"), ) _DATETIME_REQUEST = construct.Struct( - construct.Const(b'\x05\x20'), # 0x20 is the datetime - 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02), - 'timestamp' / construct.Default( + construct.Const(b"\x05\x20"), # 0x20 is the datetime + "request_type" / construct.Enum(construct.Byte, write=0x01, read=0x02), + "timestamp" + / construct.Default( construct_extras.Timestamp(construct.Int32ul), # type: ignore - datetime.datetime(1970, 1, 1, 0, 0)), + datetime.datetime(1970, 1, 1, 0, 0), + ), ) _DATETIME_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore + "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore ) -_GLUCOSE_UNIT_REQUEST = construct.Const( - b'\x05\x09\x02\x09\x00\x00\x00\x00') +_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00") _GLUCOSE_UNIT_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT, + "unit" / lifescan_binary_protocol.GLUCOSE_UNIT, construct.Padding(3), ) -_MEMORY_ERASE_REQUEST = construct.Const(b'\x05\x1A') +_MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A") _READING_COUNT_RESPONSE = construct.Struct( - construct.Const(b'\x0f'), - 'count' / construct.Int16ul, + construct.Const(b"\x0f"), "count" / construct.Int16ul, ) _READ_RECORD_REQUEST = construct.Struct( - construct.Const(b'\x05\x1f'), - 'record_id' / construct.Int16ul, + construct.Const(b"\x05\x1f"), "record_id" / construct.Int16ul, ) _READING_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore - 'value' / construct.Int32ul, + "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore + "value" / construct.Int32ul, ) -def _make_packet( - message, sequence_number, expect_receive, acknowledge, disconnect): + +def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect): return _PACKET.build( - {'data': {'value': { - 'message': message, - 'link_control': { - 'sequence_number': sequence_number, - 'expect_receive': expect_receive, - 'acknowledge': acknowledge, - 'disconnect': disconnect, - }, - }}}) + { + "data": { + "value": { + "message": message, + "link_control": { + "sequence_number": sequence_number, + "expect_receive": expect_receive, + "acknowledge": acknowledge, + "disconnect": disconnect, + }, + } + } + } + ) class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 9600 - DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. + DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. TIMEOUT = 0.5 def __init__(self, device): @@ -110,12 +120,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): self.sent_counter_ = False self.expect_receive_ = False - self.buffered_reader_ = construct.Rebuffered( - _PACKET, tailcutoff=1024) + self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) def connect(self): try: - self._send_packet(b'', disconnect=True) + self._send_packet(b"", disconnect=True) self._read_ack() except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) @@ -125,33 +134,32 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def _send_packet(self, message, acknowledge=False, disconnect=False): pkt = _make_packet( - message, - self.sent_counter_, - self.expect_receive_, - acknowledge, - disconnect) - logging.debug('sending packet: %s', binascii.hexlify(pkt)) + message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect + ) + logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) self.serial_.flush() def _read_packet(self): raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data - logging.debug('received packet: %r', raw_pkt) + logging.debug("received packet: %r", raw_pkt) # discard the checksum and copy pkt = raw_pkt.value if not pkt.link_control.disconnect and ( - pkt.link_control.sequence_number != self.expect_receive_): + pkt.link_control.sequence_number != self.expect_receive_ + ): raise lifescan.MalformedCommand( - 'at position 2[0b] expected %02x, received %02x' % ( - self.expect_receive_, pkt.link_connect.sequence_count)) + "at position 2[0b] expected %02x, received %02x" + % (self.expect_receive_, pkt.link_connect.sequence_count) + ) return pkt def _send_ack(self): - self._send_packet(b'', acknowledge=True, disconnect=False) + self._send_packet(b"", acknowledge=True, disconnect=False) def _read_ack(self): pkt = self._read_packet() @@ -177,62 +185,63 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def get_meter_info(self): return common.MeterInfo( - 'OneTouch Ultra Easy glucometer', + "OneTouch Ultra Easy glucometer", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_version(self): - response = self._send_request( - _VERSION_REQUEST, None, _VERSION_RESPONSE) + response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version def get_serial_number(self): response = self._send_request( - _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE) + _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE + ) return response.serial_number def get_datetime(self): response = self._send_request( - _DATETIME_REQUEST, {'request_type': 'read'}, - _DATETIME_RESPONSE) + _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE + ) return response.timestamp def _set_device_datetime(self, date): response = self._send_request( - _DATETIME_REQUEST, { - 'request_type': 'write', - 'timestamp': date, - }, _DATETIME_RESPONSE) + _DATETIME_REQUEST, + {"request_type": "write", "timestamp": date,}, + _DATETIME_RESPONSE, + ) return response.timestamp def zero_log(self): - self._send_request( - _MEMORY_ERASE_REQUEST, None, - _COMMAND_SUCCESS) + self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) def get_glucose_unit(self): response = self._send_request( - _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE) + _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE + ) return response.unit def _get_reading_count(self): response = self._send_request( - _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD}, - _READING_COUNT_RESPONSE) + _READ_RECORD_REQUEST, + {"record_id": _INVALID_RECORD}, + _READING_COUNT_RESPONSE, + ) return response.count def _get_reading(self, record_id): response = self._send_request( - _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE) + _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE + ) - return common.GlucoseReading( - response.timestamp, float(response.value)) + return common.GlucoseReading(response.timestamp, float(response.value)) def get_readings(self): record_count = self._get_reading_count() diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py index bde0af3..b8429ea 100644 --- a/glucometerutils/drivers/otverio2015.py +++ b/glucometerutils/drivers/otverio2015.py @@ -29,67 +29,61 @@ import construct from pyscsi.pyscsi.scsi import SCSI from pyscsi.pyscsi.scsi_device import SCSIDevice -from glucometerutils import common -from glucometerutils import exceptions +from glucometerutils import common, exceptions from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol # This device uses SCSI blocks as registers. _REGISTER_SIZE = 512 _PACKET = construct.Padded( - _REGISTER_SIZE, - lifescan_binary_protocol.LifeScanPacket(False)) + _REGISTER_SIZE, lifescan_binary_protocol.LifeScanPacket(False) +) -_COMMAND_SUCCESS = construct.Const(b'\x03\x06') +_COMMAND_SUCCESS = construct.Const(b"\x03\x06") _QUERY_REQUEST = construct.Struct( - construct.Const(b'\x03\xe6\x02'), - 'selector' / construct.Enum( - construct.Byte, serial=0x00, model=0x01, software=0x02), + construct.Const(b"\x03\xe6\x02"), + "selector" / construct.Enum(construct.Byte, serial=0x00, model=0x01, software=0x02), ) _QUERY_RESPONSE = construct.Struct( - construct.Const(b'\x03\x06'), - 'value' / construct.CString(encoding='utf-16-le'), + construct.Const(b"\x03\x06"), "value" / construct.CString(encoding="utf-16-le"), ) _READ_PARAMETER_REQUEST = construct.Struct( - construct.Const(b'\x03'), - 'selector' / construct.Enum( - construct.Byte, unit=0x04), + construct.Const(b"\x03"), "selector" / construct.Enum(construct.Byte, unit=0x04), ) _READ_UNIT_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT, + "unit" / lifescan_binary_protocol.GLUCOSE_UNIT, construct.Padding(3), ) -_READ_RTC_REQUEST = construct.Const(b'\x03\x20\x02') +_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02") _READ_RTC_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore ) _WRITE_RTC_REQUEST = construct.Struct( - construct.Const(b'\x03\x20\x01'), - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + construct.Const(b"\x03\x20\x01"), + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore ) -_MEMORY_ERASE_REQUEST = construct.Const(b'\x03\x1a') +_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a") -_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x03\x27\x00') +_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00") _READ_RECORD_COUNT_RESPONSE = construct.Struct( - _COMMAND_SUCCESS, - 'count' / construct.Int16ul, + _COMMAND_SUCCESS, "count" / construct.Int16ul, ) _READ_RECORD_REQUEST = construct.Struct( - construct.Const(b'\x03\x31\x02'), - 'record_id' / construct.Int16ul, - construct.Const(b'\x00'), + construct.Const(b"\x03\x31\x02"), + "record_id" / construct.Int16ul, + construct.Const(b"\x00"), ) _MEAL_FLAG = { @@ -100,13 +94,12 @@ _MEAL_FLAG = { _READ_RECORD_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'inverse_counter' / construct.Int16ul, + "inverse_counter" / construct.Int16ul, construct.Padding(1), - 'lifetime_counter' / construct.Int16ul, - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore - 'value' / construct.Int16ul, - 'meal' / construct.Mapping( - construct.Byte, _MEAL_FLAG), + "lifetime_counter" / construct.Int16ul, + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + "value" / construct.Int16ul, + "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG), construct.Padding(4), ) @@ -115,8 +108,9 @@ class Device(driver_base.GlucometerDriver): def __init__(self, device): if not device: raise exceptions.CommandLineError( - '--device parameter is required, should point to the disk ' - 'device representing the meter.') + "--device parameter is required, should point to the disk " + "device representing the meter." + ) self.device_name_ = device self.scsi_device_ = SCSIDevice(device, readwrite=True) @@ -125,11 +119,12 @@ class Device(driver_base.GlucometerDriver): def connect(self): inq = self.scsi_.inquiry() - logging.debug('Device connected: %r', inq.result) - vendor = inq.result['t10_vendor_identification'][:32] - if vendor != b'LifeScan': + logging.debug("Device connected: %r", inq.result) + vendor = inq.result["t10_vendor_identification"][:32] + if vendor != b"LifeScan": raise exceptions.ConnectionFailed( - 'Device %s is not a LifeScan glucometer.' % self.device_name_) + "Device %s is not a LifeScan glucometer." % self.device_name_ + ) def disconnect(self): # pylint: disable=no-self-use return @@ -155,21 +150,19 @@ class Device(driver_base.GlucometerDriver): """ try: request = request_format.build(request_obj) - request_raw = _PACKET.build({'data': {'value': { - 'message': request, - }}}) - logging.debug( - 'Request sent: %s', binascii.hexlify(request_raw)) + request_raw = _PACKET.build({"data": {"value": {"message": request,}}}) + logging.debug("Request sent: %s", binascii.hexlify(request_raw)) self.scsi_.write10(lba, 1, request_raw) response_raw = self.scsi_.read10(lba, 1) logging.debug( - 'Response received: %s', binascii.hexlify(response_raw.datain)) + "Response received: %s", binascii.hexlify(response_raw.datain) + ) response_pkt = _PACKET.parse(response_raw.datain).data - logging.debug('Response packet: %r', response_pkt) + logging.debug("Response packet: %r", response_pkt) response = response_format.parse(response_pkt.value.message) - logging.debug('Response parsed: %r', response) + logging.debug("Response parsed: %r", response) return response except construct.ConstructError as e: @@ -177,60 +170,58 @@ class Device(driver_base.GlucometerDriver): def _query_string(self, selector): response = self._send_request( - 3, _QUERY_REQUEST, {'selector': selector}, _QUERY_RESPONSE) + 3, _QUERY_REQUEST, {"selector": selector}, _QUERY_RESPONSE + ) return response.value def get_meter_info(self): return common.MeterInfo( - 'OneTouch %s glucometer' % self._query_string('model'), + "OneTouch %s glucometer" % self._query_string("model"), serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_serial_number(self): - return self._query_string('serial') + return self._query_string("serial") def get_version(self): - return self._query_string('software') + return self._query_string("software") def get_datetime(self): - response = self._send_request( - 3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) + response = self._send_request(3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) return response.timestamp def _set_device_datetime(self, date): - self._send_request( - 3, _WRITE_RTC_REQUEST, {'timestamp': date}, - _COMMAND_SUCCESS) + self._send_request(3, _WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS) # The device does not return the new datetime, so confirm by calling # READ RTC again. return self.get_datetime() def zero_log(self): - self._send_request( - 3, _MEMORY_ERASE_REQUEST, None, - _COMMAND_SUCCESS) + self._send_request(3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) def get_glucose_unit(self): response = self._send_request( - 4, _READ_PARAMETER_REQUEST, {'selector': 'unit'}, - _READ_UNIT_RESPONSE) + 4, _READ_PARAMETER_REQUEST, {"selector": "unit"}, _READ_UNIT_RESPONSE + ) return response.unit def _get_reading_count(self): response = self._send_request( - 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE) + 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE + ) return response.count def _get_reading(self, record_id): response = self._send_request( - 3, _READ_RECORD_REQUEST, {'record_id': record_id}, - _READ_RECORD_RESPONSE) + 3, _READ_RECORD_REQUEST, {"record_id": record_id}, _READ_RECORD_RESPONSE + ) return common.GlucoseReading( - response.timestamp, float(response.value), meal=response.meal) + response.timestamp, float(response.value), meal=response.meal + ) def get_readings(self): record_count = self._get_reading_count() diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py index 69bdac9..24327ef 100644 --- a/glucometerutils/drivers/otverioiq.py +++ b/glucometerutils/drivers/otverioiq.py @@ -21,63 +21,63 @@ import logging import construct from glucometerutils import common -from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol, serial +from glucometerutils.support import ( + driver_base, + lifescan, + lifescan_binary_protocol, + serial, +) _PACKET = lifescan_binary_protocol.LifeScanPacket(False) -_COMMAND_SUCCESS = construct.Const(b'\x03\x06') +_COMMAND_SUCCESS = construct.Const(b"\x03\x06") -_VERSION_REQUEST = construct.Const(b'\x03\x0d\x01') +_VERSION_REQUEST = construct.Const(b"\x03\x0d\x01") _VERSION_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'version' / construct.PascalString(construct.Byte, encoding='ascii'), + "version" / construct.PascalString(construct.Byte, encoding="ascii"), # NULL-termination is not included in string length. - construct.Const(b'\x00'), + construct.Const(b"\x00"), ) -_SERIAL_NUMBER_REQUEST = construct.Const( - b'\x03\x0b\x01\x02') +_SERIAL_NUMBER_REQUEST = construct.Const(b"\x03\x0b\x01\x02") _SERIAL_NUMBER_RESPONSE = construct.Struct( - _COMMAND_SUCCESS, - 'serial_number' / construct.CString(encoding='ascii'), + _COMMAND_SUCCESS, "serial_number" / construct.CString(encoding="ascii"), ) -_READ_RTC_REQUEST = construct.Const(b'\x03\x20\x02') +_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02") _READ_RTC_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore ) _WRITE_RTC_REQUEST = construct.Struct( - construct.Const(b'\x03\x20\x01'), - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + construct.Const(b"\x03\x20\x01"), + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore ) -_GLUCOSE_UNIT_REQUEST = construct.Const( - b'\x03\x09\x02\x02') +_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x03\x09\x02\x02") _GLUCOSE_UNIT_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT, + "unit" / lifescan_binary_protocol.GLUCOSE_UNIT, construct.Padding(3), ) -_MEMORY_ERASE_REQUEST = construct.Const(b'\x03\x1a') +_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a") -_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x03\x27\x00') +_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00") _READ_RECORD_COUNT_RESPONSE = construct.Struct( - _COMMAND_SUCCESS, - 'count' / construct.Int16ul, + _COMMAND_SUCCESS, "count" / construct.Int16ul, ) _READ_RECORD_REQUEST = construct.Struct( - construct.Const(b'\x03\x21'), - 'record_id' / construct.Int16ul, + construct.Const(b"\x03\x21"), "record_id" / construct.Int16ul, ) _MEAL_FLAG = { @@ -88,18 +88,17 @@ _MEAL_FLAG = { _READING_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore - 'value' / construct.Int16ul, - 'control_test' / construct.Flag, - 'meal' / construct.Mapping( - construct.Byte, _MEAL_FLAG), + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + "value" / construct.Int16ul, + "control_test" / construct.Flag, + "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG), construct.Padding(2), # unknown ) class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 38400 - DEFAULT_CABLE_ID = '10c4:85a7' # Specific ID for embedded cp210x + DEFAULT_CABLE_ID = "10c4:85a7" # Specific ID for embedded cp210x TIMEOUT = 0.5 def __init__(self, device): @@ -107,18 +106,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) def _send_packet(self, message): - pkt = _PACKET.build( - {'data': {'value': { - 'message': message, - }}}) - logging.debug('sending packet: %s', binascii.hexlify(pkt)) + pkt = _PACKET.build({"data": {"value": {"message": message,}}}) + logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) self.serial_.flush() def _read_packet(self): raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data - logging.debug('received packet: %r', raw_pkt) + logging.debug("received packet: %r", raw_pkt) # discard the checksum and copy pkt = raw_pkt.value @@ -138,66 +134,64 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def get_meter_info(self): return common.MeterInfo( - 'OneTouch Verio IQ glucometer', + "OneTouch Verio IQ glucometer", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_version(self): - response = self._send_request( - _VERSION_REQUEST, None, _VERSION_RESPONSE) + response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version def get_serial_number(self): response = self._send_request( - _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE) + _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE + ) return response.serial_number def get_datetime(self): - response = self._send_request( - _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) + response = self._send_request(_READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) return response.timestamp def _set_device_datetime(self, date): - self._send_request( - _WRITE_RTC_REQUEST, { - 'timestamp': date, - }, _COMMAND_SUCCESS) + self._send_request(_WRITE_RTC_REQUEST, {"timestamp": date,}, _COMMAND_SUCCESS) # The device does not return the new datetime, so confirm by calling # READ RTC again. return self.get_datetime() def zero_log(self): - self._send_request( - _MEMORY_ERASE_REQUEST, None, - _COMMAND_SUCCESS) + self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) def get_glucose_unit(self): response = self._send_request( - _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE) + _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE + ) return response.unit def _get_reading_count(self): response = self._send_request( - _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE) + _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE + ) return response.count def _get_reading(self, record_id): response = self._send_request( - _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE) + _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE + ) if response.control_test: - logging.debug('control solution test, ignoring.') + logging.debug("control solution test, ignoring.") return None return common.GlucoseReading( - response.timestamp, float(response.value), meal=response.meal) + response.timestamp, float(response.value), meal=response.meal + ) def get_readings(self): record_count = self._get_reading_count() diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py index a6e2ce5..47dd9ca 100644 --- a/glucometerutils/drivers/sdcodefree.py +++ b/glucometerutils/drivers/sdcodefree.py @@ -24,46 +24,44 @@ import operator import construct -from glucometerutils import common -from glucometerutils import exceptions -from glucometerutils.support import serial, driver_base +from glucometerutils import common, exceptions +from glucometerutils.support import driver_base, serial def xor_checksum(msg): return functools.reduce(operator.xor, msg) + class Direction(enum.Enum): In = 0x20 Out = 0x10 + _PACKET = construct.Struct( - 'stx' / construct.Const(0x53, construct.Byte), - 'direction' / construct.Mapping( - construct.Byte, - {e: e.value for e in Direction}), - 'length' / construct.Rebuild( - construct.Byte, lambda this: len(this.message) + 2), - 'message' / construct.Bytes(lambda this: this.length - 2), - 'checksum' / construct.Checksum( - construct.Byte, xor_checksum, construct.this.message), - 'etx' / construct.Const(0xAA, construct.Byte) + "stx" / construct.Const(0x53, construct.Byte), + "direction" / construct.Mapping(construct.Byte, {e: e.value for e in Direction}), + "length" / construct.Rebuild(construct.Byte, lambda this: len(this.message) + 2), + "message" / construct.Bytes(lambda this: this.length - 2), + "checksum" + / construct.Checksum(construct.Byte, xor_checksum, construct.this.message), + "etx" / construct.Const(0xAA, construct.Byte), ) _FIRST_MESSAGE = construct.Struct( construct.Const(0x30, construct.Byte), - 'count' / construct.Int16ub, + "count" / construct.Int16ub, construct.Const(0xAA, construct.Byte)[19], ) -_CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA' -_RESPONSE_MESSAGE = b'\x10\x40' +_CHALLENGE_PACKET_FULL = b"\x53\x20\x04\x10\x30\x20\xAA" +_RESPONSE_MESSAGE = b"\x10\x40" -_DATE_SET_MESSAGE = b'\x10\x10' +_DATE_SET_MESSAGE = b"\x10\x10" -_DISCONNECT_MESSAGE = b'\x10\x60' -_DISCONNECTED_MESSAGE = b'\x10\x70' +_DISCONNECT_MESSAGE = b"\x10\x60" +_DISCONNECTED_MESSAGE = b"\x10\x70" -_FETCH_MESSAGE = b'\x10\x60' +_FETCH_MESSAGE = b"\x10\x60" _MEAL_FLAG = { common.Meal.NONE: 0x00, @@ -73,67 +71,64 @@ _MEAL_FLAG = { _READING = construct.Struct( construct.Byte[2], - 'year' / construct.Byte, - 'month' / construct.Byte, - 'day' / construct.Byte, - 'hour' / construct.Byte, - 'minute' / construct.Byte, - 'value' / construct.Int16ub, - 'meal' / construct.Mapping( - construct.Byte, _MEAL_FLAG), + "year" / construct.Byte, + "month" / construct.Byte, + "day" / construct.Byte, + "hour" / construct.Byte, + "minute" / construct.Byte, + "value" / construct.Int16ub, + "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG), construct.Byte[7], ) class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 38400 - DEFAULT_CABLE_ID = '10c4:ea60' # Generic cable. + DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable. TIMEOUT = 300 # We need to wait for data from the device. def read_message(self): pkt = _PACKET.parse_stream(self.serial_) - logging.debug('received packet: %r', pkt) + logging.debug("received packet: %r", pkt) return pkt.message def wait_and_ready(self): - challenge = b'\0' - while challenge == b'\0': + challenge = b"\0" + while challenge == b"\0": challenge = self.serial_.read(1) # The first packet read may have a prefixed zero, it might be a bug # in the cp210x driver or device, but discard it if found. - if challenge == b'\0': - logging.debug('spurious null byte received') + if challenge == b"\0": + logging.debug("spurious null byte received") continue - if challenge != b'\x53': + if challenge != b"\x53": raise exceptions.ConnectionFailed( - message='Unexpected starting bytes %r' % challenge) + message="Unexpected starting bytes %r" % challenge + ) challenge += self.serial_.read(6) if challenge != _CHALLENGE_PACKET_FULL: raise exceptions.ConnectionFailed( - message='Unexpected challenge %r' % challenge) + message="Unexpected challenge %r" % challenge + ) - logging.debug( - 'challenge packet received: %s', binascii.hexlify(challenge)) + logging.debug("challenge packet received: %s", binascii.hexlify(challenge)) self.send_message(_RESPONSE_MESSAGE) # The first packet only contains the counter of how many readings are # available. first_message = _FIRST_MESSAGE.parse(self.read_message()) - logging.debug('received first message: %r', first_message) + logging.debug("received first message: %r", first_message) return first_message.count def send_message(self, message): - pkt = _PACKET.build({ - 'message': message, - 'direction': Direction.Out - }) - logging.debug('sending packet: %s', binascii.hexlify(pkt)) + pkt = _PACKET.build({"message": message, "direction": Direction.Out}) + logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) def connect(self): # pylint: disable=no-self-use @@ -146,7 +141,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise exceptions.InvalidResponse(response=response) def get_meter_info(self): # pylint: disable=no-self-use - return common.MeterInfo('SD CodeFree glucometer') + return common.MeterInfo("SD CodeFree glucometer") def get_version(self): # pylint: disable=no-self-use raise NotImplementedError @@ -162,7 +157,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise NotImplementedError def _set_device_datetime(self, date): - setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii') + setdatecmd = date.strftime("ADATE%Y%m%d%H%M").encode("ascii") # Ignore the readings count. self.wait_and_ready() @@ -173,8 +168,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise exceptions.InvalidResponse(response=response) # The date we return should only include up to minute, unfortunately. - return datetime.datetime(date.year, date.month, date.day, - date.hour, date.minute) + return datetime.datetime( + date.year, date.month, date.day, date.hour, date.minute + ) def zero_log(self): raise NotImplementedError @@ -187,10 +183,16 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): message = self.read_message() reading = _READING.parse(message) - logging.debug('received reading: %r', reading) + logging.debug("received reading: %r", reading) yield common.GlucoseReading( datetime.datetime( - 2000 + reading.year, reading.month, - reading.day, reading.hour, reading.minute), - reading.value, meal=reading.meal) + 2000 + reading.year, + reading.month, + reading.day, + reading.hour, + reading.minute, + ), + reading.value, + meal=reading.meal, + ) diff --git a/glucometerutils/drivers/td4277.py b/glucometerutils/drivers/td4277.py index 4ab25ee..0385299 100644 --- a/glucometerutils/drivers/td4277.py +++ b/glucometerutils/drivers/td4277.py @@ -21,14 +21,13 @@ import operator import construct -from glucometerutils import common -from glucometerutils import exceptions -from glucometerutils.support import serial, driver_base +from glucometerutils import common, exceptions +from glucometerutils.support import driver_base, serial class Direction(enum.Enum): - In = 0xa5 - Out = 0xa3 + In = 0xA5 + Out = 0xA3 def byte_checksum(data): @@ -36,18 +35,18 @@ def byte_checksum(data): _PACKET = construct.Struct( - 'data' / construct.RawCopy( + "data" + / construct.RawCopy( construct.Struct( - construct.Const(b'\x51'), - 'command' / construct.Byte, - 'message' / construct.Bytes(4), - 'direction' / construct.Mapping( - construct.Byte, - {e: e.value for e in Direction}), + construct.Const(b"\x51"), + "command" / construct.Byte, + "message" / construct.Bytes(4), + "direction" + / construct.Mapping(construct.Byte, {e: e.value for e in Direction}), ), ), - 'checksum' / construct.Checksum( - construct.Byte, byte_checksum, construct.this.data.data), + "checksum" + / construct.Checksum(construct.Byte, byte_checksum, construct.this.data.data), ) _EMPTY_MESSAGE = 0 @@ -60,38 +59,32 @@ _SET_DATETIME = 0x33 _GET_MODEL = 0x24 -_GET_READING_COUNT = 0x2b +_GET_READING_COUNT = 0x2B _GET_READING_DATETIME = 0x25 _GET_READING_VALUE = 0x26 _CLEAR_MEMORY = 0x52 _MODEL_STRUCT = construct.Struct( - construct.Const(b'\x77\x42'), - construct.Byte, - construct.Byte, + construct.Const(b"\x77\x42"), construct.Byte, construct.Byte, ) _DATETIME_STRUCT = construct.Struct( - 'day' / construct.Int16ul, - 'minute' / construct.Byte, - 'hour' / construct.Byte, + "day" / construct.Int16ul, "minute" / construct.Byte, "hour" / construct.Byte, ) _DAY_BITSTRUCT = construct.BitStruct( - 'year' / construct.BitsInteger(7), - 'month' / construct.BitsInteger(4), - 'day' / construct.BitsInteger(5), + "year" / construct.BitsInteger(7), + "month" / construct.BitsInteger(4), + "day" / construct.BitsInteger(5), ) _READING_COUNT_STRUCT = construct.Struct( - 'count' / construct.Int16ul, - construct.Int16ul, + "count" / construct.Int16ul, construct.Int16ul, ) _READING_SELECTION_STRUCT = construct.Struct( - 'record_id' / construct.Int16ul, - construct.Const(b'\x00\x00'), + "record_id" / construct.Int16ul, construct.Const(b"\x00\x00"), ) _MEAL_FLAG = { @@ -101,21 +94,24 @@ _MEAL_FLAG = { } _READING_VALUE_STRUCT = construct.Struct( - 'value' / construct.Int16ul, - construct.Const(b'\x06'), - 'meal'/ construct.Mapping( - construct.Byte, _MEAL_FLAG), + "value" / construct.Int16ul, + construct.Const(b"\x06"), + "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG), ) + def _make_packet(command, message, direction=Direction.Out): return _PACKET.build( - {'data': { - 'value': { - 'command': command, - 'message': message, - 'direction': direction, - }, - }}) + { + "data": { + "value": { + "command": command, + "message": message, + "direction": direction, + }, + } + } + ) def _parse_datetime(message): @@ -124,11 +120,12 @@ def _parse_datetime(message): # unfortunately. day = _DAY_BITSTRUCT.parse(construct.Int16ub.build(date.day)) return datetime.datetime( - 2000+day.year, day.month, day.day, date.hour, date.minute) + 2000 + day.year, day.month, day.day, date.hour, date.minute + ) def _select_record(record_id): - return _READING_SELECTION_STRUCT.build({'record_id': record_id}) + return _READING_SELECTION_STRUCT.build({"record_id": record_id}) class Device(serial.SerialDevice, driver_base.GlucometerDriver): @@ -137,19 +134,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): TIMEOUT = 0.5 def __init__(self, device): - super(Device, self).__init__('cp2110://' + device) - self.buffered_reader_ = construct.Rebuffered( - _PACKET, tailcutoff=1024) + super(Device, self).__init__("cp2110://" + device) + self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - def _send_command( - self, command, message=_EMPTY_MESSAGE, validate_response=True): + def _send_command(self, command, message=_EMPTY_MESSAGE, validate_response=True): pkt = _make_packet(command, message) - logging.debug('sending packet: %s', binascii.hexlify(pkt)) + logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) self.serial_.flush() response = self.buffered_reader_.parse_stream(self.serial_) - logging.debug('received packet: %r', response) + logging.debug("received packet: %r", response) if validate_response and response.data.value.command != command: raise InvalidResponse(response) @@ -158,24 +153,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def connect(self): response_command, message = self._send_command( - _CONNECT_REQUEST, validate_response=False) + _CONNECT_REQUEST, validate_response=False + ) if response_command not in _VALID_CONNECT_RESPONSE: raise exceptions.ConnectionFailed( - 'Invalid response received: %2x %r' % ( - response_command, message)) + "Invalid response received: %2x %r" % (response_command, message) + ) _, model_message = self._send_command(_GET_MODEL) try: _MODEL_STRUCT.parse(model_message) except construct.ConstructError: raise exceptions.ConnectionFailed( - 'Invalid model identified: %r' % model_message) + "Invalid model identified: %r" % model_message + ) def disconnect(self): pass def get_meter_info(self): - return common.MeterInfo('TaiDoc TD-4277 glucometer') + return common.MeterInfo("TaiDoc TD-4277 glucometer") def get_version(self): # pylint: disable=no-self-use raise NotImplementedError @@ -191,18 +188,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def _set_device_datetime(self, date): assert date.year >= 2000 - day_struct = _DAY_BITSTRUCT.build({ - 'year': date.year - 2000, - 'month': date.month, - 'day': date.day, - }) + day_struct = _DAY_BITSTRUCT.build( + {"year": date.year - 2000, "month": date.month, "day": date.day,} + ) day_word = construct.Int16ub.parse(day_struct) - date_message = _DATETIME_STRUCT.build({ - 'day': day_word, - 'minute': date.minute, - 'hour': date.hour}) + date_message = _DATETIME_STRUCT.build( + {"day": day_word, "minute": date.minute, "hour": date.hour} + ) _, message = self._send_command(_SET_DATETIME, message=date_message) @@ -215,17 +209,18 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def _get_reading(self, record_id): _, reading_date_message = self._send_command( - _GET_READING_DATETIME, - _select_record(record_id)) + _GET_READING_DATETIME, _select_record(record_id) + ) reading_date = _parse_datetime(reading_date_message) _, reading_value_message = self._send_command( - _GET_READING_VALUE, - _select_record(record_id)) + _GET_READING_VALUE, _select_record(record_id) + ) reading_value = _READING_VALUE_STRUCT.parse(reading_value_message) return common.GlucoseReading( - reading_date, reading_value.value, meal=reading_value.meal) + reading_date, reading_value.value, meal=reading_value.meal + ) def get_readings(self): record_count = self._get_reading_count() diff --git a/glucometerutils/exceptions.py b/glucometerutils/exceptions.py index 53aa980..52e4d22 100644 --- a/glucometerutils/exceptions.py +++ b/glucometerutils/exceptions.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: MIT """Common exceptions for glucometerutils.""" + class Error(Exception): """Base class for the errors.""" @@ -14,7 +15,7 @@ class CommandLineError(Error): class ConnectionFailed(Error): """It was not possible to connect to the meter.""" - def __init__(self, message='Unable to connect to the meter.'): + def __init__(self, message="Unable to connect to the meter."): super(ConnectionFailed, self).__init__(message) @@ -30,14 +31,16 @@ class InvalidResponse(Error): def __init__(self, response): super(InvalidResponse, self).__init__( - 'Invalid response received:\n%s' % response) + "Invalid response received:\n%s" % response + ) class InvalidChecksum(InvalidResponse): def __init__(self, wire, calculated): super(InvalidChecksum, self).__init__( - 'Response checksum not matching: %08x (wire) != %08x (calculated)' % - (wire, calculated)) + "Response checksum not matching: %08x (wire) != %08x (calculated)" + % (wire, calculated) + ) class InvalidGlucoseUnit(Error): @@ -45,12 +48,12 @@ class InvalidGlucoseUnit(Error): def __init__(self, unit): super(InvalidGlucoseUnit, self).__init__( - 'Invalid glucose unit received:\n%s' % unit) + "Invalid glucose unit received:\n%s" % unit + ) class InvalidDateTime(Error): """The device has an invalid date/time setting.""" def __init__(self): - super(InvalidDateTime, self).__init__( - 'Invalid date and time for device') + super(InvalidDateTime, self).__init__("Invalid date and time for device") diff --git a/glucometerutils/glucometer.py b/glucometerutils/glucometer.py index 29502fd..012bdb1 100755 --- a/glucometerutils/glucometer.py +++ b/glucometerutils/glucometer.py @@ -10,76 +10,108 @@ import inspect import logging import sys -from glucometerutils import common -from glucometerutils import exceptions +from glucometerutils import common, exceptions + def main(): if sys.version_info < (3, 5): - raise Exception( - 'Unsupported Python version, please use at least Python 3.5') + raise Exception("Unsupported Python version, please use at least Python 3.5") parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(dest="action") parser.add_argument( - '--driver', action='store', required=True, - help='Select the driver to use for connecting to the glucometer.') + "--driver", + action="store", + required=True, + help="Select the driver to use for connecting to the glucometer.", + ) parser.add_argument( - '--device', action='store', required=False, - help=('Select the path to the glucometer device. Some devices require ' - 'this argument, others will try autodetection.')) + "--device", + action="store", + required=False, + help=( + "Select the path to the glucometer device. Some devices require " + "this argument, others will try autodetection." + ), + ) parser.add_argument( - '--vlog', action='store', required=False, type=int, - help=('Python logging level. See the levels at ' - 'https://docs.python.org/3/library/logging.html#logging-levels')) + "--vlog", + action="store", + required=False, + type=int, + help=( + "Python logging level. See the levels at " + "https://docs.python.org/3/library/logging.html#logging-levels" + ), + ) subparsers.add_parser( - 'help', help=('Display a description of the driver, including ' - 'supported features and known quirks.')) - subparsers.add_parser( - 'info', help='Display information about the meter.') - subparsers.add_parser( - 'zero', help='Zero out the data log of the meter.') + "help", + help=( + "Display a description of the driver, including " + "supported features and known quirks." + ), + ) + subparsers.add_parser("info", help="Display information about the meter.") + subparsers.add_parser("zero", help="Zero out the data log of the meter.") parser_dump = subparsers.add_parser( - 'dump', help='Dump the readings stored in the device.') + "dump", help="Dump the readings stored in the device." + ) parser_dump.add_argument( - '--unit', action='store', + "--unit", + action="store", choices=[unit.value for unit in common.Unit], - help='Select the unit to use for the dumped data.') + help="Select the unit to use for the dumped data.", + ) parser_dump.add_argument( - '--with-ketone', action='store_true', default=False, - help='Enable ketone reading if available on the glucometer.') + "--with-ketone", + action="store_true", + default=False, + help="Enable ketone reading if available on the glucometer.", + ) parser_date = subparsers.add_parser( - 'datetime', help='Reads or sets the date and time of the glucometer.') + "datetime", help="Reads or sets the date and time of the glucometer." + ) parser_date.add_argument( - '--set', action='store', nargs='?', const='now', default=None, - help='Set the date rather than just reading it from the device.') + "--set", + action="store", + nargs="?", + const="now", + default=None, + help="Set the date rather than just reading it from the device.", + ) parser_patient = subparsers.add_parser( - 'patient', help='Reads or sets the patient information.') + "patient", help="Reads or sets the patient information." + ) parser_patient.add_argument( - '--set_name', action='store', required=False, - help='Set the patient name, if the meter supports it.') + "--set_name", + action="store", + required=False, + help="Set the patient name, if the meter supports it.", + ) args = parser.parse_args() logging.basicConfig(level=args.vlog) try: - driver = importlib.import_module( - 'glucometerutils.drivers.' + args.driver) + driver = importlib.import_module("glucometerutils.drivers." + args.driver) except ImportError as e: logging.error( - 'Error importing driver "%s", please check your --driver ' - 'parameter:\n%s', args.driver, e) + 'Error importing driver "%s", please check your --driver ' "parameter:\n%s", + args.driver, + e, + ) return 1 # This check needs to happen before we try to initialize the device, as the # help action does not require a --device at all. - if args.action == 'help': + if args.action == "help": print(inspect.getdoc(driver)) return 0 @@ -89,17 +121,20 @@ def main(): device_info = device.get_meter_info() try: - if args.action == 'info': + if args.action == "info": try: time_str = device.get_datetime() except exceptions.InvalidDateTime: - time_str = 'INVALID' + time_str = "INVALID" # Also catch any leftover ValueErrors. except (NotImplementedError, ValueError): - time_str = 'N/A' - print("{device_info}Time: {time}".format( - device_info=str(device_info), time=time_str)) - elif args.action == 'dump': + time_str = "N/A" + print( + "{device_info}Time: {time}".format( + device_info=str(device_info), time=time_str + ) + ) + elif args.action == "dump": unit = args.unit if unit is None: unit = device_info.native_unit @@ -107,57 +142,58 @@ def main(): readings = device.get_readings() if not args.with_ketone: - readings = (reading for reading in readings - if not isinstance(reading, common.KetoneReading)) + readings = ( + reading + for reading in readings + if not isinstance(reading, common.KetoneReading) + ) for reading in sorted(readings, key=lambda r: r.timestamp): print(reading.as_csv(unit)) - elif args.action == 'datetime': - if args.set == 'now': + elif args.action == "datetime": + if args.set == "now": print(device.set_datetime()) elif args.set: try: from dateutil import parser as date_parser + new_date = date_parser.parse(args.set) except ImportError: logging.error( - 'Unable to import module "dateutil", ' - 'please install it.') + 'Unable to import module "dateutil", ' "please install it." + ) return 1 except ValueError: - logging.error('%s: not a valid date', args.set) + logging.error("%s: not a valid date", args.set) return 1 print(device.set_datetime(new_date)) else: print(device.get_datetime()) - elif args.action == 'patient': + elif args.action == "patient": if args.set_name != None: try: device.set_patient_name(args.set_name) except NotImplementedError: - print( - 'The glucometer does not support setting patient name.') + print("The glucometer does not support setting patient name.") try: patient_name = device.get_patient_name() if patient_name is None: - patient_name = '[N/A]' - print('Patient Name: {patient_name}'.format( - patient_name=patient_name)) + patient_name = "[N/A]" + print("Patient Name: {patient_name}".format(patient_name=patient_name)) except NotImplementedError: - print( - 'The glucometer does not support retrieving patient name.') - elif args.action == 'zero': - confirm = input('Delete the device data log? (y/N) ') - if confirm.lower() in ['y', 'ye', 'yes']: + print("The glucometer does not support retrieving patient name.") + elif args.action == "zero": + confirm = input("Delete the device data log? (y/N) ") + if confirm.lower() in ["y", "ye", "yes"]: device.zero_log() - print('\nDevice data log zeroed.') + print("\nDevice data log zeroed.") else: - print('\nDevice data log not zeroed.') + print("\nDevice data log not zeroed.") return 1 else: return 1 except exceptions.Error as err: - print('Error while executing \'%s\': %s' % (args.action, str(err))) + print("Error while executing '%s': %s" % (args.action, str(err))) return 1 device.disconnect() diff --git a/glucometerutils/support/construct_extras.py b/glucometerutils/support/construct_extras.py index 7abcd9e..b44ee84 100644 --- a/glucometerutils/support/construct_extras.py +++ b/glucometerutils/support/construct_extras.py @@ -7,6 +7,7 @@ import datetime import construct + class Timestamp(construct.Adapter): """Adapter for converting datetime object into timestamps. @@ -14,6 +15,7 @@ class Timestamp(construct.Adapter): and an optional epoch offset to the UNIX Epoch. """ + __slots__ = ["epoch"] def __init__(self, subcon, epoch=0): diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py index b4db4eb..3b0dc80 100644 --- a/glucometerutils/support/contourusb.py +++ b/glucometerutils/support/contourusb.py @@ -43,25 +43,31 @@ _HEADER_RECORD_RE = re.compile( "(?P<low_low_target>[0-9]{3})(?P<upp_hi_target>[0-9]{3})" "(?P<low_hi_target>[0-9]{3})\\^Z=(?P<trends>[0-2])\\|" "(?P<total>[0-9]*)\\|\\|\\|\\|\\|\\|" - "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)") + "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)" +) _RESULT_RECORD_RE = re.compile( "^(?P<record_type>[a-zA-Z])\\|(?P<seq_num>[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\" "^(?P<test_id>\\w+)\\|(?P<value>[0-9]+)\\|(?P<unit>\\w+\\/\\w+)\\^" "(?P<ref_method>[BPD])\\|\\|(?P<markers>[><BADISXCZ\\/1-12]*)\\|\\|" - "(?P<datetime>[0-9]+)") + "(?P<datetime>[0-9]+)" +) _RECORD_FORMAT = re.compile( - '\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)' - '\x0d(?P<end>[\x03\x17]))' - '(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a') + "\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)" + "\x0d(?P<end>[\x03\x17]))" + "(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a" +) + class FrameError(Exception): pass + class ContourHidDevice(hiddevice.HidDevice): """Base class implementing the ContourUSB HID common protocol. """ + blocksize = 64 # Operation modes @@ -77,85 +83,84 @@ class ContourHidDevice(hiddevice.HidDevice): while True: data = self._read() dstr = data - result.append(dstr[4:data[3]+4]) - if data[3] != self.blocksize-4: + result.append(dstr[4 : data[3] + 4]) + if data[3] != self.blocksize - 4: break - return (b"".join(result)) + return b"".join(result) def write(self, data): - data = b'ABC' + chr(len(data)).encode() + data.encode() + data = b"ABC" + chr(len(data)).encode() + data.encode() pad_length = self.blocksize - len(data) - data += pad_length * b'\x00' + data += pad_length * b"\x00" self._write(data) - USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour + USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour USB_PRODUCT_ID = 0x6002 # type: int def parse_header_record(self, text): header = _HEADER_RECORD_RE.search(text) - self.field_del = header.group('field_del') - self.repeat_del = header.group('repeat_del') - self.component_del = header.group('component_del') - self.escape_del = header.group('escape_del') - - self.product_code = header.group('product_code') - self.dig_ver = header.group('dig_ver') - self.anlg_ver = header.group('anlg_ver') - self.agp_ver = header.group('agp_ver') - - self.serial_num = header.group('serial_num') - self.sku_id = header.group('sku_id') - self.res_marking = header.group('res_marking') - self.config_bits = header.group('config_bits') - self.lang = header.group('lang') - self.interv = header.group('interv') - self.ref_method = header.group('ref_method') - self.internal = header.group('internal') + self.field_del = header.group("field_del") + self.repeat_del = header.group("repeat_del") + self.component_del = header.group("component_del") + self.escape_del = header.group("escape_del") + + self.product_code = header.group("product_code") + self.dig_ver = header.group("dig_ver") + self.anlg_ver = header.group("anlg_ver") + self.agp_ver = header.group("agp_ver") + + self.serial_num = header.group("serial_num") + self.sku_id = header.group("sku_id") + self.res_marking = header.group("res_marking") + self.config_bits = header.group("config_bits") + self.lang = header.group("lang") + self.interv = header.group("interv") + self.ref_method = header.group("ref_method") + self.internal = header.group("internal") # U limit - self.unit = header.group('unit') - self.lo_bound = header.group('lo_bound') - self.hi_bound = header.group('hi_bound') + self.unit = header.group("unit") + self.lo_bound = header.group("lo_bound") + self.hi_bound = header.group("hi_bound") # X field - self.hypo_limit = header.group('hypo_limit') - self.overall_low = header.group('overall_low') - self.pre_food_low = header.group('pre_food_low') - self.post_food_low = header.group('post_food_low') - self.overall_high = header.group('overall_high') - self.pre_food_high = header.group('pre_food_high') - self.post_food_high = header.group('post_food_high') - self.hyper_limit = header.group('hyper_limit') + self.hypo_limit = header.group("hypo_limit") + self.overall_low = header.group("overall_low") + self.pre_food_low = header.group("pre_food_low") + self.post_food_low = header.group("post_food_low") + self.overall_high = header.group("overall_high") + self.pre_food_high = header.group("pre_food_high") + self.post_food_high = header.group("post_food_high") + self.hyper_limit = header.group("hyper_limit") # Y field - self.upp_hyper = header.group('upp_hyper') - self.low_hyper = header.group('low_hyper') - self.upp_hypo = header.group('upp_hypo') - self.low_hypo = header.group('low_hypo') - self.upp_low_target = header.group('upp_low_target') - self.low_low_target = header.group('low_low_target') - self.upp_hi_target = header.group('upp_hi_target') - self.low_hi_target = header.group('low_hi_target') + self.upp_hyper = header.group("upp_hyper") + self.low_hyper = header.group("low_hyper") + self.upp_hypo = header.group("upp_hypo") + self.low_hypo = header.group("low_hypo") + self.upp_low_target = header.group("upp_low_target") + self.low_low_target = header.group("low_low_target") + self.upp_hi_target = header.group("upp_hi_target") + self.low_hi_target = header.group("low_hi_target") # Z field - self.trends = header.group('trends') + self.trends = header.group("trends") - self.total = header.group('total') - self.spec_ver = header.group('spec_ver') + self.total = header.group("total") + self.spec_ver = header.group("spec_ver") # Datetime string in YYYYMMDDHHMM format - self.datetime = header.group('datetime') - + self.datetime = header.group("datetime") def checksum(self, text): """ Implemented by Anders Hammarquist for glucodump project More info: https://bitbucket.org/iko/glucodump/src/default/ """ - checksum = hex(sum(ord(c) for c in text) % 256).upper().split('X')[1] - return ('00' + checksum)[-2:] + checksum = hex(sum(ord(c) for c in text) % 256).upper().split("X")[1] + return ("00" + checksum)[-2:] def checkframe(self, frame): """ @@ -166,7 +171,7 @@ class ContourHidDevice(hiddevice.HidDevice): if not match: raise FrameError("Couldn't parse frame", frame) - recno = int(match.group('recno')) + recno = int(match.group("recno")) if self.currecno is None: self.currecno = recno @@ -174,18 +179,20 @@ class ContourHidDevice(hiddevice.HidDevice): return None if recno != self.currecno: - raise FrameError("Bad recno, got %r expected %r" % - (recno, self.currecno), - frame) - - checksum = self.checksum(match.group('check')) - if checksum != match.group('checksum'): - raise FrameError("Checksum error: got %s expected %s" % - (match.group('checksum'), checksum), - frame) + raise FrameError( + "Bad recno, got %r expected %r" % (recno, self.currecno), frame + ) + + checksum = self.checksum(match.group("check")) + if checksum != match.group("checksum"): + raise FrameError( + "Checksum error: got %s expected %s" + % (match.group("checksum"), checksum), + frame, + ) self.currecno = (self.currecno + 1) % 8 - return match.group('text') + return match.group("text") def connect(self): """Connecting the device, nothing to be done. @@ -198,15 +205,14 @@ class ContourHidDevice(hiddevice.HidDevice): self.state = self.mode_establish try: while True: - self.write('\x04') + self.write("\x04") res = self.read() if res[0] == 4 and res[-1] == 5: # we are connected and just got a header header_record = res.decode() - stx = header_record.find('\x02') + stx = header_record.find("\x02") if stx != -1: - result = _RECORD_FORMAT.match( - header_record[stx:]).group('text') + result = _RECORD_FORMAT.match(header_record[stx:]).group("text") self.parse_header_record(result) break else: @@ -251,7 +257,8 @@ class ContourHidDevice(hiddevice.HidDevice): int(datetime_str[6:8]), # day int(datetime_str[8:10]), # hour int(datetime_str[10:12]), # minute - 0) + 0, + ) def sync(self): """ @@ -261,7 +268,7 @@ class ContourHidDevice(hiddevice.HidDevice): """ self.state = self.mode_establish try: - tometer = '\x04' + tometer = "\x04" result = None foo = 0 while True: @@ -281,7 +288,7 @@ class ContourHidDevice(hiddevice.HidDevice): continue if data_bytes[-1] == 5: # got an <ENQ>, send <ACK> - tometer = '\x06' + tometer = "\x06" self.currecno = None continue if self.state == self.mode_data: @@ -289,18 +296,18 @@ class ContourHidDevice(hiddevice.HidDevice): # got an <EOT>, done self.state = self.mode_precommand break - stx = data.find('\x02') + stx = data.find("\x02") if stx != -1: # got <STX>, parse frame try: result = self.checkframe(data[stx:]) - tometer = '\x06' + tometer = "\x06" self.state = self.mode_data except FrameError as e: - tometer = '\x15' # Couldn't parse, <NAK> + tometer = "\x15" # Couldn't parse, <NAK> else: # Got something we don't understand, <NAK> it - tometer = '\x15' + tometer = "\x15" except Exception as e: raise e @@ -321,7 +328,7 @@ class ContourHidDevice(hiddevice.HidDevice): """ records_arr = [] for rec in self.sync(): - if rec[0] == 'R': + if rec[0] == "R": # parse using result record regular expression rec_text = self.parse_result_record(rec) # get dictionary to use in main driver module without import re diff --git a/glucometerutils/support/driver_base.py b/glucometerutils/support/driver_base.py index b7b3d0f..1caa960 100644 --- a/glucometerutils/support/driver_base.py +++ b/glucometerutils/support/driver_base.py @@ -3,7 +3,6 @@ from datetime import datetime class GlucometerDriver(ABC): - def connect(self): pass diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py index d48ac04..341e978 100644 --- a/glucometerutils/support/freestyle.py +++ b/glucometerutils/support/freestyle.py @@ -18,7 +18,7 @@ from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple import construct from glucometerutils import exceptions -from glucometerutils.support import hiddevice, driver_base +from glucometerutils.support import driver_base, hiddevice _INIT_COMMAND = 0x01 _INIT_RESPONSE = 0x71 @@ -30,54 +30,66 @@ _ENCRYPTION_SETUP_COMMAND = 0x14 _ENCRYPTION_SETUP_RESPONSE = 0x33 _ALWAYS_UNENCRYPTED_MESSAGES = ( - _INIT_COMMAND, 0x04, 0x05, 0x06, 0x0c, 0x0d, - _ENCRYPTION_SETUP_COMMAND, 0x15, - _ENCRYPTION_SETUP_RESPONSE, 0x34, 0x35, + _INIT_COMMAND, + 0x04, + 0x05, + 0x06, + 0x0C, + 0x0D, + _ENCRYPTION_SETUP_COMMAND, + 0x15, + _ENCRYPTION_SETUP_RESPONSE, + 0x34, + 0x35, _INIT_RESPONSE, _KEEPALIVE_RESPONSE, ) + def _create_matcher(message_type, content): # type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool] def _matcher(message): - return ( - message[0] == message_type and - (content is None or content == message[1])) + return message[0] == message_type and (content is None or content == message[1]) return _matcher -_is_init_reply = _create_matcher(_INIT_RESPONSE, b'\x01') + +_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01") _is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None) -_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b'\x85') -_is_encryption_missing_error = _create_matcher( - _ENCRYPTION_SETUP_RESPONSE, b'\x15') -_is_encryption_setup_error = _create_matcher( - _ENCRYPTION_SETUP_RESPONSE, b'\x14') +_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85") +_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15") +_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14") _FREESTYLE_MESSAGE = construct.Struct( - 'hid_report' / construct.Const(0, construct.Byte), - 'message_type' / construct.Byte, - 'command' / construct.Padded( + "hid_report" / construct.Const(0, construct.Byte), + "message_type" / construct.Byte, + "command" + / construct.Padded( 63, # command can only be up to 62 bytes, but one is used for length. - construct.Prefixed(construct.Byte, construct.GreedyBytes)), + construct.Prefixed(construct.Byte, construct.GreedyBytes), + ), ) _FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct( - 'hid_report' / construct.Const(0, construct.Byte), - 'message_type' / construct.Byte, - 'command' / construct.Padded( + "hid_report" / construct.Const(0, construct.Byte), + "message_type" / construct.Byte, + "command" + / construct.Padded( 63, # command can only be up to 62 bytes, but one is used for length. - construct.GreedyBytes), + construct.GreedyBytes, + ), ) -_TEXT_COMPLETION_RE = re.compile(b'CMD (?:OK|Fail!)') +_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)") _TEXT_REPLY_FORMAT = re.compile( - b'^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n' - b'CMD (?P<status>OK|Fail!)\r\n$', re.DOTALL) + b"^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n" + b"CMD (?P<status>OK|Fail!)\r\n$", + re.DOTALL, +) _MULTIRECORDS_FORMAT = re.compile( - '^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$', - re.DOTALL) + "^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$", re.DOTALL +) def _verify_checksum(message, expected_checksum_hex): @@ -131,17 +143,18 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) TEXT_CMD = 0x60 TEXT_REPLY_CMD = 0x60 - USB_VENDOR_ID = 0x1a61 # type: int # Abbott Diabetes Care + USB_VENDOR_ID = 0x1A61 # type: int # Abbott Diabetes Care USB_PRODUCT_ID = None # type: int def connect(self): """Open connection to the device, starting the knocking sequence.""" - self._send_command(_INIT_COMMAND, b'') + self._send_command(_INIT_COMMAND, b"") response = self._read_response() if not _is_init_reply(response): raise exceptions.ConnectionFailed( - 'Connection error: unexpected message %02x:%s' % ( - response[0], response[1].hex())) + "Connection error: unexpected message %02x:%s" + % (response[0], response[1].hex()) + ) def disconnect(self): """Disconnect the device, nothing to be done.""" @@ -162,9 +175,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) meta_construct = _FREESTYLE_MESSAGE usb_packet = meta_construct.build( - {'message_type': message_type, 'command': command}) + {"message_type": message_type, "command": command} + ) - logging.debug('Sending packet: %r', usb_packet) + logging.debug("Sending packet: %r", usb_packet) self._write(usb_packet) def _read_response(self, encrypted=False): @@ -172,14 +186,14 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) """Read the response from the device and extracts it.""" usb_packet = self._read() - logging.debug('Read packet: %r', usb_packet) + logging.debug("Read packet: %r", usb_packet) assert usb_packet message_type = usb_packet[0] if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES: message_length = usb_packet[1] - message_content = usb_packet[2:2+message_length] + message_content = usb_packet[2 : 2 + message_length] else: message_content = usb_packet[1:] @@ -195,15 +209,13 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) return self._read_response(encrypted=encrypted) if _is_unknown_message_error(message): - raise exceptions.CommandError('Invalid command') + raise exceptions.CommandError("Invalid command") if _is_encryption_missing_error(message): - raise exceptions.CommandError( - 'Device encryption not initialized.') + raise exceptions.CommandError("Device encryption not initialized.") if _is_encryption_setup_error(message): - raise exceptions.CommandError( - 'Device encryption initialization failed.') + raise exceptions.CommandError("Device encryption initialization failed.") return message @@ -213,18 +225,19 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) self._send_command(self.TEXT_CMD, command) # Reply can stretch multiple buffers - full_content = b'' + full_content = b"" while True: message_type, content = self._read_response() logging.debug( - 'Received message: type %02x content %s', - message_type, content.hex()) + "Received message: type %02x content %s", message_type, content.hex() + ) if message_type != self.TEXT_REPLY_CMD: raise exceptions.InvalidResponse( - 'Message type %02x does not match expectations: %r' % - (message_type, content)) + "Message type %02x does not match expectations: %r" + % (message_type, content) + ) full_content += content @@ -235,17 +248,17 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) if not match: raise exceptions.InvalidResponse(full_content) - message = match.group('message') - _verify_checksum(message, match.group('checksum')) + message = match.group("message") + _verify_checksum(message, match.group("checksum")) - if match.group('status') != b'OK': + if match.group("status") != b"OK": raise exceptions.InvalidResponse(message or "Command failed") # If there is anything in the response that is not ASCII-safe, this is # probably in the patient name. The Windows utility does not seem to # validate those, so just replace anything non-ASCII with the correct # unknown codepoint. - return message.decode('ascii', 'replace') + return message.decode("ascii", "replace") # Some of the commands are also shared across devices that use this HID # protocol, but not many. Only provide here those that do seep to change @@ -253,16 +266,16 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) def _get_version(self): # type: () -> Text """Return the software version of the device.""" - return self._send_text_command(b'$swver?').rstrip('\r\n') + return self._send_text_command(b"$swver?").rstrip("\r\n") def get_serial_number(self): # type: () -> Text """Returns the serial number of the device.""" - return self._send_text_command(b'$serlnum?').rstrip('\r\n') + return self._send_text_command(b"$serlnum?").rstrip("\r\n") def get_patient_name(self): # type: () -> Optional[Text] - patient_name = self._send_text_command(b'$ptname?').rstrip('\r\n') + patient_name = self._send_text_command(b"$ptname?").rstrip("\r\n") if not patient_name: return None return patient_name @@ -270,11 +283,11 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) def set_patient_name(self, name): # type: (Text) -> None try: - encoded_name = name.encode('ascii') + encoded_name = name.encode("ascii") except UnicodeDecodeError: - raise ValueError('Only ASCII-safe names are tested working') + raise ValueError("Only ASCII-safe names are tested working") - result = self._send_text_command(b'$ptname,' + encoded_name) + result = self._send_text_command(b"$ptname," + encoded_name) def get_datetime(self): # type: () -> datetime.datetime @@ -283,12 +296,12 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) This is one of the few commands that appear common to many of the FreeStyle devices that use the HID framing protocol. """ - date = self._send_text_command(b'$date?').rstrip('\r\n') - time = self._send_text_command(b'$time?').rstrip('\r\n') + date = self._send_text_command(b"$date?").rstrip("\r\n") + time = self._send_text_command(b"$time?").rstrip("\r\n") # Year is returned as an offset to 2000. - month, day, year = (int(x) for x in date.split(',')) - hour, minute = (int(x) for x in time.split(',')) + month, day, year = (int(x) for x in date.split(",")) + hour, minute = (int(x) for x in time.split(",")) # At least Precision Neo devices can have an invalid date (bad RTC?), # and report 255 for each field, which is not valid for @@ -304,10 +317,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) # The format used by the FreeStyle devices is not composable based on # standard strftime() (namely it includes no leading zeros), so we need # to build it manually. - date_cmd = '$date,{month},{day},{year}'.format( - month=date.month, day=date.day, year=(date.year-2000)) - time_cmd = '$time,{hour},{minute}'.format( - hour=date.hour, minute=date.minute) + date_cmd = "$date,{month},{day},{year}".format( + month=date.month, day=date.day, year=(date.year - 2000) + ) + time_cmd = "$time,{hour},{minute}".format(hour=date.hour, minute=date.minute) self._send_text_command(bytes(date_cmd, "ascii")) self._send_text_command(bytes(time_cmd, "ascii")) @@ -333,7 +346,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) in the record file. """ message = self._send_text_command(command) - logging.debug('Received multirecord message:\n%s', message) + logging.debug("Received multirecord message:\n%s", message) if message == "Log Empty\r\n": return iter(()) @@ -341,9 +354,9 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) if not match: raise exceptions.InvalidResponse(message) - records_str = match.group('message') - _verify_checksum(records_str, match.group('checksum')) + records_str = match.group("message") + _verify_checksum(records_str, match.group("checksum")) - logging.debug('Received multi-record string: %s', records_str) + logging.debug("Received multi-record string: %s", records_str) - return csv.reader(records_str.split('\r\n')) + return csv.reader(records_str.split("\r\n")) diff --git a/glucometerutils/support/hiddevice.py b/glucometerutils/support/hiddevice.py index b44e5c7..ad8f3a6 100644 --- a/glucometerutils/support/hiddevice.py +++ b/glucometerutils/support/hiddevice.py @@ -45,33 +45,36 @@ class HidDevice: # type: (Optional[Text]) -> None if None in (self.USB_VENDOR_ID, self.USB_PRODUCT_ID) and not device: raise exceptions.CommandLineError( - '--device parameter is required, should point to a /dev/hidraw ' - 'device node representing the meter.') + "--device parameter is required, should point to a /dev/hidraw " + "device node representing the meter." + ) # If the user passed a device path that does not exist, raise an # error. This is to avoid writing to a file instead of to a device node. if device and not os.path.exists(device): raise exceptions.ConnectionFailed( - message='Path %s does not exist.' % device) + message="Path %s does not exist." % device + ) # If the user passed a device, try opening it. if device: - self.handle_ = open(device, 'w+b') # type: Optional[BinaryIO] + self.handle_ = open(device, "w+b") # type: Optional[BinaryIO] else: self.handle_ = None - logging.info( - 'No --device parameter provided, using hidapi library.') + logging.info("No --device parameter provided, using hidapi library.") try: import hid + self.hidapi_handle_ = hid.device() - self.hidapi_handle_.open( - self.USB_VENDOR_ID, self.USB_PRODUCT_ID) + self.hidapi_handle_.open(self.USB_VENDOR_ID, self.USB_PRODUCT_ID) except ImportError: raise exceptions.ConnectionFailed( - message='Missing requied "hidapi" module.') + message='Missing requied "hidapi" module.' + ) except OSError as e: raise exceptions.ConnectionFailed( - message='Unable to connect to meter: %s.' % e) + message="Unable to connect to meter: %s." % e + ) def _write(self, report): # type: (bytes) -> None @@ -95,5 +98,4 @@ class HidDevice: if self.handle_: return bytes(self.handle_.read(size)) - return bytes(self.hidapi_handle_.read( - size, timeout_ms=self.TIMEOUT_MS)) + return bytes(self.hidapi_handle_.read(size, timeout_ms=self.TIMEOUT_MS)) diff --git a/glucometerutils/support/lifescan.py b/glucometerutils/support/lifescan.py index 19155d4..9340e49 100644 --- a/glucometerutils/support/lifescan.py +++ b/glucometerutils/support/lifescan.py @@ -8,22 +8,25 @@ from glucometerutils import exceptions class MissingChecksum(exceptions.InvalidResponse): """The response misses the expected 4-digits checksum.""" + def __init__(self, response): super(MissingChecksum, self).__init__( - 'Response is missing checksum: %s' % response) + "Response is missing checksum: %s" % response + ) class InvalidSerialNumber(exceptions.Error): """The serial number is not as expected.""" + def __init__(self, serial_number): super(InvalidSerialNumber, self).__init__( - 'Serial number %s is invalid.' % serial_number) + "Serial number %s is invalid." % serial_number + ) class MalformedCommand(exceptions.InvalidResponse): def __init__(self, message): - super(MalformedCommand, self).__init__( - 'Malformed command: %s' % message) + super(MalformedCommand, self).__init__("Malformed command: %s" % message) def crc_ccitt(data): @@ -39,13 +42,13 @@ def crc_ccitt(data): This function uses the non-default 0xFFFF seed as used by multiple LifeScan meters. """ - crc = 0xffff + crc = 0xFFFF for byte in data: - crc = (crc >> 8) & 0xffff | (crc << 8) & 0xffff + crc = (crc >> 8) & 0xFFFF | (crc << 8) & 0xFFFF crc ^= byte - crc ^= (crc & 0xff) >> 4 - crc ^= (((crc << 8) & 0xffff) << 4) & 0xffff - crc ^= (crc & 0xff) << 5 + crc ^= (crc & 0xFF) >> 4 + crc ^= (((crc << 8) & 0xFFFF) << 4) & 0xFFFF + crc ^= (crc & 0xFF) << 5 - return crc & 0xffff + return crc & 0xFFFF diff --git a/glucometerutils/support/lifescan_binary_protocol.py b/glucometerutils/support/lifescan_binary_protocol.py index 288da83..441226e 100644 --- a/glucometerutils/support/lifescan_binary_protocol.py +++ b/glucometerutils/support/lifescan_binary_protocol.py @@ -12,48 +12,51 @@ This module implements an interface to send and receive these messages. import construct from glucometerutils import common -from glucometerutils.support import construct_extras -from glucometerutils.support import lifescan +from glucometerutils.support import construct_extras, lifescan _LINK_CONTROL = construct.BitStruct( construct.Padding(3), - 'more' / construct.Default(construct.Flag, False), - 'disconnect' / construct.Default(construct.Flag, False), - 'acknowledge' / construct.Default(construct.Flag, False), - 'expect_receive' / construct.Default(construct.Flag, False), - 'sequence_number' / construct.Default(construct.Flag, False), + "more" / construct.Default(construct.Flag, False), + "disconnect" / construct.Default(construct.Flag, False), + "acknowledge" / construct.Default(construct.Flag, False), + "expect_receive" / construct.Default(construct.Flag, False), + "sequence_number" / construct.Default(construct.Flag, False), ) + def LifeScanPacket(include_link_control): # pylint: disable=invalid-name # type: (bool) -> construct.Struct if include_link_control: link_control_construct = _LINK_CONTROL else: - link_control_construct = construct.Const(b'\x00') + link_control_construct = construct.Const(b"\x00") return construct.Struct( - 'data' / construct.RawCopy( + "data" + / construct.RawCopy( construct.Struct( - construct.Const(b'\x02'), # stx - 'length' / construct.Rebuild( - construct.Byte, lambda this: len(this.message) + 6), - 'link_control' / link_control_construct, - 'message' / construct.Bytes( - lambda this: this.length - 6), - construct.Const(b'\x03'), # etx + construct.Const(b"\x02"), # stx + "length" + / construct.Rebuild(construct.Byte, lambda this: len(this.message) + 6), + "link_control" / link_control_construct, + "message" / construct.Bytes(lambda this: this.length - 6), + construct.Const(b"\x03"), # etx ), ), - 'checksum' / construct.Checksum( - construct.Int16ul, lifescan.crc_ccitt, construct.this.data.data), + "checksum" + / construct.Checksum( + construct.Int16ul, lifescan.crc_ccitt, construct.this.data.data + ), ) + VERIO_TIMESTAMP = construct_extras.Timestamp( - construct.Int32ul, epoch=946684800) # 2000-01-01 (not 2010) + construct.Int32ul, epoch=946684800 +) # 2000-01-01 (not 2010) _GLUCOSE_UNIT_MAPPING_TABLE = { common.Unit.MG_DL: 0x00, common.Unit.MMOL_L: 0x01, } -GLUCOSE_UNIT = construct.Mapping( - construct.Byte, _GLUCOSE_UNIT_MAPPING_TABLE) +GLUCOSE_UNIT = construct.Mapping(construct.Byte, _GLUCOSE_UNIT_MAPPING_TABLE) diff --git a/glucometerutils/support/serial.py b/glucometerutils/support/serial.py index 6067cf7..d9e80ea 100644 --- a/glucometerutils/support/serial.py +++ b/glucometerutils/support/serial.py @@ -8,7 +8,6 @@ import logging from typing import Optional, Text import serial - from glucometerutils import exceptions @@ -48,19 +47,23 @@ class SerialDevice: assert self.BAUDRATE is not None if not device and self.DEFAULT_CABLE_ID: - logging.info( - 'No --device parameter provided, looking for default cable.') - device = 'hwgrep://' + self.DEFAULT_CABLE_ID + logging.info("No --device parameter provided, looking for default cable.") + device = "hwgrep://" + self.DEFAULT_CABLE_ID if not device: raise exceptions.CommandLineError( - 'No --device parameter provided, and no default cable known.') + "No --device parameter provided, and no default cable known." + ) self.serial_ = serial.serial_for_url( device, baudrate=self.BAUDRATE, timeout=self.TIMEOUT, writeTimeout=None, - bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, - xonxoff=False, rtscts=False, dsrdtr=False) + xonxoff=False, + rtscts=False, + dsrdtr=False, + ) diff --git a/reversing_tools/abbott/extract_freestyle.py b/reversing_tools/abbott/extract_freestyle.py index 1227de1..8e1c0d2 100755 --- a/reversing_tools/abbott/extract_freestyle.py +++ b/reversing_tools/abbott/extract_freestyle.py @@ -28,48 +28,76 @@ import usbmon.pcapng _KEEPALIVE_TYPE = 0x22 _UNENCRYPTED_TYPES = ( - 0x01, 0x04, 0x05, 0x06, 0x0c, 0x0d, - 0x14, 0x15, - 0x33, 0x34, 0x35, + 0x01, + 0x04, + 0x05, + 0x06, + 0x0C, + 0x0D, + 0x14, + 0x15, + 0x33, + 0x34, + 0x35, 0x71, _KEEPALIVE_TYPE, ) -_ABBOTT_VENDOR_ID = 0x1a61 +_ABBOTT_VENDOR_ID = 0x1A61 _LIBRE2_PRODUCT_ID = 0x3950 - def main(): if sys.version_info < (3, 7): - raise Exception( - 'Unsupported Python version, please use at least Python 3.7.') + raise Exception("Unsupported Python version, please use at least Python 3.7.") parser = argparse.ArgumentParser() parser.add_argument( - '--device_address', action='store', type=str, - help=('Device address (busnum.devnum) of the device to extract capture' - 'of. If none provided, device descriptors will be relied on.')) + "--device_address", + action="store", + type=str, + help=( + "Device address (busnum.devnum) of the device to extract capture" + "of. If none provided, device descriptors will be relied on." + ), + ) parser.add_argument( - '--encrypted_protocol', action='store_true', - help=('Whether to expect encrypted protocol in the capture.' - ' Ignored if the device descriptors are present in the capture.')) + "--encrypted_protocol", + action="store_true", + help=( + "Whether to expect encrypted protocol in the capture." + " Ignored if the device descriptors are present in the capture." + ), + ) parser.add_argument( - '--vlog', action='store', required=False, type=int, - help=('Python logging level. See the levels at ' - 'https://docs.python.org/3/library/logging.html#logging-levels')) + "--vlog", + action="store", + required=False, + type=int, + help=( + "Python logging level. See the levels at " + "https://docs.python.org/3/library/logging.html#logging-levels" + ), + ) parser.add_argument( - '--print_keepalive', action='store_true', - help=('Whether to print the keepalive messages sent by the device. ' - 'Keepalive messages are usually safely ignored.')) + "--print_keepalive", + action="store_true", + help=( + "Whether to print the keepalive messages sent by the device. " + "Keepalive messages are usually safely ignored." + ), + ) parser.add_argument( - 'pcap_file', action='store', type=str, - help='Path to the pcapng file with the USB capture.') + "pcap_file", + action="store", + type=str, + help="Path to the pcapng file with the USB capture.", + ) args = parser.parse_args() @@ -82,15 +110,18 @@ def main(): if descriptor.vendor_id == _ABBOTT_VENDOR_ID: if args.device_address and args.device_address != descriptor.address: raise Exception( - 'Multiple Abbott device present in capture, please' - ' provide a --device_address flag.') + "Multiple Abbott device present in capture, please" + " provide a --device_address flag." + ) args.device_address = descriptor.address descriptor = session.device_descriptors.get(args.device_address, None) if not descriptor: logging.warning( f"Unable to find device %s in the capture's descriptors." - " Assuming non-encrypted protocol.", args.device_address) + " Assuming non-encrypted protocol.", + args.device_address, + ) else: assert descriptor.vendor_id == _ABBOTT_VENDOR_ID @@ -102,15 +133,17 @@ def main(): if not first.type == usbmon.constants.PacketType.SUBMISSION: continue - if not first.address.startswith(f'{args.device_address}.'): + if not first.address.startswith(f"{args.device_address}."): # No need to check second, they will be linked. continue if first.xfer_type == usbmon.constants.XferType.INTERRUPT: pass - elif (first.xfer_type == usbmon.constants.XferType.CONTROL and - not first.setup_packet or - first.setup_packet.type == usbmon.setup.Type.CLASS): + elif ( + first.xfer_type == usbmon.constants.XferType.CONTROL + and not first.setup_packet + or first.setup_packet.type == usbmon.setup.Type.CLASS + ): pass else: continue @@ -133,19 +166,19 @@ def main(): if args.encrypted_protocol and message_type not in _UNENCRYPTED_TYPES: # When expecting encrypted communication), we ignore the # message_length and we keep it with the whole message. - message_type = f'x{message_type:02x}' + message_type = f"x{message_type:02x}" message = packet.payload[1:] else: message_length = packet.payload[1] - message_type = f' {message_type:02x}' - message = packet.payload[2:2+message_length] - - print(usbmon.chatter.dump_bytes( - packet.direction, - message, - prefix=f'[{message_type}]', - print_empty=True, - ), '\n') + message_type = f" {message_type:02x}" + message = packet.payload[2 : 2 + message_length] + + print( + usbmon.chatter.dump_bytes( + packet.direction, message, prefix=f"[{message_type}]", print_empty=True, + ), + "\n", + ) if __name__ == "__main__": diff --git a/reversing_tools/abbott/freestyle_hid_console.py b/reversing_tools/abbott/freestyle_hid_console.py index 4697da9..5005e4a 100755 --- a/reversing_tools/abbott/freestyle_hid_console.py +++ b/reversing_tools/abbott/freestyle_hid_console.py @@ -9,22 +9,37 @@ import sys from glucometerutils import exceptions from glucometerutils.support import freestyle + def main(): parser = argparse.ArgumentParser() parser.add_argument( - '--text_cmd_type', action='store', type=int, default=0x60, - help='Message type for text commands sent to the device.') + "--text_cmd_type", + action="store", + type=int, + default=0x60, + help="Message type for text commands sent to the device.", + ) parser.add_argument( - '--text_reply_type', action='store', type=int, default=0x60, - help='Message type for text replies received from the device.') + "--text_reply_type", + action="store", + type=int, + default=0x60, + help="Message type for text replies received from the device.", + ) parser.add_argument( - 'device', action='store', - help='Path to the HID device to open.') + "device", action="store", help="Path to the HID device to open." + ) parser.add_argument( - '--vlog', action='store', required=False, type=int, - help=('Python logging level. See the levels at ' - 'https://docs.python.org/3/library/logging.html#logging-levels')) + "--vlog", + action="store", + required=False, + type=int, + help=( + "Python logging level. See the levels at " + "https://docs.python.org/3/library/logging.html#logging-levels" + ), + ) args = parser.parse_args() @@ -38,15 +53,16 @@ def main(): while True: if sys.stdin.isatty(): - command = input('>>> ') + command = input(">>> ") else: command = input() - print('>>> {command}'.format(command=command)) + print(">>> {command}".format(command=command)) try: - print(device._send_text_command(bytes(command, 'ascii'))) + print(device._send_text_command(bytes(command, "ascii"))) except exceptions.InvalidResponse as error: - print('! {error}'.format(error=error)) + print("! {error}".format(error=error)) + if __name__ == "__main__": main() diff --git a/test/test_common.py b/test/test_common.py index daa83c9..3e733e3 100644 --- a/test/test_common.py +++ b/test/test_common.py @@ -8,39 +8,35 @@ import datetime from absl.testing import parameterized - from glucometerutils import common class TestGlucoseConversion(parameterized.TestCase): - def test_convert_to_mmol(self): self.assertEqual( - 5.56, common.convert_glucose_unit( - 100, common.Unit.MG_DL, common.Unit.MMOL_L)) + 5.56, + common.convert_glucose_unit(100, common.Unit.MG_DL, common.Unit.MMOL_L), + ) def test_convert_to_mgdl(self): self.assertEqual( - 180, common.convert_glucose_unit( - 10, common.Unit.MMOL_L, common.Unit.MG_DL)) + 180, common.convert_glucose_unit(10, common.Unit.MMOL_L, common.Unit.MG_DL) + ) @parameterized.parameters(list(common.Unit)) def test_convert_identity(self, unit): - self.assertEqual( - 100, common.convert_glucose_unit( - 100, unit, unit)) + self.assertEqual(100, common.convert_glucose_unit(100, unit, unit)) @parameterized.parameters([unit.value for unit in common.Unit]) def test_convert_identity_str(self, unit_str): - self.assertEqual( - 100, common.convert_glucose_unit( - 100, unit_str, unit_str)) + self.assertEqual(100, common.convert_glucose_unit(100, unit_str, unit_str)) @parameterized.parameters( - (common.Unit.MMOL_L, 'foo'), - ('foo', common.Unit.MG_DL), + (common.Unit.MMOL_L, "foo"), + ("foo", common.Unit.MG_DL), (None, common.Unit.MG_DL), - (common.Meal.NONE, common.Unit.MG_DL)) + (common.Meal.NONE, common.Unit.MG_DL), + ) def test_invalid_values(self, from_unit, to_unit): with self.assertRaises(Exception): common.convert_glucose_unit(100, from_unit, to_unit) @@ -52,67 +48,76 @@ class TestGlucoseReading(parameterized.TestCase): def test_minimal(self): reading = common.GlucoseReading(self.TEST_DATETIME, 100) - self.assertEqual(reading.as_csv(common.Unit.MG_DL), - '"2018-01-01 00:30:45","100.00","","blood sample",""') + self.assertEqual( + reading.as_csv(common.Unit.MG_DL), + '"2018-01-01 00:30:45","100.00","","blood sample",""', + ) @parameterized.named_parameters( - ('_mgdl', common.Unit.MG_DL, 100), - ('_mmoll', common.Unit.MMOL_L, 5.56)) + ("_mgdl", common.Unit.MG_DL, 100), ("_mmoll", common.Unit.MMOL_L, 5.56) + ) def test_value(self, unit, expected_value): reading = common.GlucoseReading(self.TEST_DATETIME, 100) - self.assertAlmostEqual( - reading.get_value_as(unit), expected_value, places=2) + self.assertAlmostEqual(reading.get_value_as(unit), expected_value, places=2) @parameterized.named_parameters( - ('_meal_none', - {'meal': common.Meal.NONE}, - '"2018-01-01 00:30:45","100.00","","blood sample",""'), - ('_meal_before', - {'meal': common.Meal.BEFORE}, - '"2018-01-01 00:30:45","100.00","Before Meal","blood sample",""'), - ('_meal_after', - {'meal': common.Meal.AFTER}, - '"2018-01-01 00:30:45","100.00","After Meal","blood sample",""'), - ('_measurement_blood', - {'measure_method': common.MeasurementMethod.BLOOD_SAMPLE}, - '"2018-01-01 00:30:45","100.00","","blood sample",""'), - ('_measurement_cgm', - {'measure_method': common.MeasurementMethod.CGM}, - '"2018-01-01 00:30:45","100.00","","CGM",""'), - ('_comment', - {'comment': 'too much'}, - '"2018-01-01 00:30:45","100.00","","blood sample","too much"'), - ('_comment_quoted', - {'comment': '"too" much'}, - '"2018-01-01 00:30:45","100.00","","blood sample","\"too\" much"'), + ( + "_meal_none", + {"meal": common.Meal.NONE}, + '"2018-01-01 00:30:45","100.00","","blood sample",""', + ), + ( + "_meal_before", + {"meal": common.Meal.BEFORE}, + '"2018-01-01 00:30:45","100.00","Before Meal","blood sample",""', + ), + ( + "_meal_after", + {"meal": common.Meal.AFTER}, + '"2018-01-01 00:30:45","100.00","After Meal","blood sample",""', + ), + ( + "_measurement_blood", + {"measure_method": common.MeasurementMethod.BLOOD_SAMPLE}, + '"2018-01-01 00:30:45","100.00","","blood sample",""', + ), + ( + "_measurement_cgm", + {"measure_method": common.MeasurementMethod.CGM}, + '"2018-01-01 00:30:45","100.00","","CGM",""', + ), + ( + "_comment", + {"comment": "too much"}, + '"2018-01-01 00:30:45","100.00","","blood sample","too much"', + ), + ( + "_comment_quoted", + {"comment": '"too" much'}, + '"2018-01-01 00:30:45","100.00","","blood sample",""too" much"', + ), ) def test_csv(self, kwargs_dict, expected_csv): - reading = common.GlucoseReading( - self.TEST_DATETIME, 100, **kwargs_dict) + reading = common.GlucoseReading(self.TEST_DATETIME, 100, **kwargs_dict) self.assertEqual(reading.as_csv(common.Unit.MG_DL), expected_csv) class TestMeterInfo(parameterized.TestCase): - @parameterized.named_parameters( - ('_no_serial_number', - {}, - 'Serial Number: N/A\n'), - ('_serial_number', - {'serial_number': 1234}, - 'Serial Number: 1234\n'), - ('_no_version_information', - {}, - 'Version Information:\n N/A\n'), - ('_version_information_1', - {'version_info': ['test']}, - 'Version Information:\n test\n'), - ('_version_information_2', - {'version_info': ['test', 'test2']}, - 'Version Information:\n test\n test2\n'), - ('_default_native_unit', - {}, - 'Native Unit: mg/dL\n'), + ("_no_serial_number", {}, "Serial Number: N/A\n"), + ("_serial_number", {"serial_number": 1234}, "Serial Number: 1234\n"), + ("_no_version_information", {}, "Version Information:\n N/A\n"), + ( + "_version_information_1", + {"version_info": ["test"]}, + "Version Information:\n test\n", + ), + ( + "_version_information_2", + {"version_info": ["test", "test2"]}, + "Version Information:\n test\n test2\n", + ), + ("_default_native_unit", {}, "Native Unit: mg/dL\n"), ) def test_meter_info(self, kwargs_dict, expected_fragment): info = common.MeterInfo(self.id(), **kwargs_dict) diff --git a/test/test_construct_extras.py b/test/test_construct_extras.py index 476d6f8..6bba873 100644 --- a/test/test_construct_extras.py +++ b/test/test_construct_extras.py @@ -7,60 +7,66 @@ import datetime -from absl.testing import absltest import construct +from absl.testing import absltest from glucometerutils.support import construct_extras - _TEST_DATE1 = datetime.datetime(1970, 1, 2, 0, 0) _TEST_DATE2 = datetime.datetime(1971, 1, 1, 0, 0) _TEST_DATE3 = datetime.datetime(1970, 1, 1, 0, 0) _NEW_EPOCH = 31536000 # datetime.datetime(1971, 1, 1, 0, 0) -class TestTimestamp(absltest.TestCase): +class TestTimestamp(absltest.TestCase): def test_build_unix_epoch(self): self.assertEqual( construct_extras.Timestamp(construct.Int32ul).build(_TEST_DATE1), - b'\x80\x51\x01\x00') + b"\x80\x51\x01\x00", + ) def test_parse_unix_epoch(self): self.assertEqual( - construct_extras.Timestamp(construct.Int32ul).parse( - b'\x803\xe1\x01'), - _TEST_DATE2) + construct_extras.Timestamp(construct.Int32ul).parse(b"\x803\xe1\x01"), + _TEST_DATE2, + ) def test_build_custom_epoch(self): self.assertEqual( - construct_extras.Timestamp( - construct.Int32ul, epoch=_NEW_EPOCH).build(_TEST_DATE2), - b'\x00\x00\x00\x00') + construct_extras.Timestamp(construct.Int32ul, epoch=_NEW_EPOCH).build( + _TEST_DATE2 + ), + b"\x00\x00\x00\x00", + ) def test_parse_custom_epoch(self): self.assertEqual( - construct_extras.Timestamp( - construct.Int32ul, epoch=_NEW_EPOCH).parse( - b'\x00\x00\x00\x00'), - _TEST_DATE2) + construct_extras.Timestamp(construct.Int32ul, epoch=_NEW_EPOCH).parse( + b"\x00\x00\x00\x00" + ), + _TEST_DATE2, + ) def test_build_custom_epoch_negative_failure(self): with self.assertRaises(construct.core.FormatFieldError): - construct_extras.Timestamp( - construct.Int32ul, epoch=_NEW_EPOCH).build(_TEST_DATE1) + construct_extras.Timestamp(construct.Int32ul, epoch=_NEW_EPOCH).build( + _TEST_DATE1 + ) def test_build_custom_epoch_negative_success(self): self.assertEqual( - construct_extras.Timestamp( - construct.Int32sl, epoch=_NEW_EPOCH).build(_TEST_DATE1), - b'\x00\x1e\x20\xfe') + construct_extras.Timestamp(construct.Int32sl, epoch=_NEW_EPOCH).build( + _TEST_DATE1 + ), + b"\x00\x1e\x20\xfe", + ) def test_build_varint(self): self.assertEqual( - construct_extras.Timestamp(construct.VarInt).build(_TEST_DATE3), - b'\x00') + construct_extras.Timestamp(construct.VarInt).build(_TEST_DATE3), b"\x00" + ) def test_invalid_value(self): with self.assertRaises(AssertionError): - construct_extras.Timestamp(construct.Int32ul).build('foo') + construct_extras.Timestamp(construct.Int32ul).build("foo") diff --git a/test/test_contourusb.py b/test/test_contourusb.py index e2fb6cb..3b8b547 100644 --- a/test/test_contourusb.py +++ b/test/test_contourusb.py @@ -5,62 +5,57 @@ # pylint: disable=protected-access,missing-docstring -from absl.testing import absltest - -from glucometerutils.support import contourusb - from unittest.mock import Mock +from absl.testing import absltest +from glucometerutils.support import contourusb class TestContourUSB(absltest.TestCase): - header_record = b'\x04\x021H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304\r\x17D7\r\n\x05' - + header_record = b"\x04\x021H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304\r\x17D7\r\n\x05" + mock_dev = Mock() def test_get_datetime(self): import datetime - self.datetime = "201908071315" # returned by + self.datetime = "201908071315" # returned by self.assertEqual( - datetime.datetime(2019,8,7,13,15), - contourusb.ContourHidDevice.get_datetime(self) + datetime.datetime(2019, 8, 7, 13, 15), + contourusb.ContourHidDevice.get_datetime(self), ) - def test_RECORD_FORMAT_match(self): - #first decode the header record frame + # first decode the header record frame header_record_decoded = self.header_record.decode() - stx = header_record_decoded.find('\x02') + stx = header_record_decoded.find("\x02") _RECORD_FORMAT = contourusb._RECORD_FORMAT - result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group('text') + result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group("text") self.assertEqual( - "H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304", - result + "H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304", + result, ) def test_parse_header_record(self): - + _HEADER_RECORD_RE = contourusb._HEADER_RECORD_RE _RECORD_FORMAT = contourusb._RECORD_FORMAT - header_record_decoded = self.header_record.decode() - stx = header_record_decoded.find('\x02') + stx = header_record_decoded.find("\x02") - - result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group('text') - contourusb.ContourHidDevice.parse_header_record(self.mock_dev,result) + result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group("text") + contourusb.ContourHidDevice.parse_header_record(self.mock_dev, result) self.assertEqual(self.mock_dev.field_del, "\\") self.assertEqual(self.mock_dev.repeat_del, "^") self.assertEqual(self.mock_dev.component_del, "&") self.assertEqual(self.mock_dev.escape_del, "|") - self.assertEqual(self.mock_dev.product_code, "Bayer7390") + self.assertEqual(self.mock_dev.product_code, "Bayer7390") self.assertEqual(self.mock_dev.dig_ver, "01.24") self.assertEqual(self.mock_dev.anlg_ver, "01.04") @@ -100,18 +95,20 @@ class TestContourUSB(absltest.TestCase): self.assertEqual(self.mock_dev.datetime, "201909221304") - #TO-DO checksum and checkframe unit tests + # TO-DO checksum and checkframe unit tests def test_parse_result_record(self): - #first decode the header record frame + # first decode the header record frame result_record = "R|8|^^^Glucose|133|mg/dL^P||B/X||201202052034" - result_dict = contourusb.ContourHidDevice.parse_result_record(self.mock_dev, result_record) - - self.assertEqual(result_dict['record_type'], 'R') - self.assertEqual(result_dict['seq_num'], '8') - self.assertEqual(result_dict['test_id'], 'Glucose') - self.assertEqual(result_dict['value'], '133') - self.assertEqual(result_dict['unit'], 'mg/dL') - self.assertEqual(result_dict['ref_method'], 'P') - self.assertEqual(result_dict['markers'], 'B/X') - self.assertEqual(result_dict['datetime'], '201202052034')
\ No newline at end of file + result_dict = contourusb.ContourHidDevice.parse_result_record( + self.mock_dev, result_record + ) + + self.assertEqual(result_dict["record_type"], "R") + self.assertEqual(result_dict["seq_num"], "8") + self.assertEqual(result_dict["test_id"], "Glucose") + self.assertEqual(result_dict["value"], "133") + self.assertEqual(result_dict["unit"], "mg/dL") + self.assertEqual(result_dict["ref_method"], "P") + self.assertEqual(result_dict["markers"], "B/X") + self.assertEqual(result_dict["datetime"], "201202052034") diff --git a/test/test_freestyle.py b/test/test_freestyle.py index 0ff669d..fb3f3b9 100644 --- a/test/test_freestyle.py +++ b/test/test_freestyle.py @@ -6,18 +6,17 @@ # pylint: disable=protected-access,missing-docstring from absl.testing import absltest - from glucometerutils.support import freestyle class TestFreeStyle(absltest.TestCase): - def test_outgoing_command(self): """Test the generation of a new outgoing message.""" self.assertEqual( - b'\0\x17\7command\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0' - b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0', + b"\0\x17\7command\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" + b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", freestyle._FREESTYLE_MESSAGE.build( - {'message_type': 23, 'command': b'command'}), + {"message_type": 23, "command": b"command"} + ), ) diff --git a/test/test_fsoptium.py b/test/test_fsoptium.py index d9a1ccb..bdc76f8 100644 --- a/test/test_fsoptium.py +++ b/test/test_fsoptium.py @@ -8,31 +8,24 @@ import datetime from absl.testing import parameterized - -from glucometerutils.drivers import fsoptium from glucometerutils import exceptions +from glucometerutils.drivers import fsoptium class TestFreeStyleOptium(parameterized.TestCase): - @parameterized.parameters( - ('Clock:\tApr 22 2014\t02:14:37', - datetime.datetime(2014, 4, 22, 2, 14, 37)), - ('Clock:\tJul 10 2013\t14:26:44', - datetime.datetime(2013, 7, 10, 14, 26, 44)), - ('Clock:\tSep 29 2013\t17:35:34', - datetime.datetime(2013, 9, 29, 17, 35, 34)), + ("Clock:\tApr 22 2014\t02:14:37", datetime.datetime(2014, 4, 22, 2, 14, 37)), + ("Clock:\tJul 10 2013\t14:26:44", datetime.datetime(2013, 7, 10, 14, 26, 44)), + ("Clock:\tSep 29 2013\t17:35:34", datetime.datetime(2013, 9, 29, 17, 35, 34)), ) def test_parse_clock(self, datestr, datevalue): - self.assertEqual( - fsoptium._parse_clock(datestr), - datevalue) + self.assertEqual(fsoptium._parse_clock(datestr), datevalue) @parameterized.parameters( - ('Apr 22 2014 02:14:37',), - ('Clock:\tXxx 10 2013\t14:26',), - ('Clock:\tSep 29 2013\t17:35:22.34',), - ('Foo',) + ("Apr 22 2014 02:14:37",), + ("Clock:\tXxx 10 2013\t14:26",), + ("Clock:\tSep 29 2013\t17:35:22.34",), + ("Foo",), ) def test_parse_clock_invalid(self, datestr): with self.assertRaises(exceptions.InvalidResponse): diff --git a/test/test_lifescan.py b/test/test_lifescan.py index 3dc0aa6..b50b5d6 100755 --- a/test/test_lifescan.py +++ b/test/test_lifescan.py @@ -8,18 +8,13 @@ import array from absl.testing import absltest - from glucometerutils.support import lifescan class TestChecksum(absltest.TestCase): def test_crc(self): - self.assertEqual( - 0x41cd, - lifescan.crc_ccitt(b'\x02\x06\x06\x03')) + self.assertEqual(0x41CD, lifescan.crc_ccitt(b"\x02\x06\x06\x03")) def test_crc_array(self): - cmd_array = array.array('B', b'\x02\x06\x08\x03') - self.assertEqual( - 0x62C2, - lifescan.crc_ccitt(cmd_array)) + cmd_array = array.array("B", b"\x02\x06\x08\x03") + self.assertEqual(0x62C2, lifescan.crc_ccitt(cmd_array)) diff --git a/test/test_otultra2.py b/test/test_otultra2.py index 6bea440..6b36602 100644 --- a/test/test_otultra2.py +++ b/test/test_otultra2.py @@ -8,36 +8,36 @@ from unittest import mock from absl.testing import parameterized - +from glucometerutils import exceptions from glucometerutils.drivers import otultra2 from glucometerutils.support import lifescan -from glucometerutils import exceptions class TestOTUltra2(parameterized.TestCase): - def test_checksum(self): - checksum = otultra2._calculate_checksum(b'T') + checksum = otultra2._calculate_checksum(b"T") self.assertEqual(0x0054, checksum) def test_checksum_full(self): - checksum = otultra2._calculate_checksum( - b'T "SAT","08/03/13","22:12:00 "') + checksum = otultra2._calculate_checksum(b'T "SAT","08/03/13","22:12:00 "') self.assertEqual(0x0608, checksum) @parameterized.named_parameters( - ('_missing_checksum', b'INVALID', lifescan.MissingChecksum), - ('_short', b'.\r', exceptions.InvalidResponse), - ('_generic', b'% 2500\r', exceptions.InvalidResponse), - ('_invalid_serial_number', b'@ "12345678O" 0297\r', - lifescan.InvalidSerialNumber), - ('_invalid_checksum', b'% 1337\r', exceptions.InvalidChecksum), - ('_broken_checksum', b'% 13AZ\r', lifescan.MissingChecksum), + ("_missing_checksum", b"INVALID", lifescan.MissingChecksum), + ("_short", b".\r", exceptions.InvalidResponse), + ("_generic", b"% 2500\r", exceptions.InvalidResponse), + ( + "_invalid_serial_number", + b'@ "12345678O" 0297\r', + lifescan.InvalidSerialNumber, + ), + ("_invalid_checksum", b"% 1337\r", exceptions.InvalidChecksum), + ("_broken_checksum", b"% 13AZ\r", lifescan.MissingChecksum), ) def test_invalid_response(self, returned_string, expected_exception): - with mock.patch('serial.Serial') as mock_serial: + with mock.patch("serial.Serial") as mock_serial: mock_serial.return_value.readline.return_value = returned_string - device = otultra2.Device('mockdevice') + device = otultra2.Device("mockdevice") with self.assertRaises(expected_exception): device.get_serial_number() diff --git a/test/test_otultraeasy.py b/test/test_otultraeasy.py index 252b7c7..c6fce83 100644 --- a/test/test_otultraeasy.py +++ b/test/test_otultraeasy.py @@ -6,19 +6,18 @@ # pylint: disable=protected-access,missing-docstring from absl.testing import absltest - from glucometerutils.drivers import otultraeasy class ConstructTest(absltest.TestCase): - def test_make_packet_ack(self): self.assertEqual( - b'\x02\x06\x08\x03\xc2\x62', - otultraeasy._make_packet(b'', False, False, False, True)) + b"\x02\x06\x08\x03\xc2\x62", + otultraeasy._make_packet(b"", False, False, False, True), + ) def test_make_packet_version_request(self): self.assertEqual( - b'\x02\x09\x03\x05\x0d\x02\x03\x08\x9f', - otultraeasy._make_packet( - b'\x05\x0d\x02', True, True, False, False)) + b"\x02\x09\x03\x05\x0d\x02\x03\x08\x9f", + otultraeasy._make_packet(b"\x05\x0d\x02", True, True, False, False), + ) diff --git a/test/test_td4277.py b/test/test_td4277.py index 6f4ff9a..fbd4aa2 100644 --- a/test/test_td4277.py +++ b/test/test_td4277.py @@ -8,24 +8,21 @@ import datetime from absl.testing import parameterized - +from glucometerutils import exceptions from glucometerutils.drivers import td4277 from glucometerutils.support import lifescan -from glucometerutils import exceptions class TestTD4277Nexus(parameterized.TestCase): - @parameterized.parameters( - (b'\x21\x24\x0e\x15', datetime.datetime(2018, 1, 1, 21, 14)), - (b'\x21\x26\x0e\x15', datetime.datetime(2019, 1, 1, 21, 14)), - (b'\x04\x27\x25\x0d', datetime.datetime(2019, 8, 4, 13, 37)), + (b"\x21\x24\x0e\x15", datetime.datetime(2018, 1, 1, 21, 14)), + (b"\x21\x26\x0e\x15", datetime.datetime(2019, 1, 1, 21, 14)), + (b"\x04\x27\x25\x0d", datetime.datetime(2019, 8, 4, 13, 37)), ) def test_parse_datetime(self, message, date): - self.assertEqual(td4277._parse_datetime(message), - date) + self.assertEqual(td4277._parse_datetime(message), date) def test_making_message(self): self.assertEqual( - td4277._make_packet(0x22, 0), - b'\x51\x22\x00\x00\x00\x00\xa3\x16') + td4277._make_packet(0x22, 0), b"\x51\x22\x00\x00\x00\x00\xa3\x16" + ) |