Skip to content

Commit d018dfb

Browse files
authored
Add new flux examples and add new links to README (#34)
1 parent 39edc9f commit d018dfb

2 files changed

Lines changed: 118 additions & 7 deletions

File tree

README.adoc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
= Asynchronous & reactive programming in Java
22
Marcos de la Calle Samaniego, @marcosDLCS <marcos.dlcs@gmail.com>
3-
v0.0.15, 2020-12-04
3+
v0.0.16, 2020-12-05
44
:toc:
55

66
++++
@@ -250,6 +250,11 @@ Project Loom is to intended to explore, incubate and deliver Java VM features an
250250
|Mark Paluch
251251
|2020
252252
|🇬🇧 📋
253+
254+
| https://www.javaadvent.com/2020/12/project-loom-and-structured-concurrency.html[Project Loom and structured concurrency, window=_blank]
255+
|Cay Horstmann
256+
|2020
257+
|🇬🇧 📋
253258
|===
254259

255260
=== ⚒️ Libraries and tools
@@ -327,15 +332,20 @@ Designed for both development and production time use
327332
|===
328333
|Name |Author |Platform |Lang
329334

330-
| https://learning.oreilly.com/library/view/reactive-spring-boot/9780136836421/[Reactive Spring, 2nd Edition, window=_blank]
331-
|Josh Long
332-
|Oreilly
335+
| https://www.youtube.com/playlist?list=PLL8woMHwr36EDxjUoCzboZjedsnhLP1j4[Java concurrency and multithreading (playlist), window=_blank]
336+
|Jakob Jenkov
337+
|Youtube
333338
|🇬🇧
334339

335340
| https://www.udemy.com/course/efficient-java-multithreading-with-executors/[Efficient Java Multithreading and Concurrency with Executors, window=_blank]
336341
|Arun Kumar
337342
|Udemy
338343
|🇬🇧
344+
345+
| https://learning.oreilly.com/library/view/reactive-spring-boot/9780136836421/[Reactive Spring, 2nd Edition, window=_blank]
346+
|Josh Long
347+
|Oreilly
348+
|🇬🇧
339349
|===
340350

341351
== 🚶‍♀️ Who to follow?

examples/00-reactor-operators/project-reactor-operators/src/test/java/es/codeurjc/arpj/FluxTest.java

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import org.junit.jupiter.api.Test;
66
import reactor.core.publisher.Flux;
77
import reactor.core.publisher.Mono;
8+
import reactor.core.scheduler.Scheduler;
9+
import reactor.core.scheduler.Schedulers;
810
import reactor.test.StepVerifier;
911
import reactor.util.function.Tuple2;
1012

@@ -14,6 +16,7 @@
1416
import java.util.List;
1517
import java.util.Map;
1618
import java.util.Random;
19+
import java.util.function.Function;
1720
import java.util.stream.Stream;
1821

1922
import static es.codeurjc.arpj.TestUtils.printSectionLine;
@@ -91,10 +94,10 @@ void flux_from_test() {
9194
}
9295

9396
@Test
94-
@DisplayName("Test 03: Flux transform")
95-
void flux_transform() {
97+
@DisplayName("Test 03: Flux map and cast")
98+
void flux_map_cast() {
9699

97-
System.out.println("Test 03: Flux transform");
100+
System.out.println("Test 03: Flux map and cast");
98101
printTestLine();
99102

100103
final String[] strings = {"one", "two", "three", "four", "five"};
@@ -310,4 +313,102 @@ void flux_aggregate_reduce() {
310313
.expectComplete()
311314
.verify();
312315
}
316+
317+
@Test
318+
@DisplayName("Test 08: Transform")
319+
void flux_transform() {
320+
321+
System.out.println("Test 08: Transform");
322+
printTestLine();
323+
324+
final Scheduler myParallel = Schedulers.newParallel("my-parallel", 2);
325+
326+
final Function<Flux<String>, Flux<Integer>> myTransformation =
327+
flux -> flux.publishOn(myParallel).map(String::length);
328+
329+
final String[] strings = {"one", "two", "three", "four", "five"};
330+
final Flux<Integer> mapToLength = Flux.fromArray(strings).transform(myTransformation).log();
331+
332+
StepVerifier
333+
.create(mapToLength)
334+
.expectNextCount(5)
335+
.expectComplete()
336+
.verify();
337+
}
338+
339+
@Test
340+
@DisplayName("Test 09: Concat map")
341+
void flux_concat_map() {
342+
343+
System.out.println("Test 09: Concat map");
344+
printTestLine();
345+
346+
final Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
347+
348+
final Flux<Integer> numbers = Flux.range(0, 10)
349+
.window(2)
350+
.concatMap(g -> g.flatMap(this::processing).subscribeOn(myParallel))
351+
.log();
352+
353+
StepVerifier
354+
.create(numbers)
355+
.expectNextCount(10)
356+
.expectComplete()
357+
.verify();
358+
359+
printTestLine();
360+
361+
final Flux<Integer> numbers2 = Flux.range(0, 10)
362+
.window(2)
363+
.flatMap(g -> g.flatMap(this::processing).subscribeOn(myParallel))
364+
.log();
365+
366+
StepVerifier
367+
.create(numbers2)
368+
.expectNextCount(10)
369+
.expectComplete()
370+
.verify();
371+
}
372+
373+
private Mono<Integer> processing(final Integer input) {
374+
375+
final var random = new Random();
376+
377+
final var millis = random.nextInt(10) * 100L;
378+
379+
System.out.println("Sleeping [" + input + "] " + millis + " milliseconds... @ "
380+
+ Thread.currentThread().getName());
381+
382+
try {
383+
Thread.sleep(millis);
384+
} catch (InterruptedException e) {
385+
e.printStackTrace();
386+
}
387+
388+
return Mono.just(input);
389+
}
390+
391+
@Test
392+
@DisplayName("Test 10: Handle")
393+
void flux_handle() {
394+
395+
System.out.println("Test 10: Handle");
396+
printTestLine();
397+
398+
final Flux<Integer> numbers = Flux.range(0, 10)
399+
.handle((n, s) -> {
400+
if (n % 2 == 0) {
401+
System.out.println("Even!!! -> " + n);
402+
s.next(n);
403+
} else {
404+
s.next(0);
405+
}
406+
}).cast(Integer.class).log();
407+
408+
StepVerifier
409+
.create(numbers)
410+
.expectNextCount(10)
411+
.expectComplete()
412+
.verify();
413+
}
313414
}

0 commit comments

Comments
 (0)