Source code for percival.carrier.txrx

'''
Created on 4 Dec 2014

@author: Ulrik Pedersen
'''
from __future__ import unicode_literals, absolute_import
from builtins import bytes

import logging
import binascii
import socket
from contextlib import contextmanager 

from percival.carrier.encoding import DATA_ENCODING, NUM_BYTES_PER_MSG, END_OF_MESSAGE

class TxMessage(object):
    """Encapsulate a Percival carrier board message and the number of
    messages to expect in response.
    """

    def __init__(self, message, num_response_msg=1, expect_eom=False):
        """TxMessage constructor

        :param message: A Percival Carrier Board message containing address
                        (2 bytes) and data (4 bytes)
        :type message: Byte array
        :param num_response_msg: Number of messages to expect in response
        :type num_response_msg: int
        :param expect_eom: set true if end-of-message is expected in response
        :type expect_eom: boolean
        """
        self.num_response_msg = num_response_msg
        self._message = message
        self._expect_eom = expect_eom

    @property
    def message(self):
        """The raw message exactly as passed to the constructor."""
        return self._message

    @property
    def expected_bytes(self):
        """Size of the expected response in bytes:
        ``num_response_msg * NUM_BYTES_PER_MSG``.
        """
        return self.num_response_msg * NUM_BYTES_PER_MSG

    @property
    def expected_response(self):
        """``END_OF_MESSAGE`` if an end-of-message was requested at
        construction, otherwise ``None``.
        """
        if self._expect_eom:
            return END_OF_MESSAGE
        return None

    def __str__(self, *args, **kwargs):
        """Return a short human-readable summary: the message as upper-case
        hex and the number of response messages expected.
        """
        # hexlify() returns a bytes object on Python 3; decode it so the
        # output reads "0x0144..." rather than "0xb'0144...'".
        hex_msg = binascii.hexlify(self._message).upper().decode("ascii")
        # The value reported is a count of messages (not bytes), so it is
        # labelled as such.
        return "<TxMessage msg=0x%s exp. resp.: %d msg(s)>" % (
            hex_msg, self.num_response_msg)
class TxRx(object):
    """Transmit and receive data and commands to/from the Carrier Board
    through the XPort Ethernet
    """

    def __init__(self, fpga_addr, port=10001, timeout=2.0):
        """TxRx Constructor

        Creates a TCP socket, applies `timeout` to it and connects to the
        device immediately.

        :param fpga_addr: IP address or network name of the Carrier Board
                          XPort device
        :type fpga_addr: string
        :param port: IP port number
        :type port: int
        :param timeout: Socket communication timeout (seconds)
        :type timeout: floating point
        """
        self.log = logging.getLogger(".".join([__name__, self.__class__.__name__]))
        self._fpga_addr = (fpga_addr, port)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.settimeout(timeout)
            self.log.debug("connecting to FPGA: %s", str(self._fpga_addr))
            self.sock.connect(self._fpga_addr)
        except Exception:
            # Do not leak the file descriptor when the connection attempt
            # fails; the original error is re-raised unchanged.
            self.sock.close()
            raise

    @property
    def fpga_addr(self):
        """The ``(address, port)`` tuple this object connects to."""
        return self._fpga_addr

    @property
    def timeout(self):
        """Socket timeout in seconds, read from / written to the socket."""
        return self.sock.gettimeout()

    @timeout.setter
    def timeout(self, value):
        self.sock.settimeout(value)

    def tx_msg(self, msg):
        """Transmit a single message to the Carrier Board

        :param msg: Message to transmit
        :type msg: Bytearray of
                   :const:`percival.carrier.encoding.NUM_BYTES_PER_MSG` bytes
        """
        self.sock.sendall(msg)

    def rx_msg(self, expected_bytes=None):
        """Receive messages of up to `expected_bytes` length

        :param expected_bytes: Number of bytes expected to be received. If
                               `expected_bytes` is None, read at least one
                               single message
        :raises RuntimeError: if a message of 0 bytes is received;
                              indicating a broken socket connection
        :returns: The received message
        :rtype: byte array
        """
        if expected_bytes is None:
            expected_resp_len = NUM_BYTES_PER_MSG
        else:
            expected_resp_len = expected_bytes

        msg = bytes()
        while len(msg) < expected_resp_len:
            if expected_bytes:
                # Never request more than what is still outstanding.
                block_read_bytes = expected_bytes - len(msg)
            else:
                block_read_bytes = 1024
            chunk = self.sock.recv(block_read_bytes)
            # recv() normally hands back bytes already; passing an encoding
            # together with a bytes argument raises TypeError on Python 3,
            # so only convert when the data is not bytes yet.
            if not isinstance(chunk, bytes):
                chunk = bytes(chunk, encoding=DATA_ENCODING)
            if len(chunk) == 0:
                raise RuntimeError("socket connection broken (expected a multiple of 6 bytes)")
            msg = msg + chunk
        return msg

    def send_recv(self, msg, expected_bytes=None):
        """Send `msg` and wait for receipt of `expected_bytes` in response
        or timeout

        :param msg: UART message to send
        :type msg: Bytearray
        :param expected_bytes: Number of response bytes to wait for; passed
                               straight through to :meth:`rx_msg`
        :returns: Response from UART
        :rtype: Bytearray
        """
        self.tx_msg(msg)
        resp = self.rx_msg(expected_bytes)
        return resp

    def send_recv_message(self, message):
        """Send a message and wait for response

        :param message: a TxMessage object
        :raises TypeError: if `message` is not a :class:`TxMessage`
        :returns: Response from UART
        :rtype: Bytearray
        """
        if not isinstance(message, TxMessage):
            raise TypeError("message must be of type TxMessage, not %s" % str(type(message)))
        self.tx_msg(message.message)
        resp = self.rx_msg(message.expected_bytes)
        # TODO: check for expected response
        return resp

    def clean(self):
        """Shutdown and close the socket safely

        Sockets are normally closed down cleanly on exit from the
        interpreter, however this method may be used in case the socket
        need to be closed down temporarily.
        """
        try:
            self.sock.shutdown(socket.SHUT_RDWR)
        finally:
            # Release the descriptor even if shutdown() fails (for example
            # because the peer already dropped the connection); any
            # shutdown error still propagates to the caller.
            self.sock.close()
@contextmanager
def TxRxContext(*args, **kwargs):
    """Provide a context which keeps a `TxRx` module alive with an open
    socket only for the duration of the context

    All positional and keyword arguments are forwarded to the `TxRx`
    constructor. The socket is shut down and closed when the context exits,
    including when the body of the ``with`` statement raises.

    Minimal example:

    >>> msg = encode_message(0x0144, 0x00000000) # Header Info Readback
    >>>
    >>> # Start the context - create the TxRx object and open the socket
    >>> with TxRxContext("192.168.1.3") as trx:
    >>>     response = trx.send_recv(msg)
    >>> # End of context - socket is closed down cleanly
    >>>
    >>> response = decode_message(response)
    >>> print(response)
    """
    trx = TxRx(*args, **kwargs)
    try:
        yield trx
    finally:
        # Without the finally clause an exception raised inside the with
        # body would skip the cleanup and leave the socket open.
        trx.clean()