3535import java .util .Collections ;
3636import java .util .HashMap ;
3737import java .util .Map ;
38+ import java .util .Timer ;
39+ import java .util .TimerTask ;
3840
3941/**
4042 * Service which 1) keeps track of active proxies by listening for heartbeats (created by {@link HeartbeatService})
4345@ Service
4446public class ActiveProxiesService implements IHeartbeatProcessor {
4547
46- public static final String PROP_ENABLED = "proxy.heartbeat-enabled" ;
4748 public static final String PROP_RATE = "proxy.heartbeat-rate" ;
4849 public static final Long DEFAULT_RATE = 10000L ;
4950 public static final String PROP_TIMEOUT = "proxy.heartbeat-timeout" ;
@@ -53,8 +54,7 @@ public class ActiveProxiesService implements IHeartbeatProcessor {
5354
5455 private final Map <String , Long > proxyHeartbeats = Collections .synchronizedMap (new HashMap <>());
5556
56- private long cleanupInterval ;
57- private long heartbeatTimeout ;
57+ private long defaultHeartbeatTimeout ;
5858
5959 @ Inject
6060 private Environment environment ;
@@ -64,23 +64,14 @@ public class ActiveProxiesService implements IHeartbeatProcessor {
6464
6565 @ PostConstruct
6666 public void init () {
67- Boolean enabled = environment .getProperty (PROP_ENABLED , Boolean .class );
68-
69- if (enabled == null ) {
70- enabled = environment .getProperty (PROP_RATE ) != null || environment .getProperty (PROP_TIMEOUT ) != null ;
71- }
72-
73- cleanupInterval = 2 * environment .getProperty (PROP_RATE , Long .class , DEFAULT_RATE );
74- heartbeatTimeout = environment .getProperty (PROP_TIMEOUT , Long .class , DEFAULT_TIMEOUT );
75-
76- if (enabled ) {
77- Thread cleanupThread = new Thread (new InactiveProxyKiller (), InactiveProxyKiller .class .getSimpleName ());
78- cleanupThread .setDaemon (true );
79- cleanupThread .start ();
80- log .debug ("Releasing of inactive proxies enabled." );
81- } else {
82- log .debug ("Releasing of inactive proxies disabled." );
83- }
67+ long cleanupInterval = 2 * environment .getProperty (PROP_RATE , Long .class , DEFAULT_RATE );
68+ defaultHeartbeatTimeout = environment .getProperty (PROP_TIMEOUT , Long .class , DEFAULT_TIMEOUT );
69+ new Timer ().schedule (new TimerTask () {
70+ @ Override
71+ public void run () {
72+ performCleanup ();
73+ }
74+ }, cleanupInterval , cleanupInterval );
8475 }
8576
8677 @ Override
@@ -95,33 +86,45 @@ public Long getLastHeartBeat(String proxyId) {
9586 return proxyHeartbeats .get (proxyId );
9687 }
9788
98- private class InactiveProxyKiller implements Runnable {
99- @ Override
100- public void run () {
101- // TODO this could be replaced with a java Timer
102- while (true ) {
103- try {
104- long currentTimestamp = System .currentTimeMillis ();
105- for (Proxy proxy : proxyService .getProxies (null , true )) {
106- if (proxy .getStatus () != ProxyStatus .Up ) continue ;
107-
108- Long lastHeartbeat = proxyHeartbeats .get (proxy .getId ());
109- if (lastHeartbeat == null ) lastHeartbeat = proxy .getStartupTimestamp ();
110- long proxySilence = currentTimestamp - lastHeartbeat ;
111- if (proxySilence > heartbeatTimeout ) {
112- log .info (String .format ("Releasing inactive proxy [user: %s] [spec: %s] [id: %s] [silence: %dms]" , proxy .getUserId (), proxy .getSpec ().getId (), proxy .getId (), proxySilence ));
113- proxyHeartbeats .remove (proxy .getId ());
114- proxyService .stopProxy (proxy , true , true );
115- }
116- }
117- } catch (Throwable t ) {
118- log .error ("Error in " + this .getClass ().getSimpleName (), t );
119- }
120- try {
121- Thread .sleep (cleanupInterval );
122- } catch (InterruptedException e ) {
123- }
89+ private void performCleanup () {
90+ try {
91+ long currentTimestamp = System .currentTimeMillis ();
92+ for (Proxy proxy : proxyService .getProxies (null , true )) {
93+ checkAndReleaseProxy (currentTimestamp , proxy );
12494 }
95+ } catch (Throwable t ) {
96+ log .error ("Error in " + this .getClass ().getSimpleName (), t );
97+ }
98+ }
99+
100+ private void checkAndReleaseProxy (long currentTimestamp , Proxy proxy ) {
101+ if (proxy .getStatus () != ProxyStatus .Up ) {
102+ return ;
103+ }
104+
105+ Long heartbeatTimeout ;
106+ if (proxy .getSpec ().getHeartbeatTimeout () != null ) {
107+ heartbeatTimeout = proxy .getSpec ().getHeartbeatTimeout ();
108+ } else {
109+ heartbeatTimeout = defaultHeartbeatTimeout ;
110+ }
111+
112+ if (heartbeatTimeout <= 0 ) {
113+ // heartbeats disabled for this app (or globally)
114+ return ;
115+ }
116+
117+ Long lastHeartbeat = proxyHeartbeats .get (proxy .getId ());
118+ if (lastHeartbeat == null ) {
119+ lastHeartbeat = proxy .getStartupTimestamp ();
120+ }
121+
122+ long proxySilence = currentTimestamp - lastHeartbeat ;
123+ if (proxySilence > heartbeatTimeout ) {
124+ log .info (String .format ("Releasing inactive proxy [user: %s] [spec: %s] [id: %s] [silence: %dms]" , proxy .getUserId (), proxy .getSpec ().getId (), proxy .getId (), proxySilence ));
125+ proxyHeartbeats .remove (proxy .getId ());
126+ proxyService .stopProxy (proxy , true , true );
125127 }
126128 }
129+
127130}
0 commit comments