diff --git a/canopen/sdo/client.py b/canopen/sdo/client.py index 82ee0c88..c442714c 100644 --- a/canopen/sdo/client.py +++ b/canopen/sdo/client.py @@ -316,20 +316,21 @@ def read(self, size=-1): self.pos += length return response[1:length + 1] - def readinto(self, b): - """ - Read bytes into a pre-allocated, writable bytes-like object b, - and return the number of bytes read. - """ - data = self.read(7) - b[:len(data)] = data - return len(data) + # For now not used, but if needed, a test is also required: + # def readinto(self, b): + # """ + # Read bytes into a pre-allocated, writable bytes-like object b, + # and return the number of bytes read. + # """ + # data = self.read(7) + # b[:len(data)] = data + # return len(data) - def readable(self): - return True + # def readable(self): + # return True - def tell(self): - return self.pos + # def tell(self): + # return self.pos class WritableStream(io.RawIOBase): @@ -366,6 +367,7 @@ def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False response = sdo_client.request_response(request) res_command, = struct.unpack_from("B", response) if res_command != RESPONSE_DOWNLOAD: + self._done = True # prevent close() from sending a stray close segment self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError( f"Unexpected response 0x{res_command:02X}") @@ -420,6 +422,7 @@ def write(self, b): response = self.sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command & 0xE0 != RESPONSE_SEGMENT_DOWNLOAD: + self._done = True # prevent close() from sending a stray close segment self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError( f"Unexpected response 0x{res_command:02X} " @@ -535,6 +538,7 @@ def read(self, size=-1): if seqno == self._ackseq + 1: self._ackseq = seqno else: + logger.debug('Wrong seqno') # Wrong sequence number response = self._retransmit() res_command, = struct.unpack_from("B", response) @@ -612,14 +616,15 @@ def close(self): def tell(self): return self.pos - def readinto(self, b): - """ - Read bytes into a pre-allocated, writable bytes-like object b, - and return the number of bytes read. - """ - data = self.read(7) - b[:len(data)] = data - return len(data) + # For now not used, but if needed, a test is also required: + # def readinto(self, b): + # """ + # Read bytes into a pre-allocated, writable bytes-like object b, + # and return the number of bytes read. + # """ + # data = self.read(7) + # b[:len(data)] = data + # return len(data) def readable(self): return True @@ -787,6 +792,8 @@ def _retransmit(self, ackseq, blksize): # Reset _seqno and update blksize self._seqno = 0 self._blksize = blksize + # Reset _done so the last segment can be re-sent + self._done = False # We are retransmitting self._retransmitting = True # Resend the block diff --git a/canopen/sdo/constants.py b/canopen/sdo/constants.py index e19c79a6..7e6ad4a8 100644 --- a/canopen/sdo/constants.py +++ b/canopen/sdo/constants.py @@ -4,7 +4,11 @@ # Command, index, subindex SDO_STRUCT = struct.Struct(" int: + code = list(SdoAbortedError.CODES.keys())[ + list(SdoAbortedError.CODES.values()).index(text) + ] + return SdoAbortedError(code) + class SdoCommunicationError(SdoError): """No or unexpected response from slave.""" diff --git a/canopen/sdo/server.py b/canopen/sdo/server.py index c26dc998..e36c0094 100644 --- a/canopen/sdo/server.py +++ b/canopen/sdo/server.py @@ -8,6 +8,10 @@ logger = logging.getLogger(__name__) +class SdoBlockException(SdoAbortedError): + """Dedicated SDO Block exception.""" + + class SdoServer(SdoBase): """Creates an SDO server.""" @@ -27,9 +31,24 @@ def __init__(self, rx_cobid, tx_cobid, node): self._index = None self._subindex = None self.last_received_error = 0x00000000 + self.sdo_block = None def on_request(self, can_id, data, timestamp): - command, = struct.unpack_from("B", data, 0) + logger.debug("on_request") + if self.sdo_block and self.sdo_block.state != BLOCK_STATE_NONE: + try: + self.process_block(data) + except SdoAbortedError as exc: + self.sdo_block = None + self.abort(exc.code) + raise + except Exception: + self.sdo_block = None + self.abort() + raise + return + + (command,) = struct.unpack_from("B", data, 0) ccs = command & 0xE0 try: @@ -57,6 +76,120 @@ def on_request(self, can_id, data, timestamp): self.abort() logger.exception(exc) + def process_block(self, request): + """ + Process a block request, using a state mechanisme from SdoBlock class + to handle the different states of the block transfer. + + :param request: + CAN message containing EMCY or SDO request. + """ + + logger.debug("process_block") + command, _, _, code = SDO_ABORT_STRUCT.unpack_from(request) + if command == 0x80: + # Abort received + logger.error("Abort: 0x%08X" % code) + self.sdo_block = None + return + + if BLOCK_STATE_UPLOAD < self.sdo_block.state < BLOCK_STATE_DOWNLOAD: + logger.debug("BLOCK_STATE_UPLOAD") + command, _, _ = SDO_STRUCT.unpack_from(request) + + # in upload state + if self.sdo_block.state == BLOCK_STATE_UP_INIT_RESP: + logger.debug("BLOCK_STATE_UP_INIT_RESP") + # init response was sent, client required to send new request + if (command & REQUEST_BLOCK_UPLOAD) != REQUEST_BLOCK_UPLOAD: + raise SdoBlockException("Unknown SDO command specified") # pragma: no cover + if (command & START_BLOCK_UPLOAD) != START_BLOCK_UPLOAD: + raise SdoBlockException("Unknown SDO command specified") # pragma: no cover + + # now start blasting data to client from server + self.sdo_block.update_state(BLOCK_STATE_UP_DATA) + + blocks = self.sdo_block.get_upload_blocks() + for block in blocks: + self.send_response(block) + + elif self.sdo_block.state == BLOCK_STATE_UP_DATA: + logger.debug("BLOCK_STATE_UP_DATA") + command, ackseq, newblk = SDO_BLOCKACK_STRUCT.unpack_from(request) + if (command & REQUEST_BLOCK_UPLOAD) != REQUEST_BLOCK_UPLOAD: + raise SdoBlockException("Unknown SDO command specified") + elif (command & BLOCK_TRANSFER_RESPONSE) != BLOCK_TRANSFER_RESPONSE: + raise SdoBlockException("Unknown SDO command specified") + elif ackseq != self.sdo_block.last_seqno: + self.sdo_block.data_uploaded = self.sdo_block.data_successful_upload + else: + self.sdo_block.data_successful_upload = self.sdo_block.data_uploaded + + if self.sdo_block.size == self.sdo_block.data_uploaded: + logger.debug("BLOCK_STATE_UP_DATA last data") + self.sdo_block.update_state(BLOCK_STATE_UP_END) + response = bytearray(8) + command = RESPONSE_BLOCK_UPLOAD + command |= END_BLOCK_TRANSFER + n = self.sdo_block.last_bytes << 2 + command |= n + logger.debug("Last no byte: %d, CRC: x%04X", self.sdo_block.last_bytes, self.sdo_block.crc_value) + SDO_BLOCKEND_STRUCT.pack_into(response, 0, command, self.sdo_block.crc_value) + self.send_response(response) + else: + blocks = self.sdo_block.get_upload_blocks() + for block in blocks: + self.send_response(block) + + elif self.sdo_block.state == BLOCK_STATE_UP_END: + self.sdo_block = None + + elif BLOCK_STATE_DOWNLOAD < self.sdo_block.state <= BLOCK_STATE_DL_END: + # in download state + logger.debug("BLOCK_STATE_DOWNLOAD") + if self.sdo_block.state == BLOCK_STATE_DL_DATA: + logger.debug("BLOCK_STATE_DL_DATA") + seqno = command & 0x7F + last_seg = bool(command & NO_MORE_BLOCKS) + # Accumulate data bytes (bytes 1-7 of each segment) + self.sdo_block.append_download_data(request[1:8]) + self.sdo_block.last_seqno = seqno + + if seqno >= self.sdo_block.req_blocksize or last_seg: + # Send block acknowledgement + response = bytearray(8) + response[0] = RESPONSE_BLOCK_DOWNLOAD | BLOCK_TRANSFER_RESPONSE + response[1] = seqno # ackseq + response[2] = self.sdo_block.req_blocksize # new blksize + self.send_response(response) + self.sdo_block.seqno = 0 + + if last_seg: + self.sdo_block.update_state(BLOCK_STATE_DL_END) + + elif self.sdo_block.state == BLOCK_STATE_DL_END: + logger.debug("BLOCK_STATE_DL_END") + if (command & REQUEST_BLOCK_DOWNLOAD) != REQUEST_BLOCK_DOWNLOAD: + raise SdoBlockException("Unknown SDO command specified") # pragma: no cover + if (command & SUB_COMMAND_MASK) != END_BLOCK_TRANSFER: + raise SdoBlockException("Unknown SDO command specified") # pragma: no cover + + # n = bytes NOT used in last segment + n = (command >> 2) & 0x7 + data = self.sdo_block.finalize_download(n) + + self._node.set_data(self.sdo_block.index, self.sdo_block.subindex, data, check_writable=True) + + response = bytearray(8) + response[0] = RESPONSE_BLOCK_DOWNLOAD | END_BLOCK_TRANSFER + self.send_response(response) + self.sdo_block = None + else: + # in neither + raise SdoBlockException( + "Data can not be transferred or stored to the application because of the present device state" + ) # pragma: no cover + def init_upload(self, request): _, index, subindex = SDO_STRUCT.unpack_from(request) self._index = index @@ -74,13 +207,12 @@ def init_upload(self, request): logger.info("Expedited upload for 0x%04X:%02X", index, subindex) res_command |= EXPEDITED res_command |= (4 - size) << 2 - response[4:4 + size] = data + response[4 : 4 + size] = data else: logger.info("Initiating segmented upload for 0x%04X:%02X", index, subindex) struct.pack_into("> 2) & 0x3) else: size = 4 - self._node.set_data(index, subindex, request[4:4 + size], check_writable=True) + self._node.set_data(index, subindex, request[4 : 4 + size], check_writable=True) else: logger.info("Initiating segmented download for 0x%04X:%02X", index, subindex) if command & SIZE_SPECIFIED: - size, = struct.unpack_from(" %X", self.state, new_state) + if new_state >= self.state: + self.state = new_state + else: + raise SdoBlockException( + "Data can not be transferred or stored to the application because of the present device state" + ) + + def get_upload_blocks(self): + """ + Get the blocks of data to be sent to the client. The blocks are + created in a messages list of bytearrays. + """ + + msgs = [] + + # seq no 1 - 127, not 0 -.. + for seqno in range(1, self.req_blocksize + 1): + logger.debug("SEQNO %d", seqno) + response = bytearray(8) + command = 0 + if self.size <= (self.data_uploaded + 7): + # no more segments after this + command |= NO_MORE_BLOCKS + + command |= seqno + response[0] = command + for i in range(7): + databyte = self.get_data_byte() + if databyte != None: + response[i + 1] = databyte + else: + self.last_bytes = 7 - i + break + msgs.append(response) + self.last_seqno = seqno + + if self.size == self.data_uploaded: + break + logger.debug(msgs) + return msgs + + def get_data_byte(self): + """Get the next byte of data to be sent to the client.""" + if self.data_uploaded < self.size: + self.data_uploaded += 1 + return self.data[self.data_uploaded - 1] + return None + + def append_download_data(self, segment): + """Append a 7-byte segment to the download data buffer. + + :param segment: + Bytes 1-7 of the received block segment message (always 7 bytes). + """ + self._data_buffer.extend(segment) + + def finalize_download(self, n): + """Return the accumulated download data, trimming the last n unused bytes. + + :param int n: + Number of bytes in the last segment that did not contain data + (as signalled by the client in the END_BLOCK_TRANSFER command). + + :returns: + The complete received data as bytes. + """ + if n > 0: + return bytes(self._data_buffer[:-n]) + return bytes(self._data_buffer) diff --git a/test/test_local.py b/test/test_local.py index 6ab94645..c53e3ffc 100644 --- a/test/test_local.py +++ b/test/test_local.py @@ -1,5 +1,7 @@ import time import unittest +import struct +from unittest.mock import MagicMock, patch import canopen @@ -37,22 +39,87 @@ def test_expedited_upload(self): vendor_id = self.remote_node.sdo[0x1400][1].raw self.assertEqual(vendor_id, 0x99) - def test_block_upload_switch_to_expedite_upload(self): - with self.assertRaises(canopen.SdoCommunicationError) as context: - with self.remote_node.sdo[0x1008].open('r', block_transfer=True) as fp: - pass - # We get this since the sdo client don't support the switch - # from block upload to expedite upload - self.assertEqual("Unexpected response 0x41", str(context.exception)) + def test_block_download(self): + data = b"BLOCK DOWNLOAD TEST DATA" + # Write data using block download + with self.remote_node.sdo[0x2000].open('wb', size=len(data), block_transfer=True) as fp: + fp.write(data) + # Read back using block upload (client requests upload from server) + with self.remote_node.sdo[0x2000].open('rb', block_transfer=True) as fp: + read_data = fp.read() + self.assertEqual(read_data, data) + + def test_block_upload_multi_block(self): + """Block tranfer of bulk data using multiple blocks. Each block can transfer up to 127 segments of 7 bytes (889 bytes)""" + # 70 * 28 = 1960 bytes, exceeds one block (127 segments * 7 bytes = 889 bytes) + data = b"Lorem ipsum dolor sit amet. " * 70 + self.local_node.sdo[0x2000].raw = data.decode("latin-1") + with self.remote_node.sdo[0x2000].open('rb', block_transfer=True) as fp: + read_data = fp.read() + self.assertEqual(read_data, data) + + def test_process_block_up_data_wrong_ackseq(self): + """ + Test that when client acks with wrong seqno, server rolls back data_uploaded to data_successful_upload + (the start of the current block) and asks for retransmit. + """ + server = self.local_node.sdo + server._index = 0x2000 + server._subindex = 0 + + mock_block = MagicMock() + mock_block.state = canopen.sdo.constants.BLOCK_STATE_UP_DATA # 0x12 + mock_block.last_seqno = 127 # server sent seqno 1..127 + mock_block.data_uploaded = 889 # 127 * 7, end of first block + mock_block.data_successful_upload = 0 + mock_block.size = 1960 # two blocks worth, transfer not done + mock_block.get_upload_blocks.return_value = [] + server.sdo_block = mock_block + + # command = REQUEST_BLOCK_UPLOAD (0xA0) | BLOCK_TRANSFER_RESPONSE (0x02) = 0xA2 + # ackseq = 0 (wrong: last_seqno was 127), newblk = 127 + request = bytearray(struct.pack("