diff options
author | Carl <Kraken.rf.inc@gmail.com> | 2021-12-23 03:10:19 +0100 |
---|---|---|
committer | Carl <Kraken.rf.inc@gmail.com> | 2021-12-23 03:10:19 +0100 |
commit | efaaf39f79d81cd740a9f741e6cf4815c49c3093 (patch) | |
tree | 5a4fb7caaddef5f228f6833260ba7e7b44345051 /_receiver | |
parent | Initial commit (diff) | |
download | krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar.gz krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar.bz2 krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar.lz krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar.xz krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar.zst krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.zip |
Diffstat (limited to '_receiver')
-rwxr-xr-x | _receiver/iq_header.py | 152 | ||||
-rwxr-xr-x | _receiver/krakenSDR_receiver.py | 375 | ||||
-rwxr-xr-x | _receiver/shmemIface.py | 195 |
3 files changed, 722 insertions, 0 deletions
diff --git a/_receiver/iq_header.py b/_receiver/iq_header.py new file mode 100755 index 0000000..f439019 --- /dev/null +++ b/_receiver/iq_header.py @@ -0,0 +1,152 @@ +from struct import pack,unpack +import logging +import sys +""" + Desctiption: IQ Frame header definition + For header field description check the corresponding documentation + Total length: 1024 byte + Project: HeIMDALL RTL + Author: Tamás Pető + Status: Finished + Version history: + 1 : Initial version (2019 04 23) + 2 : Fixed 1024 byte length (2019 07 25) + 3 : Noise source state (2019 10 01) + 4 : IQ sync flag (2019 10 21) + 5 : Sync state (2019 11 10) + 6 : Unix Epoch timestamp (2019 12 17) + 6a: Frame type defines (2020 03 19) + 7 : Sync word (2020 05 03) +""" +class IQHeader(): + + FRAME_TYPE_DATA = 0 + FRAME_TYPE_DUMMY = 1 + FRAME_TYPE_RAMP = 2 + FRAME_TYPE_CAL = 3 + FRAME_TYPE_TRIGW = 4 + + SYNC_WORD = 0x2bf7b95a + + def __init__(self): + + self.logger = logging.getLogger(__name__) + self.header_size = 1024 # size in bytes + self.reserved_bytes = 192 + + self.sync_word=self.SYNC_WORD # uint32_t + self.frame_type=0 # uint32_t + self.hardware_id="" # char [16] + self.unit_id=0 # uint32_t + self.active_ant_chs=0 # uint32_t + self.ioo_type=0 # uint32_t + self.rf_center_freq=0 # uint64_t + self.adc_sampling_freq=0 # uint64_t + self.sampling_freq=0 # uint64_t + self.cpi_length=0 # uint32_t + self.time_stamp=0 # uint64_t + self.daq_block_index=0 # uint32_t + self.cpi_index=0 # uint32_t + self.ext_integration_cntr=0 # uint64_t + self.data_type=0 # uint32_t + self.sample_bit_depth=0 # uint32_t + self.adc_overdrive_flags=0 # uint32_t + self.if_gains=[0]*32 # uint32_t x 32 + self.delay_sync_flag=0 # uint32_t + self.iq_sync_flag=0 # uint32_t + self.sync_state=0 # uint32_t + self.noise_source_state=0 # uint32_t + self.reserved=[0]*self.reserved_bytes# uint32_t x reserverd_bytes + self.header_version=0 # uint32_t + + def decode_header(self, iq_header_byte_array): + """ + Unpack,decode and store the content of the iq header + """ + iq_header_list = unpack("II16sIIIQQQIQIIQIII"+"I"*32+"IIII"+"I"*self.reserved_bytes+"I", iq_header_byte_array) + + self.sync_word = iq_header_list[0] + self.frame_type = iq_header_list[1] + self.hardware_id = iq_header_list[2].decode() + self.unit_id = iq_header_list[3] + self.active_ant_chs = iq_header_list[4] + self.ioo_type = iq_header_list[5] + self.rf_center_freq = iq_header_list[6] + self.adc_sampling_freq = iq_header_list[7] + self.sampling_freq = iq_header_list[8] + self.cpi_length = iq_header_list[9] + self.time_stamp = iq_header_list[10] + self.daq_block_index = iq_header_list[11] + self.cpi_index = iq_header_list[12] + self.ext_integration_cntr = iq_header_list[13] + self.data_type = iq_header_list[14] + self.sample_bit_depth = iq_header_list[15] + self.adc_overdrive_flags = iq_header_list[16] + self.if_gains = iq_header_list[17:49] + self.delay_sync_flag = iq_header_list[49] + self.iq_sync_flag = iq_header_list[50] + self.sync_state = iq_header_list[51] + self.noise_source_state = iq_header_list[52] + self.header_version = iq_header_list[52+self.reserved_bytes+1] + + def encode_header(self): + """ + Pack the iq header information into a byte array + """ + iq_header_byte_array=pack("II", self.sync_word, self.frame_type) + iq_header_byte_array+=self.hardware_id.encode()+bytearray(16-len(self.hardware_id.encode())) + iq_header_byte_array+=pack("IIIQQQIQIIQIII", + self.unit_id, self.active_ant_chs, self.ioo_type, self.rf_center_freq, self.adc_sampling_freq, + self.sampling_freq, self.cpi_length, self.time_stamp, self.daq_block_index, self.cpi_index, + self.ext_integration_cntr, self.data_type, self.sample_bit_depth, self.adc_overdrive_flags) + for m in range(32): + iq_header_byte_array+=pack("I", self.if_gains[m]) + + iq_header_byte_array+=pack("I", self.delay_sync_flag) + iq_header_byte_array+=pack("I", self.iq_sync_flag) + iq_header_byte_array+=pack("I", self.sync_state) + iq_header_byte_array+=pack("I", self.noise_source_state) + + for m in range(self.reserved_bytes): + iq_header_byte_array+=pack("I",0) + + iq_header_byte_array+=pack("I", self.header_version) + return iq_header_byte_array + + def dump_header(self): + """ + Prints out the content of the header in human readable format + """ + self.logger.info("Sync word: {:d}".format(self.sync_word)) + self.logger.info("Header version: {:d}".format(self.header_version)) + self.logger.info("Frame type: {:d}".format(self.frame_type)) + self.logger.info("Hardware ID: {:16}".format(self.hardware_id)) + self.logger.info("Unit ID: {:d}".format(self.unit_id)) + self.logger.info("Active antenna channels: {:d}".format(self.active_ant_chs)) + self.logger.info("Illuminator type: {:d}".format(self.ioo_type)) + self.logger.info("RF center frequency: {:.2f} MHz".format(self.rf_center_freq/10**6)) + self.logger.info("ADC sampling frequency: {:.2f} MHz".format(self.adc_sampling_freq/10**6)) + self.logger.info("IQ sampling frequency {:.2f} MHz".format(self.sampling_freq/10**6)) + self.logger.info("CPI length: {:d}".format(self.cpi_length)) + self.logger.info("Unix Epoch timestamp: {:d}".format(self.time_stamp)) + self.logger.info("DAQ block index: {:d}".format(self.daq_block_index)) + self.logger.info("CPI index: {:d}".format(self.cpi_index)) + self.logger.info("Extended integration counter {:d}".format(self.ext_integration_cntr)) + self.logger.info("Data type: {:d}".format(self.data_type)) + self.logger.info("Sample bit depth: {:d}".format(self.sample_bit_depth)) + self.logger.info("ADC overdrive flags: {:d}".format(self.adc_overdrive_flags)) + for m in range(32): + self.logger.info("Ch: {:d} IF gain: {:.1f} dB".format(m, self.if_gains[m]/10)) + self.logger.info("Delay sync flag: {:d}".format(self.delay_sync_flag)) + self.logger.info("IQ sync flag: {:d}".format(self.iq_sync_flag)) + self.logger.info("Sync state: {:d}".format(self.sync_state)) + self.logger.info("Noise source state: {:d}".format(self.noise_source_state)) + + def check_sync_word(self): + """ + Check the sync word of the header + """ + if self.sync_word != self.SYNC_WORD: + return -1 + else: + return 0 diff --git a/_receiver/krakenSDR_receiver.py b/_receiver/krakenSDR_receiver.py new file mode 100755 index 0000000..a9e2d63 --- /dev/null +++ b/_receiver/krakenSDR_receiver.py @@ -0,0 +1,375 @@ +# KrakenSDR Receiver + +# Copyright (C) 2018-2021 Carl Laufer, Tamás Pető +# +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# + +# -*- coding: utf-8 -*- + +# Import built-in modules +import sys +import os +import time +from struct import pack, unpack +import socket +import _thread +from threading import Lock +import queue +import logging +#import copy + +# Import third party modules +import numpy as np +from scipy import signal +from iq_header import IQHeader +from shmemIface import inShmemIface +class ReceiverRTLSDR(): + + def __init__(self, data_que, data_interface = "eth", logging_level=10): + """ + Parameter: + ---------- + :param: data_que: Que to communicate with the UI (web iface/Qt GUI) + :param: data_interface: This field is configured by the GUI during instantiation. + Valid values are the followings: + "eth" : The module will receiver IQ frames through an Ethernet connection + "shmem": The module will receiver IQ frames through a shared memory interface + :type : data_interface: string + """ + self.logger = logging.getLogger(__name__) + self.logger.setLevel(logging_level) + + # DAQ parameters + # These values are used by default to configure the DAQ through the configuration interface + # Values are configured externally upon configuration request + self.daq_center_freq = 100 # MHz + self.daq_rx_gain = 0 # [dB] + self.daq_squelch_th_dB = 0 + + # UI interface + self.data_que = data_que + + # IQ data interface + self.data_interface = data_interface + + # -> Ethernet + self.receiver_connection_status = False + self.port = 5000 + self.rec_ip_addr = "127.0.0.1" # Configured by the GUI prior to connection request + self.socket_inst = socket.socket() + self.receiverBufferSize = 2 ** 18 # Size of the Ethernet receiver buffer measured in bytes + + # -> Shared memory + root_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + daq_path = os.path.join(os.path.dirname(root_path),"heimdall_daq_fw") + self.daq_shmem_control_path = os.path.join(os.path.join(daq_path,"Firmware"),"_data_control/") + self.init_data_iface() + + # Control interface + self.ctr_iface_socket = socket.socket() + self.ctr_iface_port = 5001 + self.ctr_iface_thread_lock = Lock() # Used to synchronize the operation of the ctr_iface thread + + self.iq_frame_bytes = None + self.iq_samples = None + self.iq_header = IQHeader() + self.M = 0 # Number of receiver channels, updated after establishing connection + + def init_data_iface(self): + if self.data_interface == "shmem": + # Open shared memory interface to capture the DAQ firmware output + self.in_shmem_iface = inShmemIface("delay_sync_iq", self.daq_shmem_control_path) + if not self.in_shmem_iface.init_ok: + self.logger.critical("Shared memory initialization failed") + self.in_shmem_iface.destory_sm_buffer() + return -1 + return 0 + + + def eth_connect(self): + """ + Compatible only with DAQ firmwares that has the IQ streaming mode. + HeIMDALL DAQ Firmware version: 1.0 or later + """ + try: + if not self.receiver_connection_status: + if self.data_interface == "eth": + # Establlish IQ data interface connection + self.socket_inst.connect((self.rec_ip_addr, self.port)) + self.socket_inst.sendall(str.encode('streaming')) + test_iq = self.receive_iq_frame() + + self.M = self.iq_header.active_ant_chs + + # Establish control interface connection + self.ctr_iface_socket.connect((self.rec_ip_addr, self.ctr_iface_port)) + self.receiver_connection_status = True + self.ctr_iface_init() + self.logger.info("CTR INIT Center freq: {0}".format(self.daq_center_freq)) + self.set_center_freq(self.daq_center_freq) + self.set_if_gain(self.daq_rx_gain) + self.set_squelch_threshold(self.daq_squelch_th_dB) + except: + errorMsg = sys.exc_info()[0] + self.logger.error("Error message: "+str(errorMsg)) + self.receiver_connection_status = False + self.logger.error("Unexpected error: {0}".format(sys.exc_info()[0])) + + # Re-instantiating sockets + self.socket_inst = socket.socket() + self.ctr_iface_socket = socket.socket() + return -1 + + self.logger.info("Connection established") + que_data_packet = [] + que_data_packet.append(['conn-ok',]) + self.data_que.put(que_data_packet) + + def eth_close(self): + """ + Close Ethernet conenctions including the IQ data and the control interfaces + """ + try: + if self.receiver_connection_status: + if self.data_interface == "eth": + self.socket_inst.sendall(str.encode('q')) # Send exit message + self.socket_inst.close() + self.socket_inst = socket.socket() # Re-instantiating socket + + # Close control interface connection + exit_message_bytes=("EXIT".encode()+bytearray(124)) + self.ctr_iface_socket.send(exit_message_bytes) + self.ctr_iface_socket.close() + self.ctr_iface_socket = socket.socket() + + self.receiver_connection_status = False + que_data_packet = [] + que_data_packet.append(['disconn-ok',]) + self.data_que.put(que_data_packet) + except: + errorMsg = sys.exc_info()[0] + self.logger.error("Error message: {0}".format(errorMsg)) + return -1 + + if self.data_interface == "shmem": + self.in_shmem_iface.destory_sm_buffer() + + return 0 + + def get_iq_online(self): + """ + This function obtains a new IQ data frame through the Ethernet IQ data or the shared memory interface + """ + + # Check connection + if not self.receiver_connection_status: + fail = self.eth_connect() + if fail: + return -1 + + if self.data_interface == "eth": + self.socket_inst.sendall(str.encode("IQDownload")) # Send iq request command + self.iq_samples = self.receive_iq_frame() + + elif self.data_interface == "shmem": + active_buff_index = self.in_shmem_iface.wait_buff_free() + if active_buff_index < 0 or active_buff_index > 1: + self.logger.info("Terminating.., signal: {:d}".format(active_buff_index)) + return -1 + + buffer = self.in_shmem_iface.buffers[active_buff_index] + iq_header_bytes = buffer[0:1024].tobytes() + self.iq_header.decode_header(iq_header_bytes) + + # Inititalization from header - Set channel numbers + if self.M == 0: + self.M = self.iq_header.active_ant_chs + + incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8) + if incoming_payload_size > 0: + iq_samples_in = (buffer[1024:1024 + incoming_payload_size].view(dtype=np.complex64))\ + .reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length) + self.iq_samples = iq_samples_in.copy() # Must be .copy + + self.in_shmem_iface.send_ctr_buff_ready(active_buff_index) + + def receive_iq_frame(self): + """ + Called by the get_iq_online function. Receives IQ samples over the establed Ethernet connection + """ + total_received_bytes = 0 + recv_bytes_count = 0 + iq_header_bytes = bytearray(self.iq_header.header_size) # allocate array + view = memoryview(iq_header_bytes) # Get buffer + + self.logger.debug("Starting IQ header reception") + + while total_received_bytes < self.iq_header.header_size: + # Receive into buffer + recv_bytes_count = self.socket_inst.recv_into(view, self.iq_header.header_size-total_received_bytes) + view = view[recv_bytes_count:] # reset memory region + total_received_bytes += recv_bytes_count + + self.iq_header.decode_header(iq_header_bytes) + # Uncomment to check the content of the IQ header + #self.iq_header.dump_header() + + incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8) + if incoming_payload_size > 0: + # Calculate total bytes to receive from the iq header data + total_bytes_to_receive = incoming_payload_size + receiver_buffer_size = 2**18 + + self.logger.debug("Total bytes to receive: {:d}".format(total_bytes_to_receive)) + + total_received_bytes = 0 + recv_bytes_count = 0 + iq_data_bytes = bytearray(total_bytes_to_receive + receiver_buffer_size) # allocate array + view = memoryview(iq_data_bytes) # Get buffer + + while total_received_bytes < total_bytes_to_receive: + # Receive into buffer + recv_bytes_count = self.socket_inst.recv_into(view, receiver_buffer_size) + view = view[recv_bytes_count:] # reset memory region + total_received_bytes += recv_bytes_count + + self.logger.debug(" IQ data succesfully received") + + # Convert raw bytes to Complex float64 IQ samples + self.iq_samples = np.frombuffer(iq_data_bytes[0:total_bytes_to_receive], dtype=np.complex64).reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length) + + self.iq_frame_bytes = bytearray()+iq_header_bytes+iq_data_bytes + return self.iq_samples + else: + return 0 + + def set_squelch_threshold(self, threshold_dB): + """ + Configures the threshold level of the squelch module in the DAQ FW through the control interface + """ + if self.receiver_connection_status: # Check connection + self.daq_squelch_th_dB = threshold_dB + if threshold_dB == -80: threshold = 0 + else: threshold = 10**(threshold_dB/20) + + # Assembling message + cmd="STHU" + th_bytes=pack("f",threshold) + msg_bytes=(cmd.encode()+th_bytes+bytearray(120)) + try: + _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) + except: + errorMsg = sys.exc_info()[0] + self.logger.error("Unable to start communication thread") + self.logger.error("Error message: {:s}".format(errorMsg)) + + def ctr_iface_init(self): + """ + Initialize connection with the DAQ FW through the control interface + """ + if self.receiver_connection_status: # Check connection + # Assembling message + cmd="INIT" + msg_bytes=(cmd.encode()+bytearray(124)) + try: + _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) + except: + errorMsg = sys.exc_info()[0] + self.logger.error("Unable to start communication thread") + self.logger.error("Error message: {:s}".format(errorMsg)) + + def ctr_iface_communication(self, msg_bytes): + """ + Handles communication on the control interface with the DAQ FW + + Parameters: + ----------- + + :param: msg: Message bytes, that will be sent ont the control interface + :type: msg: Byte array + """ + self.ctr_iface_thread_lock.acquire() + self.logger.debug("Sending control message") + self.ctr_iface_socket.send(msg_bytes) + + # Waiting for the command to take effect + reply_msg_bytes = self.ctr_iface_socket.recv(128) + + self.logger.debug("Control interface communication finished") + self.ctr_iface_thread_lock.release() + + status = reply_msg_bytes[0:4].decode() + if status == "FNSD": + self.logger.info("Reconfiguration succesfully finished") + que_data_packet = [] + que_data_packet.append(['config-ok',]) + self.data_que.put(que_data_packet) + + else: + self.logger.error("Failed to set the requested parameter, reply: {0}".format(status)) + + def set_center_freq(self, center_freq): + """ + Configures the RF center frequency of the receiver through the control interface + + Paramters: + ---------- + :param: center_freq: Required center frequency to set [Hz] + :type: center_freq: float + """ + if self.receiver_connection_status: # Check connection + self.daq_center_freq = int(center_freq) + # Set center frequency + cmd="FREQ" + freq_bytes=pack("Q",int(center_freq)) + msg_bytes=(cmd.encode()+freq_bytes+bytearray(116)) + try: + _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) + except: + errorMsg = sys.exc_info()[0] + self.logger.error("Unable to start communication thread") + self.logger.error("Error message: {:s}".format(errorMsg)) + + def set_if_gain(self, gain): + """ + Configures the IF gain of the receiver through the control interface + + Paramters: + ---------- + :param: gain: IF gain value [dB] + :type: gain: int + """ + if self.receiver_connection_status: # Check connection + self.daq_rx_gain = gain + + # Set center frequency + cmd="GAIN" + gain_list=[297, 37] #[int(gain*10)]*self.M + gain_bytes=pack("I"*self.M, *gain_list) + msg_bytes=(cmd.encode()+gain_bytes+bytearray(128-(self.M+1)*4)) + try: + _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) + except: + errorMsg = sys.exc_info()[0] + self.logger.error("Unable to start communication thread") + self.logger.error("Error message: {:s}".format(errorMsg)) + + def close(self): + """ + Disconnet the receiver module and the DAQ FW + """ + self.eth_close() + diff --git a/_receiver/shmemIface.py b/_receiver/shmemIface.py new file mode 100755 index 0000000..fd301b1 --- /dev/null +++ b/_receiver/shmemIface.py @@ -0,0 +1,195 @@ +""" + HeIMDALL DAQ Firmware + Python based shared memory interface implementations + + Author: Tamás Pető + License: GNU GPL V3 + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. +""" +import logging +from struct import pack, unpack +from multiprocessing import shared_memory +import numpy as np +import os + +A_BUFF_READY = 1 +B_BUFF_READY = 2 +INIT_READY = 10 +TERMINATE = 255 +class outShmemIface(): + + + def __init__(self, shmem_name, shmem_size, drop_mode = False): + + self.init_ok = True + logging.basicConfig(level=logging.INFO) + self.logger = logging.getLogger(__name__) + self.drop_mode = drop_mode + self.dropped_frame_cntr = 0 + + self.shmem_name = shmem_name + self.buffer_free = [True, True] + + self.memories = [] + self.buffers = [] + + # Try to remove shared memories if already exist + try: + shmem_A = shared_memory.SharedMemory(name=shmem_name+'_A',create=False, size=shmem_size) + shmem_A.close() + #shmem_A.unkink() + except FileNotFoundError as err: + self.logger.warning("Shared memory not exist") + try: + shmem_B = shared_memory.SharedMemory(name=shmem_name+'_B',create=False, size=shmem_size) + shmem_B.close() + #shmem_B.unkink() + except FileNotFoundError as err: + self.logger.warning("Shared memory not exist") + + # Create the shared memories + self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_A',create=True, size=shmem_size)) + self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_B',create=True, size=shmem_size)) + self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=self.memories[0].buf)) + self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=self.memories[1].buf)) + + # Opening control FIFOs + if self.drop_mode: + bw_fifo_flags = os.O_RDONLY | os.O_NONBLOCK + else: + bw_fifo_flags = os.O_RDONLY + try: + self.fw_ctr_fifo = os.open('_data_control/'+'fw_'+shmem_name, os.O_WRONLY) + self.bw_ctr_fifo = os.open('_data_control/'+'bw_'+shmem_name, bw_fifo_flags) + except OSError as err: + self.logger.critical("OS error: {0}".format(err)) + self.logger.critical("Failed to open control fifos") + self.bw_ctr_fifo = None + self.fw_ctr_fifo = None + self.init_ok = False + + # Send init ready signal + if self.init_ok: + os.write(self.fw_ctr_fifo, pack('B',INIT_READY)) + + def send_ctr_buff_ready(self, active_buffer_index): + # Send buffer ready signal on the forward FIFO + if active_buffer_index == 0: + os.write(self.fw_ctr_fifo, pack('B',A_BUFF_READY)) + elif active_buffer_index == 1: + os.write(self.fw_ctr_fifo, pack('B',B_BUFF_READY)) + + # Deassert buffer free flag + self.buffer_free[active_buffer_index] = False + + def send_ctr_terminate(self): + os.write(self.fw_ctr_fifo, pack('B',TERMINATE)) + self.logger.info("Terminate signal sent") + + def destory_sm_buffer(self): + for memory in self.memories: + memory.close() + memory.unlink() + + if self.fw_ctr_fifo is not None: + os.close(self.fw_ctr_fifo) + + if self.bw_ctr_fifo is not None: + os.close(self.bw_ctr_fifo) + + def wait_buff_free(self): + if self.buffer_free[0]: + return 0 + elif self.buffer_free[1]: + return 1 + else: + try: + buffer = os.read(self.bw_ctr_fifo, 1) + signal = unpack('B', buffer )[0] + + if signal == A_BUFF_READY: + self.buffer_free[0] = True + return 0 + if signal == B_BUFF_READY: + self.buffer_free[1] = True + return 1 + except BlockingIOError as err: + self.dropped_frame_cntr +=1 + self.logger.warning("Dropping frame.. Total: [{:d}] ".format(self.dropped_frame_cntr)) + return 3 + return -1 + + +class inShmemIface(): + + def __init__(self, shmem_name, ctr_fifo_path="_data_control/"): + + self.init_ok = True + logging.basicConfig(level=logging.INFO) + self.logger = logging.getLogger(__name__) + self.drop_mode = False + + self.shmem_name = shmem_name + + self.memories = [] + self.buffers = [] + try: + self.fw_ctr_fifo = os.open(ctr_fifo_path+'fw_'+shmem_name, os.O_RDONLY) + self.bw_ctr_fifo = os.open(ctr_fifo_path+'bw_'+shmem_name, os.O_WRONLY) + except OSError as err: + self.logger.critical("OS error: {0}".format(err)) + self.logger.critical("Failed to open control fifos") + self.bw_ctr_fifo = None + self.fw_ctr_fifo = None + self.init_ok = False + + if self.fw_ctr_fifo is not None: + if unpack('B', os.read(self.fw_ctr_fifo, 1))[0] == INIT_READY: + self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_A')) + self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_B')) + self.buffers.append(np.ndarray((self.memories[0].size,), + dtype=np.uint8, + buffer=self.memories[0].buf)) + self.buffers.append(np.ndarray((self.memories[1].size,), + dtype=np.uint8, + buffer=self.memories[1].buf)) + else: + self.init_ok = False + + def send_ctr_buff_ready(self, active_buffer_index): + if active_buffer_index == 0: + os.write(self.bw_ctr_fifo, pack('B',A_BUFF_READY)) + elif active_buffer_index == 1: + os.write(self.bw_ctr_fifo, pack('B',B_BUFF_READY)) + + def destory_sm_buffer(self): + for memory in self.memories: + memory.close() + + if self.fw_ctr_fifo is not None: + os.close(self.fw_ctr_fifo) + + if self.bw_ctr_fifo is not None: + os.close(self.bw_ctr_fifo) + + def wait_buff_free(self): + signal = unpack('B', os.read(self.fw_ctr_fifo, 1))[0] + if signal == A_BUFF_READY: + return 0 + elif signal == B_BUFF_READY: + return 1 + elif signal == TERMINATE: + return TERMINATE + return -1 |