Source code for percival.carrier.encoding

'''
Created on 4 Dec 2014

@author: Ulrik Pedersen
'''
from __future__ import unicode_literals, absolute_import
from builtins import bytes
try:
    # Python 2 only: replace the list-building zip with the lazy izip.
    # itertools is a plain module (not a package), so izip cannot be reached
    # with "import itertools.izip"; it has to be pulled in with a from-import.
    from itertools import izip as zip
except ImportError:
    # Python 3: itertools has no izip and the builtin zip is already lazy.
    pass

import struct
import logging
logger = logging.getLogger(__name__)

# struct format of a single message: network byte order ('!'), an unsigned
# 16 bit address ('H') followed by an unsigned 32 bit data word ('I').
SINGLE_MSG_FMT = b'!HI'

# Pre-compiled packer/unpacker for exactly one message.
msg_packer = struct.Struct(SINGLE_MSG_FMT)

# Derived from the format rather than hard-coded so the two cannot drift apart.
NUM_BYTES_PER_MSG = msg_packer.size
"""Number of bytes per message (6). Each message consists of a 2 byte address field and a 4 byte data field"""

DATA_ENCODING = 'latin-1'  # Just because latin-1 allows values from 0-255

END_OF_MESSAGE = bytes('\xFF\xFF\xAB\xBA\xBA\xC1', encoding=DATA_ENCODING)
"""End of message is used in some cases. The EOM word is 0xFFFFABBABAC1"""

def encode_message(addr, word):
    """Encode a single address and dataword into one 6 byte message.

    The values are packed with the module level ``msg_packer``: the address
    first, then the data word.

    :param addr: UART address (16bit integer)
    :param word: data word (32bit integer)
    :returns: a `bytes` object of 6 bytes with address and dataword encoded
    :raises struct.error: if `addr` or `word` cannot be packed into the
                          message format (raised by ``msg_packer.pack``)
    """
    # Hand the arguments to the logger instead of pre-formatting them, so the
    # string is only built when debug logging is actually enabled.
    logger.debug("%s", [addr, word])
    encoded_msg = msg_packer.pack(addr, word)

    # Python 2 -> 3 compatibility workaround:
    # In python2 struct.pack() returns a string which we then
    # need to convert to a 'bytes' object.
    if isinstance(encoded_msg, str):
        encoded_msg = bytes(encoded_msg, encoding=DATA_ENCODING)

    logger.debug("encode_message returning: %s", [encoded_msg])
    return encoded_msg
def encode_multi_message(start_addr, words):
    """Encode multiple 32bit words as a multi-message.

    The words are written to consecutive addresses: the first word goes to
    `start_addr`, the second to `start_addr + 1` and so on.

    :param start_addr: The UART starting address (a 16bit integer word)
    :param words: An iterable of 32bit integer words to be encoded
    :returns: A list of encoded messages, each of which consists of 6 bytes:
              2 bytes of address and 4 bytes of data. The list is empty if
              `words` is empty.
    """
    logger.debug("%s", [start_addr, words])
    # enumerate() hands out the consecutive addresses directly, so there is no
    # need to build a separate address range and zip it with the words.
    encoded_msg = [encode_message(addr, word)
                   for addr, word in enumerate(words, start_addr)]
    logger.debug("encode_multi_message returning: %s", encoded_msg)
    return encoded_msg
def decode_message(msg):
    """Decode a byte array into a list of (address, dataword) tuples.

    The address field is a 16bit integer and the dataword is a 32bit integer.
    Trailing bytes that do not make up a complete message of
    ``NUM_BYTES_PER_MSG`` bytes are dropped after logging a warning.

    :param msg: The input message
    :type msg: Bytearray
    :returns: A list of (address, data) tuples; empty if `msg` holds no
              complete message
    """
    logger.debug(msg)

    extra_bytes = len(msg) % NUM_BYTES_PER_MSG
    if extra_bytes:
        logger.warning("Too many (%d) bytes in message", extra_bytes)
        # WARNING: we are chopping away some bytes here...
        msg = msg[:-extra_bytes]

    # Walk the buffer one message at a time with the pre-compiled packer.
    # Each unpack_from() call already yields one (addr, word) tuple, so no
    # per-call format string and no regrouping of a flat result is needed.
    addr_word_sets = [msg_packer.unpack_from(msg, offset)
                      for offset in range(0, len(msg), NUM_BYTES_PER_MSG)]

    logger.debug("Returning: %s", addr_word_sets)
    return addr_word_sets