1313import logging
1414import struct
1515import time
16+ import unittest
1617from typing import TYPE_CHECKING , Callable
1718
1819from openlifu .io .LIFUConfig import OW_ERROR , OW_I2C_PASSTHRU
@@ -152,6 +153,134 @@ def parse_signed_package(pkg: bytes) -> dict:
152153 }
153154
154155
156+ # ---------------------------------------------------------------------------
157+ # Internal tests for DFU package parsing / CRC (deterministic, no hardware)
158+ # ---------------------------------------------------------------------------
159+
160+ def _build_synthetic_signed_package (
161+ fw : bytes ,
162+ meta : bytes ,
163+ fw_address : int = 0x08000000 ,
164+ meta_address : int = 0x08008000 ,
165+ ) -> bytes :
166+ """Construct a minimal, self-consistent signed DFU package for testing.
167+
168+ This uses the module's own header layout/CRC implementation so that tests
169+ validate :func:`stm32_crc32` and :func:`parse_signed_package` end to end.
170+ """
171+ hdr_size = struct .calcsize (_PKG_HDR_FULL )
172+ payload = fw + meta
173+ fw_len = len (fw )
174+ meta_len = len (meta )
175+
176+ payload_crc = stm32_crc32 (payload )
177+
178+ # First pack with a placeholder header CRC so we can compute the real one.
179+ header_crc_placeholder = 0
180+ header = struct .pack (
181+ _PKG_HDR_FULL ,
182+ _PKG_MAGIC ,
183+ _PKG_VERSION ,
184+ hdr_size ,
185+ fw_address ,
186+ fw_len ,
187+ meta_address ,
188+ meta_len ,
189+ payload_crc ,
190+ header_crc_placeholder ,
191+ )
192+
193+ header_crc = stm32_crc32 (header [:- 4 ])
194+ header = struct .pack (
195+ _PKG_HDR_FULL ,
196+ _PKG_MAGIC ,
197+ _PKG_VERSION ,
198+ hdr_size ,
199+ fw_address ,
200+ fw_len ,
201+ meta_address ,
202+ meta_len ,
203+ payload_crc ,
204+ header_crc ,
205+ )
206+
207+ return header + payload
208+
209+
210+ class TestSignedPackage (unittest .TestCase ):
211+ """Unit tests for :func:`stm32_crc32` and :func:`parse_signed_package`.
212+
213+ These tests are deterministic and require no hardware; they can be run by
214+ any standard Python test runner to guard against regressions that might
215+ otherwise risk bricking devices during DFU.
216+ """
217+
218+ def test_parse_signed_package_valid (self ) -> None :
219+ fw = b"\x01 \x02 \x03 \x04 "
220+ meta = b"\xAA \xBB "
221+ fw_addr = 0x08001000
222+ meta_addr = 0x08009000
223+
224+ pkg = _build_synthetic_signed_package (
225+ fw = fw ,
226+ meta = meta ,
227+ fw_address = fw_addr ,
228+ meta_address = meta_addr ,
229+ )
230+
231+ parsed = parse_signed_package (pkg )
232+
233+ assert parsed ["fw_address" ] == fw_addr
234+ assert parsed ["meta_address" ] == meta_addr
235+ assert parsed ["fw" ] == fw
236+ assert parsed ["meta" ] == meta
237+
238+ def test_parse_signed_package_header_crc_mismatch (self ) -> None :
239+ """Corrupt the header so header CRC verification fails."""
240+ fw = b"\x10 \x20 "
241+ meta = b"\x30 "
242+ pkg = _build_synthetic_signed_package (fw = fw , meta = meta )
243+
244+ # Flip a bit inside the header (but keep magic/version/size plausible).
245+ pkg_bytes = bytearray (pkg )
246+ if len (pkg_bytes ) < 8 :
247+ self .skipTest ("synthetic package unexpectedly small" )
248+ pkg_bytes [4 ] ^= 0x01
249+ corrupted = bytes (pkg_bytes )
250+
251+ exc_msg = None
252+ try :
253+ parse_signed_package (corrupted )
254+ raise AssertionError ("Expected ValueError was not raised" )
255+ except ValueError as exc :
256+ exc_msg = str (exc )
257+ assert exc_msg is not None
258+ assert "header CRC mismatch" in exc_msg
259+
260+ def test_parse_signed_package_payload_crc_mismatch (self ) -> None :
261+ """Corrupt the payload so payload CRC verification fails."""
262+ fw = b"\xDE \xAD \xBE \xEF "
263+ meta = b"\x00 \x01 "
264+ pkg = _build_synthetic_signed_package (fw = fw , meta = meta )
265+
266+ hdr_size = struct .calcsize (_PKG_HDR_FULL )
267+ pkg_bytes = bytearray (pkg )
268+ # Flip a bit in the first payload byte (after the header).
269+ if len (pkg_bytes ) <= hdr_size :
270+ self .skipTest ("synthetic package unexpectedly small" )
271+ pkg_bytes [hdr_size ] ^= 0x01
272+ corrupted = bytes (pkg_bytes )
273+
274+ exc_msg = None
275+ try :
276+ parse_signed_package (corrupted )
277+ raise AssertionError ("Expected ValueError was not raised" )
278+ except ValueError as exc :
279+ exc_msg = str (exc )
280+ assert exc_msg is not None
281+ assert "payload CRC mismatch" in exc_msg
282+
283+
155284# ---------------------------------------------------------------------------
156285# USB DFU client (module 0)
157286# ---------------------------------------------------------------------------
@@ -414,7 +543,6 @@ def __init__(self, uart: LIFUUart,
414543 write_read_delay_s : float = 0.005 ):
415544 self ._uart = uart
416545 self ._addr = i2c_addr
417- self ._wr_delay = write_read_delay_s
418546
419547 # --- low-level transport primitives ---
420548
@@ -437,11 +565,14 @@ def _write(self, payload: bytes) -> None:
437565
438566 def _exchange (self , payload : bytes , read_len : int ,
439567 pre_read_delay_s : float | None = None ) -> bytes :
440- """Write *payload* to the I2C slave, wait, then read *read_len* bytes back.
441-
442- The firmware inserts a fixed 5 ms gap between write and read.
443- An optional extra host-side delay can be added via *pre_read_delay_s*
444- (not usually needed).
568+ """Write *payload* to the I2C slave and read *read_len* bytes back.
569+
570+ The firmware executes a combined write+read transaction and inserts a
571+ fixed 5 ms gap between the write and read phases internally.
572+ The optional *pre_read_delay_s* parameter adds an extra host-side delay
573+ **before** issuing the passthrough transaction (i.e. before the
574+ firmware performs the write+read). This does *not* change the internal
575+ 5 ms gap handled by the firmware and is rarely needed.
445576 """
446577 if pre_read_delay_s and pre_read_delay_s > 0 :
447578 time .sleep (pre_read_delay_s )
@@ -784,16 +915,35 @@ def update_module(self,
784915 "Verifying I2C DFU entry (module %d, addr=0x%02X via master)..." ,
785916 module , i2c_addr ,
786917 )
787- try :
788- bl_version = self .get_bootloader_version_i2c (i2c_addr = i2c_addr )
789- except (RuntimeError , TimeoutError ) as e :
790- raise RuntimeError (
791- f"Module { module } did not enter I2C DFU mode at "
792- f"0x{ i2c_addr :02X} : { e } "
793- ) from e
918+ start_time = time .time ()
919+ bl_version = None
920+ last_error : Exception | None = None
921+ while True :
922+ elapsed = time .time () - start_time
923+ if elapsed >= dfu_enum_timeout_s :
924+ break
925+ try :
926+ candidate = self .get_bootloader_version_i2c (i2c_addr = i2c_addr )
927+ if candidate :
928+ bl_version = candidate
929+ break
930+ # Treat empty version string as a failure worth retrying.
931+ last_error = RuntimeError (
932+ "I2C DFU bootloader returned an empty version string"
933+ )
934+ except (RuntimeError , TimeoutError ) as e :
935+ last_error = e
936+ # Small delay before retrying to avoid busy-waiting.
937+ time .sleep (0.2 )
794938 if not bl_version :
939+ if last_error is not None :
940+ raise RuntimeError (
941+ f"Module { module } did not enter I2C DFU mode at "
942+ f"0x{ i2c_addr :02X} within { dfu_enum_timeout_s } s: { last_error } "
943+ ) from last_error
795944 raise RuntimeError (
796- f"Module { module } I2C DFU bootloader returned an empty version string"
945+ f"Module { module } did not enter I2C DFU mode at "
946+ f"0x{ i2c_addr :02X} within { dfu_enum_timeout_s } s"
797947 )
798948 logger .info ("I2C DFU bootloader version: %s" , bl_version )
799949 self .program_i2c (
0 commit comments