3333from cli_proton_python import coreclient , options , utils
3434
3535
36+ class Timeout (object ): # pylint: disable=too-few-public-methods
37+ """ Scheduler object for timeout control """
38+
39+ def __init__ (self , parent , event ):
40+ """
41+ Timeout constructor
42+ saving parent_ptr to get current data
43+ :param parent: related sender object
44+ :type parent: cli_proton_python.Send
45+ :param event: reactor event
46+ :type event: proton.Event
47+ """
48+ self .parent_ptr = parent
49+ self .parent_event = event
50+
51+ def on_timer_task (self , _ ):
52+ """ on_timer_task action handler
53+ """
54+ # Stops container
55+ if self .parent_event .container :
56+ self .parent_event .container .stop ()
57+
58+ self .parent_ptr .tear_down (self .parent_event )
59+
60+
3661class Send (coreclient .CoreClient ):
3762 """ Proton reactive API python sender client
3863
@@ -207,6 +232,21 @@ def prepare_message(self):
207232
208233 return msg
209234
235+ def schedule_timeout (self , event ):
236+ """
237+ Cancels an existing timeout (if one exists).
238+ Next it schedules a new timeout if "--timeout" provided.
239+ :param event: Reactor event
240+ :type event: proton.Event
241+ """
242+ # Cancels an existing timeout
243+ if self .timeout :
244+ self .timeout .cancel ()
245+
246+ # Schedules a new timeout if --timeout provided.
247+ if self .opts .timeout :
248+ self .timeout = event .reactor .schedule (self .opts .timeout , Timeout (self , event ))
249+
210250 def send_message (self ):
211251 """ sends a message """
212252 # close the connection if nothing to send
@@ -221,6 +261,9 @@ def send_message(self):
221261
222262 self .print_message (self .msg )
223263
264+ # set up a new timeout (if one provided)
265+ self .schedule_timeout (self .event )
266+
224267 if self .opts .log_stats == 'endpoints' :
225268 utils .dump_event (self .event )
226269
@@ -252,6 +295,9 @@ def on_start(self, event):
252295 else :
253296 event .container .create_sender (self .url , options = self .link_opts )
254297
298+ # Schedule a timeout (if needed)
299+ self .schedule_timeout (event )
300+
255301 def on_sendable (self , event ):
256302 """
257303 called when sending can proceed
@@ -401,6 +447,9 @@ def transaction_process(self, event):
401447
402448 self .print_message (msg )
403449
450+ # Schedule a timeout (if needed)
451+ self .schedule_timeout (event )
452+
404453 if self .opts .duration != 0 and self .opts .duration_mode == 'after-send' :
405454 utils .sleep4next (self .start_tm , self .msg_total_cnt , self .opts .duration ,
406455 self .msg_processed_cnt + self .current_batch )
@@ -451,6 +500,9 @@ def on_start(self, event):
451500 self .sender = event .container .create_sender (conn , self .url .path , options = self .link_opts )
452501 event .container .declare_transaction (conn , handler = self )
453502
503+ # Schedule a timeout (if needed)
504+ self .schedule_timeout (event )
505+
454506 def on_transaction_declared (self , event ):
455507 """
456508 called when the transaction is declared
0 commit comments