1010import logging
1111import os
1212import queue
13+ import shutil
1314import subprocess
1415import threading
1516import time
@@ -61,33 +62,32 @@ def __init__(
6162 do_transfer : bool = True ,
6263 remove_files : bool = False ,
6364 required_substrings_for_removal : List [str ] = [],
65+ substrings_blacklist : dict [str , list [str ]] = {},
6466 notify : bool = True ,
6567 end_time : datetime | None = None ,
6668 ):
6769 super ().__init__ ()
6870 self ._basepath = basepath_local .absolute ()
6971 self ._basepath_remote = basepath_remote
7072 self ._rsync_module = rsync_module
73+ self ._server_url = server_url
74+ self ._stop_callback = stop_callback
75+ self ._local = local
7176 self ._do_transfer = do_transfer
7277 self ._remove_files = remove_files
7378 self ._required_substrings_for_removal = required_substrings_for_removal
74- self ._stop_callback = stop_callback
75- self ._local = local
76- self ._server_url = server_url
79+ self ._substrings_blacklist = substrings_blacklist
7780 self ._notify = notify
78- self ._finalised = False
7981 self ._end_time = end_time
80- self ._finalising = False
8182
8283 self ._skipped_files : List [Path ] = []
8384
8485 # Set rsync destination
85- if local :
86- self ._remote = str (basepath_remote )
87- else :
88- self ._remote = (
89- f"{ server_url .hostname } ::{ self ._rsync_module } /{ basepath_remote } /"
90- )
86+ self ._remote = (
87+ str (basepath_remote )
88+ if local
89+ else f"{ server_url .hostname } ::{ self ._rsync_module } /{ basepath_remote } /"
90+ )
9191 logger .debug (f"rsync destination path set to { self ._remote } " )
9292
9393 # For local tests you can use something along the lines of
@@ -105,9 +105,24 @@ def __init__(
105105 )
106106 self ._stopping = False
107107 self ._halt_thread = False
108+ self ._finalising = False
109+ self ._finalised = False
110+
@property
def status(self) -> str:
    """
    Describe the current state of the rsync worker as a short label.

    The label is derived from two pieces of state: the ``_stopping`` flag and
    whether ``self.thread`` is still alive.

    Returns:
        "stopping" - a stop was requested and the thread is still alive
        "finished" - a stop was requested and the thread is no longer alive
        "running"  - no stop was requested and the thread is alive
        "ready"    - no stop was requested and the thread is not alive
    """
    # Pick the (alive, not alive) label pair first so that the stop flag is
    # read before the thread is queried.
    if self._stopping:
        label_if_alive, label_if_dead = "stopping", "finished"
    else:
        label_if_alive, label_if_dead = "running", "ready"
    return label_if_alive if self.thread.is_alive() else label_if_dead
108123
def __repr__(self) -> str:
    """
    Return a compact description of this instance showing the local base
    path and the rsync destination it is paired with.
    """
    source, destination = self._basepath, self._remote
    return f"<RSyncer ({source} → {destination})>"
111126
112127 @classmethod
113128 def from_rsyncer (cls , rsyncer : RSyncer , ** kwargs ):
@@ -133,19 +148,6 @@ def from_rsyncer(cls, rsyncer: RSyncer, **kwargs):
133148 notify = kwarguments_from_rsyncer ["notify" ],
134149 )
135150
136- @property
137- def status (self ) -> str :
138- if self ._stopping :
139- if self .thread .is_alive ():
140- return "stopping"
141- else :
142- return "finished"
143- else :
144- if self .thread .is_alive ():
145- return "running"
146- else :
147- return "ready"
148-
149151 def notify (self , * args , secondary : bool = False , ** kwargs ) -> None :
150152 if self ._notify :
151153 super ().notify (* args , secondary = secondary , ** kwargs )
@@ -169,7 +171,7 @@ def restart(self):
169171 self .start ()
170172
171173 def stop (self ):
172- logger .debug ( " RSync thread stop requested " )
174+ logger .info ( f"Stopping RSync thread { self } " )
173175 self ._stopping = True
174176 if self .thread .is_alive ():
175177 logger .info ("Waiting for ongoing transfers to complete..." )
@@ -179,7 +181,7 @@ def stop(self):
179181 if self .thread .is_alive ():
180182 self .queue .put (None )
181183 self .thread .join ()
182- logger .debug ( "RSync thread successfully stopped" )
184+ logger .info ( f "RSync thread { self } successfully stopped" )
183185
184186 def request_stop (self ):
185187 self ._stopping = True
@@ -195,18 +197,52 @@ def finalise(
195197 self ._notify = False
196198 self ._end_time = None
197199 self ._finalising = True
200+
201+ # Perform recursive cleanup on current directory
202+ logger .info (f"Starting file cleanup for RSync thread { self } " )
203+ files_to_transfer : list [Path ] = []
204+
205+ def recursive_cleanup (dirpath : str | Path ):
206+ for entry in os .scandir (dirpath ):
207+ if entry .is_dir ():
208+ # Recursively delete directories with blacklisted substrings
209+ if any (
210+ pattern in entry .name
211+ for pattern in self ._substrings_blacklist .get ("directories" , [])
212+ ):
213+ logger .debug (f"Deleting blacklisted directory { entry .path } " )
214+ shutil .rmtree (entry .path )
215+ continue
216+ # Recursively search in whitelisted ones
217+ recursive_cleanup (entry .path )
218+ elif entry .is_file ():
219+ # Delete blacklisted files
220+ if any (
221+ pattern in entry .name
222+ for pattern in self ._substrings_blacklist .get ("files" , [])
223+ ):
224+ logger .debug (f"Deleting blacklisted file { entry .path } " )
225+ Path (entry .path ).unlink ()
226+ continue
227+ # Append others for transfer
228+ files_to_transfer .append (Path (entry .path ))
229+
230+ recursive_cleanup (self ._basepath )
231+ logger .debug (f"Number of files to transfer: { len (files_to_transfer )} " )
232+
198233 if thread :
199234 self .thread = threading .Thread (
200235 name = f"RSync finalisation { self ._basepath } :{ self ._remote } " ,
201236 target = self ._process ,
202237 daemon = True ,
203238 )
204- for f in self . _basepath . glob ( "**/*" ) :
239+ for f in files_to_transfer :
205240 self .queue .put (f )
206241 self .stop ()
207242 else :
208- self ._transfer (list ( self . _basepath . glob ( "**/*" )) )
243+ self ._transfer (files_to_transfer )
209244 self ._finalised = True
245+ logger .info (f"File cleanup for RSync thread { self } successfully completed" )
210246 if callback :
211247 callback ()
212248
@@ -221,7 +257,7 @@ def flush_skipped(self):
221257 self ._skipped_files = []
222258
223259 def _process (self ):
224- logger .info (" RSync thread starting " )
260+ logger .info (f"Starting main process loop for RSync thread { self } " )
225261 files_to_transfer : list [Path ]
226262 backoff = 0
227263 while not self ._halt_thread :
0 commit comments