3030import eu .openanalytics .containerproxy .event .PendingProxyEvent ;
3131import eu .openanalytics .containerproxy .event .ProxyStartFailedEvent ;
3232import eu .openanalytics .containerproxy .event .ProxyStopEvent ;
33+ import eu .openanalytics .containerproxy .event .RemoveDelegateProxiesEvent ;
3334import eu .openanalytics .containerproxy .event .SeatAvailableEvent ;
3435import eu .openanalytics .containerproxy .event .SeatClaimedEvent ;
3536import eu .openanalytics .containerproxy .event .SeatReleasedEvent ;
@@ -195,6 +196,26 @@ public void onProxyStartFailed(ProxyStartFailedEvent proxyStartFailedEvent) {
195196 pendingDelegatingProxies .remove (proxyStartFailedEvent .getProxyId ());
196197 }
197198
199+ @ EventListener
200+ public void onRemoveDelegateProxiesEvent (RemoveDelegateProxiesEvent event ) {
201+ if ((event .getSpecId () != null && !Objects .equals (event .getSpecId (), proxySpec .getId ()))
202+ || !leaderService .isLeader ()
203+ || (event .getId () != null && delegateProxyStore .getDelegateProxy (event .getId ()) == null )
204+ ) {
205+ // only handle events for this spec
206+ return ;
207+ }
208+ if (event .getId () != null ) {
209+ // remove single proxy
210+ logger .info ("[{} {}] Received external request to remove DelegateProxy" , kv ("specId" , proxySpec .getId ()), kv ("delegateProxyId" , event .getId ()));
211+ globalEventLoop .schedule (() -> markDelegateProxyForRemoval (event .getId ()));
212+ } else {
213+ // remove all proxies
214+ logger .info ("[{}] Received external request to remove all DelegateProxies" , kv ("specId" , proxySpec .getId ()));
215+ globalEventLoop .schedule (this ::markAllDelegateProxiesForRemoval );
216+ }
217+ }
218+
198219 /**
199220 * Processes the SeatReleasedEvent, should only process one event a a time (i.e. using the event loop),
200221 * since it modifies the Delegateproxy.
@@ -246,6 +267,9 @@ private void processReleasedSeat(SeatReleasedEvent seatReleasedEvent) {
246267 private void markDelegateProxyForRemoval (String delegateProxyId ) {
247268 // this delegateProxy will be (completely) removed by the cleanup function, not by scale-down
248269 DelegateProxy delegateProxy = delegateProxyStore .getDelegateProxy (delegateProxyId );
270+ if (delegateProxy == null ) {
271+ return ;
272+ }
249273 Set <String > seatIds = delegateProxy .getSeatIds ();
250274 DelegateProxy .DelegateProxyBuilder delegateProxyBuilder = delegateProxy .toBuilder ()
251275 .delegateProxyStatus (DelegateProxyStatus .ToRemove );
@@ -267,6 +291,12 @@ private void markDelegateProxyForRemoval(String delegateProxyId) {
267291 delegateProxyStore .updateDelegateProxy (delegateProxyBuilder .build ());
268292 }
269293
294+ private void markAllDelegateProxiesForRemoval () {
295+ for (DelegateProxy delegateProxy : delegateProxyStore .getAllDelegateProxies ()) {
296+ markDelegateProxyForRemoval (delegateProxy .getProxy ().getId ());
297+ }
298+ }
299+
270300 private void reconcile () {
271301 long numPendingSeats = getNumPendingSeats ();
272302 long num = seatStore .getNumUnclaimedSeats () + numPendingSeats - pendingDelegatingProxies .size ();
0 commit comments