11package io .split .engine .segments ;
22
3- import com .google .common .collect .Maps ;
4- import io .split .Spec ;
53import io .split .client .LocalhostSegmentChangeFetcher ;
64import io .split .client .JsonLocalhostSplitChangeFetcher ;
75import io .split .client .interceptors .FlagSetsFilter ;
119import io .split .engine .common .FetchOptions ;
1210import io .split .engine .experiments .*;
1311import io .split .storages .*;
12+ import io .split .telemetry .storage .TelemetryRuntimeProducer ;
1413import io .split .storages .memory .InMemoryCacheImp ;
1514import io .split .storages .memory .RuleBasedSegmentCacheInMemoryImp ;
1615import io .split .storages .memory .SegmentCacheInMemoryImpl ;
17- import io .split .telemetry .storage .InMemoryTelemetryStorage ;
18- import io .split .telemetry .storage .NoopTelemetryStorage ;
19- import io .split .telemetry .storage .TelemetryStorage ;
2016import org .junit .Assert ;
2117import org .junit .Before ;
2218import org .junit .Ignore ;
3228import java .lang .reflect .Modifier ;
3329import java .util .HashSet ;
3430import java .util .List ;
31+ import java .util .concurrent .ConcurrentHashMap ;
3532import java .util .concurrent .ConcurrentMap ;
3633import java .util .concurrent .ExecutorService ;
3734import java .util .concurrent .Executors ;
4744 */
4845public class SegmentSynchronizationTaskImpTest {
4946 private static final Logger _log = LoggerFactory .getLogger (SegmentSynchronizationTaskImpTest .class );
50- private static final TelemetryStorage TELEMETRY_STORAGE = Mockito .mock (InMemoryTelemetryStorage .class );
51- private static final TelemetryStorage TELEMETRY_STORAGE_NOOP = Mockito .mock (NoopTelemetryStorage .class );
47+ private static final TelemetryListener TELEMETRY_STORAGE = t -> {};
48+ private static final TelemetryListener TELEMETRY_STORAGE_NOOP = t -> {};
49+ private static final ExecutorFactory EXECUTOR_FACTORY = (tf , name , n ) -> java .util .concurrent .Executors .newScheduledThreadPool (n );
5250
5351 private AtomicReference <SegmentFetcher > fetcher1 = null ;
5452 private AtomicReference <SegmentFetcher > fetcher2 = null ;
@@ -65,7 +63,7 @@ public void works() {
6563
6664 SegmentChangeFetcher segmentChangeFetcher = Mockito .mock (SegmentChangeFetcher .class );
6765 final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp (segmentChangeFetcher , 1L , 1 , segmentCacheProducer ,
68- TELEMETRY_STORAGE , Mockito .mock (SplitCacheConsumer .class ), null , Mockito .mock (RuleBasedSegmentCache .class ));
66+ TELEMETRY_STORAGE , Mockito .mock (SplitCacheConsumer .class ), EXECUTOR_FACTORY , null , Mockito .mock (RuleBasedSegmentCache .class ));
6967
7068
7169 // create two tasks that will separately call segment and make sure
@@ -104,13 +102,13 @@ public void run() {
104102 @ Test
105103 public void testFetchAllAsynchronousAndGetFalse () throws NoSuchFieldException , IllegalAccessException {
106104 SegmentCacheProducer segmentCacheProducer = Mockito .mock (SegmentCacheProducer .class );
107- ConcurrentMap <String , SegmentFetcher > _segmentFetchers = Maps . newConcurrentMap ();
105+ ConcurrentMap <String , SegmentFetcher > _segmentFetchers = new ConcurrentHashMap <> ();
108106
109107 SegmentChangeFetcher segmentChangeFetcher = Mockito .mock (SegmentChangeFetcher .class );
110108 SegmentFetcherImp segmentFetcher = Mockito .mock (SegmentFetcherImp .class );
111109 _segmentFetchers .put ("SF" , segmentFetcher );
112110 final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp (segmentChangeFetcher , 1L , 1 ,
113- segmentCacheProducer , TELEMETRY_STORAGE , Mockito .mock (SplitCacheConsumer .class ), null , Mockito .mock (RuleBasedSegmentCache .class ));
111+ segmentCacheProducer , TELEMETRY_STORAGE , Mockito .mock (SplitCacheConsumer .class ), EXECUTOR_FACTORY , null , Mockito .mock (RuleBasedSegmentCache .class ));
114112 Mockito .when (segmentFetcher .runWhitCacheHeader ()).thenReturn (false );
115113 Mockito .when (segmentFetcher .fetch (Mockito .anyObject ())).thenReturn (false );
116114
@@ -130,11 +128,11 @@ public void testFetchAllAsynchronousAndGetFalse() throws NoSuchFieldException, I
130128 public void testFetchAllAsynchronousAndGetTrue () throws NoSuchFieldException , IllegalAccessException {
131129 SegmentCacheProducer segmentCacheProducer = Mockito .mock (SegmentCacheProducer .class );
132130
133- ConcurrentMap <String , SegmentFetcher > _segmentFetchers = Maps . newConcurrentMap ();
131+ ConcurrentMap <String , SegmentFetcher > _segmentFetchers = new ConcurrentHashMap <> ();
134132 SegmentChangeFetcher segmentChangeFetcher = Mockito .mock (SegmentChangeFetcher .class );
135133 SegmentFetcherImp segmentFetcher = Mockito .mock (SegmentFetcherImp .class );
136134 final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp (segmentChangeFetcher , 1L , 1 , segmentCacheProducer ,
137- TELEMETRY_STORAGE , Mockito .mock (SplitCacheConsumer .class ), null , Mockito .mock (RuleBasedSegmentCache .class ));
135+ TELEMETRY_STORAGE , Mockito .mock (SplitCacheConsumer .class ), EXECUTOR_FACTORY , null , Mockito .mock (RuleBasedSegmentCache .class ));
138136
139137 // Before executing, we'll update the map of segmentFecthers via reflection.
140138 Field segmentFetchersForced = SegmentSynchronizationTaskImp .class .getDeclaredField ("_segmentFetchers" );
@@ -162,7 +160,7 @@ public void testLocalhostSegmentChangeFetcher() throws InterruptedException, Fil
162160 RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp ();
163161 RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser ();
164162
165- SplitFetcher splitFetcher = new SplitFetcherImp (splitChangeFetcher , splitParser , splitCacheProducer , TELEMETRY_STORAGE_NOOP , flagSetsFilter ,
163+ SplitFetcher splitFetcher = new SplitFetcherImp (splitChangeFetcher , splitParser , splitCacheProducer , Mockito . mock ( TelemetryRuntimeProducer . class ) , flagSetsFilter ,
166164 ruleBasedSegmentParser , ruleBasedSegmentCache );
167165
168166 SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask (splitFetcher , splitCacheProducer , 1000 , null );
@@ -175,7 +173,7 @@ public void testLocalhostSegmentChangeFetcher() throws InterruptedException, Fil
175173 SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl ();
176174
177175 SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp (segmentChangeFetcher , 1000 , 1 , segmentCacheProducer ,
178- TELEMETRY_STORAGE_NOOP , splitCacheProducer , null , ruleBasedSegmentCache );
176+ TELEMETRY_STORAGE_NOOP , splitCacheProducer , EXECUTOR_FACTORY , null , ruleBasedSegmentCache );
179177
180178 segmentSynchronizationTaskImp .start ();
181179
0 commit comments