Skip to content

Commit 60c0c74

Browse files
birnevogel11aigarius
authored andcommitted
feat: support setting udp buffer size in run time
1 parent da7de2c commit 60c0c74

3 files changed

Lines changed: 101 additions & 5 deletions

File tree

dlt/dlt.py

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
DLT_EMPTY_FILE_ERROR = "DLT TRACE FILE IS EMPTY"
5757
cDLT_FILE_NOT_OPEN_ERROR = "Could not open DLT Trace file (libdlt)" # pylint: disable=invalid-name
5858

59+
DLT_UDP_MULTICAST_FD_BUFFER_SIZE = int(os.environ.get("PYDLT_UDP_MULTICAST_FD_BUFFER_SIZE", 2 * (2**20))) # 2 Mb
60+
DLT_UDP_MULTICAST_BUFFER_SIZE = int(os.environ.get("PYDLT_UDP_MULTICAST_BUFFER_SIZE", 8 * (2**20))) # 8 Mb
61+
5962

6063
class cached_property(object): # pylint: disable=invalid-name
6164
"""
@@ -865,13 +868,23 @@ def __len__(self):
865868

866869

867870
class DLTClient(cDltClient):
868-
"""DLTClient class takes care about correct initialization and
869-
cleanup
870-
"""
871+
"""DLTClient class takes care about correct initialization and cleanup"""
871872

872873
verbose = 0
873874

874875
def __init__(self, **kwords):
876+
"""Initialize a DLTClient.
877+
878+
:param servIP: Optional[str] - dlt server IP.
879+
:param hostIP: Optional[str] - Only available for udp multicast mode.
880+
Set host interface address.
881+
:param port: Optional[int] - dlt tcp daemon port.
882+
:param verbose: Optional[bool] - Enable verbose output.
883+
:param udp_fd_buffer_size_bytes: Optional[int] - Only available for udp
884+
multicast mode. Set the UDP buffer size through setsockopt (unit: bytes).
885+
:param udp_buffer_size_bytes: Optional[int] - Only available for udp
886+
multicast mode. Set the DltReceiver's buffer size (unit: bytes).
887+
"""
875888
self.is_udp_multicast = False
876889
self.verbose = kwords.pop("verbose", 0)
877890
if dltlib.dlt_client_init(ctypes.byref(self), self.verbose) == DLT_RETURN_ERROR:
@@ -915,6 +928,9 @@ def __init__(self, **kwords):
915928
# it ourselves elsewhere
916929
self.port = kwords.get("port", DLT_DAEMON_TCP_PORT)
917930

931+
self._udp_fd_buffer_size_bytes = kwords.get("udp_fd_buffer_size_bytes", DLT_UDP_MULTICAST_FD_BUFFER_SIZE)
932+
self._udp_buffer_size_bytes = kwords.get("udp_buffer_size_bytes", DLT_UDP_MULTICAST_BUFFER_SIZE)
933+
918934
def connect(self, timeout=None):
919935
"""Connect to the server
920936
@@ -973,7 +989,9 @@ def connect(self, timeout=None):
973989
else:
974990
if self.verbose:
975991
logger.info("Connecting DLTClient using UDP Connection")
992+
976993
connected = dltlib.dlt_client_connect(ctypes.byref(self), self.verbose)
994+
self._set_udp_multicast_buffer_size()
977995

978996
if self.verbose:
979997
logger.info("DLT Connection return: %s", connected)
@@ -1051,6 +1069,54 @@ def client_loop(self):
10511069
dltlib.dlt_client_register_message_callback(self.msg_callback)
10521070
dltlib.dlt_client_main_loop(ctypes.byref(self), None, self.verbose)
10531071

1072+
def _set_udp_multicast_buffer_size(self, custom_fd_buffer_size_bytes=None, custom_buffer_size_bytes=None) -> None:
1073+
fd_buffer_size = int(self._udp_fd_buffer_size_bytes or custom_fd_buffer_size_bytes or 0)
1074+
buffer_size_bytes = int(self._udp_buffer_size_bytes or custom_buffer_size_bytes or 0)
1075+
1076+
if fd_buffer_size:
1077+
# Socket options are associated with an open file description. This
1078+
# means that file descriptors duplicated as a consequence of dup()
1079+
# (or similar) or fork() share the same set of socket options.
1080+
# -- Chapter 61.9 Socket Options.
1081+
# The Linux Programming Interface, p.1279
1082+
#
1083+
# The buffer size can be changed with a new fd which is created by
1084+
# dup system call (it's the internal implementation in
1085+
# `socket.fromfd`), so the code creates a socket instance first
1086+
# configures it and directly close it.
1087+
with socket.fromfd(self.sock, socket.AF_INET, socket.SOCK_DGRAM) as conf_socket:
1088+
logger.debug("Set UDP Multicast socket buffer size: %s kbytes", fd_buffer_size / 1024)
1089+
conf_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, fd_buffer_size)
1090+
1091+
real_buffer_size = int(conf_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) / 2)
1092+
if real_buffer_size != fd_buffer_size:
1093+
logger.warning(
1094+
(
1095+
"Failed to set UDP Multicast buffer size. set_size: %s, real_size: %s. "
1096+
"Bypass the error and continue"
1097+
),
1098+
fd_buffer_size / 1024,
1099+
real_buffer_size / 1024,
1100+
)
1101+
logger.warning(
1102+
(
1103+
"Please run command `sysctl -w net.core.rmem_max=%s` with root permission to "
1104+
"set the maximum size and restart dlt again."
1105+
),
1106+
fd_buffer_size,
1107+
)
1108+
1109+
if buffer_size_bytes:
1110+
logger.debug("Set UDP Multicast DltReceiver buffer size: %s kbytes", buffer_size_bytes / 1024)
1111+
ret = dltlib.dlt_receiver_init(
1112+
ctypes.byref(self.receiver), self.sock, self.receiver.type, buffer_size_bytes
1113+
)
1114+
if ret < 0:
1115+
raise RuntimeError(
1116+
f"Failed to set UDP Multicast DltReceiver buffer size. return code: {ret}, "
1117+
f"buffer_size_bytes: {buffer_size_bytes}"
1118+
)
1119+
10541120

10551121
def py_dlt_file_main_loop(dlt_reader, limit=None, callback=None):
10561122
"""Main loop to read dlt messages from dlt file."""

dlt/dlt_broker_handlers.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from dlt.dlt import (
1616
DLTClient,
1717
DLT_DAEMON_TCP_PORT,
18+
DLT_UDP_MULTICAST_BUFFER_SIZE,
19+
DLT_UDP_MULTICAST_FD_BUFFER_SIZE,
1820
cDLT_FILE_NOT_OPEN_ERROR,
1921
load,
2022
py_dlt_client_main_loop,
@@ -402,6 +404,8 @@ def __init__(
402404
self.tracefile = None
403405
self.last_connected = time.time()
404406
self.last_message = time.time() - 120.0
407+
self._udp_fd_buffer_size_bytes = client_cfg.get("udp_fd_buffer_size_bytes", DLT_UDP_MULTICAST_FD_BUFFER_SIZE)
408+
self._udp_buffer_size_bytes = client_cfg.get("udp_buffer_size_bytes", DLT_UDP_MULTICAST_BUFFER_SIZE)
405409

406410
def is_valid_message(self, message):
407411
return message and (message.apid != "" or message.ctid != "")
@@ -420,7 +424,13 @@ def _client_connect(self):
420424
self._port,
421425
self._filename,
422426
)
423-
self._client = DLTClient(servIP=self._ip_address, port=self._port, verbose=self.verbose)
427+
self._client = DLTClient(
428+
servIP=self._ip_address,
429+
port=self._port,
430+
verbose=self.verbose,
431+
udp_fd_buffer_size_bytes=self._udp_fd_buffer_size_bytes,
432+
udp_buffer_size_bytes=self._udp_buffer_size_bytes,
433+
)
424434
connected = self._client.connect(self.timeout)
425435
if connected:
426436
logger.info("DLTClient connected to %s", self._client.servIP)

dlt/py_dlt_receive.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import time
77

8+
from dlt.dlt import DLT_UDP_MULTICAST_FD_BUFFER_SIZE, DLT_UDP_MULTICAST_BUFFER_SIZE
89
from dlt.dlt_broker import DLTBroker
910

1011
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)-8s %(message)s")
@@ -18,13 +19,32 @@ def parse_args():
1819
parser = argparse.ArgumentParser(description="Receive DLT messages")
1920
parser.add_argument("--host", required=True, help="hostname or ip address to connect to")
2021
parser.add_argument("--file", required=True, help="The file into which the messages will be written")
22+
parser.add_argument(
23+
"--udp-fd-buffer-size",
24+
dest="udp_fd_buffer_size",
25+
default=DLT_UDP_MULTICAST_FD_BUFFER_SIZE,
26+
type=int,
27+
help=f"Set the socket buffer size in udp multicast mode. default: {DLT_UDP_MULTICAST_FD_BUFFER_SIZE} bytes",
28+
)
29+
parser.add_argument(
30+
"--udp-buffer-size",
31+
dest="udp_buffer_size",
32+
default=DLT_UDP_MULTICAST_BUFFER_SIZE,
33+
type=int,
34+
help=f"Set the DltReceiver buffer size in udp multicast mode. default: {DLT_UDP_MULTICAST_BUFFER_SIZE} bytes",
35+
)
2136
return parser.parse_args()
2237

2338

2439
def dlt_receive(options):
2540
"""Receive DLT messages via DLTBroker"""
2641
logger.info("Creating DLTBroker instance")
27-
broker = DLTBroker(ip_address=options.host, filename=options.file)
42+
broker = DLTBroker(
43+
ip_address=options.host,
44+
filename=options.file,
45+
udp_fd_buffer_size_bytes=options.udp_buffer_size,
46+
udp_buffer_size_bytes=options.udp_fd_buffer_size,
47+
)
2848

2949
logger.info("Starting DLTBroker")
3050
broker.start() # start the loop

0 commit comments

Comments
 (0)