2020 */
2121package eu .openanalytics .containerproxy .backend .dispatcher .proxysharing ;
2222
23+ import com .github .benmanes .caffeine .cache .Cache ;
24+ import com .github .benmanes .caffeine .cache .Caffeine ;
25+ import com .github .benmanes .caffeine .cache .Scheduler ;
2326import eu .openanalytics .containerproxy .backend .dispatcher .proxysharing .store .DelegateProxy ;
27+ import eu .openanalytics .containerproxy .backend .dispatcher .proxysharing .store .DelegateProxyStatus ;
28+ import eu .openanalytics .containerproxy .event .NewProxyEvent ;
2429import eu .openanalytics .containerproxy .model .runtime .Proxy ;
2530import eu .openanalytics .containerproxy .model .runtime .runtimevalues .BackendContainerName ;
2631import eu .openanalytics .containerproxy .model .runtime .runtimevalues .BackendContainerNameKey ;
3035import io .micrometer .core .instrument .Tags ;
3136import io .micrometer .core .instrument .search .MeterNotFoundException ;
3237import org .springframework .beans .factory .annotation .Autowired ;
38+ import org .springframework .context .event .EventListener ;
3339
3440import javax .annotation .PostConstruct ;
3541import javax .inject .Inject ;
4046import java .util .Map ;
4147import java .util .Timer ;
4248import java .util .TimerTask ;
49+ import java .util .concurrent .TimeUnit ;
4350import java .util .function .ToDoubleFunction ;
4451import java .util .stream .Collectors ;
4552
@@ -56,6 +63,16 @@ public class ProxySharingMicrometer implements IStatCollector {
5663 @ Autowired (required = false )
5764 private List <ProxySharingScaler > proxySharingScalers = new ArrayList <>();
5865
66+ private final List <String > specIds = new ArrayList <>();
67+
68+ private static final Map <DelegateProxyStatus , Integer > PROXY_STATUS_TO_INTEGER = Map .of (
69+ DelegateProxyStatus .Pending , 1 ,
70+ DelegateProxyStatus .Available , 10 ,
71+ DelegateProxyStatus .ToRemove , 20
72+ );
73+
74+ private Cache <String , String > recentProxies ;
75+
5976 /**
6077 * Wraps a function that returns an Integer into a function that returns a double.
6178 * When the provided Integer is null, the resulting function returns Double.NaN.
@@ -74,8 +91,13 @@ private static <T> ToDoubleFunction<T> wrapHandleNull(ToLongFunction<T> producer
7491
7592 @ PostConstruct
7693 public void init () {
94+ recentProxies = Caffeine .newBuilder ()
95+ .scheduler (Scheduler .systemScheduler ())
96+ .expireAfterWrite (2 , TimeUnit .MINUTES )
97+ .build ();
7798 for (ProxySharingDispatcher dispatcher : proxySharingDispatchers ) {
7899 String specId = dispatcher .getSpec ().getId ();
100+ specIds .add (specId );
79101 registry .timer ("seats_wait_time" , "spec.id" , specId );
80102 }
81103 for (ProxySharingScaler scaler : proxySharingScalers ) {
@@ -96,16 +118,40 @@ public void registerSeatWaitTime(String specId, Duration time) {
96118 registry .timer ("seats_wait_time" , "spec.id" , specId ).record (time );
97119 }
98120
121+ @ EventListener
122+ public void onNewProxyEvent (NewProxyEvent event ) {
123+ if (event .getUserId () != null || event .getBackendContainerName () == null ) {
124+ return ;
125+ }
126+ if (specIds .contains (event .getSpecId ())) {
127+ recentProxies .put (event .getProxyId (), event .getProxyId ());
128+ registry .gauge ("delegate_app_info" ,
129+ Tags .of (
130+ "spec.id" , event .getSpecId (),
131+ "proxy.id" , event .getProxyId (),
132+ "proxy.created.timestamp" , Long .toString (event .getCreatedTimestamp ()),
133+ "resource.id" , event .getBackendContainerName ().getName (),
134+ "proxy.namespace" , event .getBackendContainerName ().getNamespace ()),
135+ PROXY_STATUS_TO_INTEGER .get (DelegateProxyStatus .Pending )
136+ );
137+ }
138+ }
139+
99140 private void updateDelegateAppInfo () {
100141 Map <String , Gauge > existingGauges = getDelegateAppInfoGauges ();
101142 for (ProxySharingScaler scaler : proxySharingScalers ) {
102143 String specId = scaler .getSpec ().getId ();
103144 for (DelegateProxy delegateProxy : scaler .getAllDelegateProxies ()) {
104145 Proxy proxy = delegateProxy .getProxy ();
105- if (existingGauges .remove (proxy .getId ()) != null ) {
106- // gauge already exists, no need to re-create
146+ recentProxies .put (proxy .getId (), proxy .getId ());
147+ Gauge existingGauge = existingGauges .remove (proxy .getId ());
148+ if (existingGauge != null && existingGauge .value () == PROXY_STATUS_TO_INTEGER .get (delegateProxy .getDelegateProxyStatus ())) {
149+ // gauge already exists and value is correct
107150 continue ;
108151 }
152+ if (existingGauge != null ) {
153+ registry .remove (existingGauge );
154+ }
109155
110156 BackendContainerName backendContainerName = getBackendContainerName (proxy );
111157 if (backendContainerName == null ) {
@@ -120,11 +166,29 @@ private void updateDelegateAppInfo() {
120166 "proxy.created.timestamp" , Long .toString (proxy .getCreatedTimestamp ()),
121167 "resource.id" , backendContainerName .getName (),
122168 "proxy.namespace" , backendContainerName .getNamespace ()),
123- 1
169+ PROXY_STATUS_TO_INTEGER . get ( delegateProxy . getDelegateProxyStatus ())
124170 );
125171 }
126172 }
127173 for (Gauge gauge : existingGauges .values ()) {
174+ String proxyId = gauge .getId ().getTag ("proxy.id" );
175+ if (proxyId != null && recentProxies .getIfPresent (proxyId ) != null ) {
176+ // this DelegateProxy has been removed, mark it as ToRemove
177+ // when the TTL of this proxy in recentProxies expires, the gauge will be removed
178+ // this waiting period allows the metric system to pick up that the proxy is being removed
179+ registry .remove (gauge );
180+ registry .gauge ("delegate_app_info" ,
181+ Tags .of (
182+ "spec.id" , gauge .getId ().getTag ("spec.id" ),
183+ "proxy.id" , gauge .getId ().getTag ("proxy.id" ),
184+ "proxy.created.timestamp" , gauge .getId ().getTag ("proxy.created.timestamp" ),
185+ "resource.id" , gauge .getId ().getTag ("resource.id" ),
186+ "proxy.namespace" , gauge .getId ().getTag ("proxy.namespace" )),
187+ PROXY_STATUS_TO_INTEGER .get (DelegateProxyStatus .ToRemove )
188+ );
189+ continue ;
190+ }
191+
128192 // the proxy of this gauge no longer exists -> remove the gauge
129193 registry .remove (gauge );
130194 }
0 commit comments