Skip to content

Commit 17d3508

Browse files
committed
kafka stream application example
1 parent a8509f0 commit 17d3508

5 files changed

Lines changed: 329 additions & 0 deletions

File tree

kafka-stream/kafka.md

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
## docs
2+
* [java example](https://developer.confluent.io/tutorials/creating-first-apache-kafka-streams-application/kstreams.html)
3+
https://github.com/confluentinc/kafka-streams-examples/blob/7.6.1-post/src/main/java/io/confluent/examples/streams/SecureKafkaStreamsExample.java
4+
* [how to use single message transformation](https://docs.confluent.io/platform/current/connect/transforms/overview.html)
5+
* Kafka Stream
6+
* [Start with the DSL](https://github.com/confluentinc/examples)
7+
* [Documentation](http://docs.confluent.io/current/streams/)
8+
* [Run the application](http://docs.confluent.io/current/streams/developer-guide.html#running-a-kafka-streams-application)
9+
https://docs.confluent.io/current/tutorials/examples/microservices-orders/docs/index.html#tutorial-microservi
10+
KStreams library - github.com/michelin/kstreamplify
11+
12+
## python project
13+
> https://developer.confluent.io/get-started/python/
14+
15+
### start docker containers
16+
```sh
17+
# pip3 install kafka-python
18+
19+
docker-compose up -d
20+
# docker-compose down
21+
docker ps
22+
23+
export KAFKA_BROKER=localhost:9092
24+
export KAFKA_TOPIC=playground.topic
25+
```
26+
27+
### create topic
28+
```sh
29+
x-www-browser http://localhost:9021/
30+
31+
# docker-compose exec broker kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic playground.topic
32+
# docker-compose exec broker kafka-topics --list --bootstrap-server $KAFKA_BROKER
33+
```
34+
35+
### java stream processor
36+
```sh
37+
temp; cd kafka/stream-processor
38+
39+
KAFKA_TOPIC_STREAM_INPUT=stream-processor-input
40+
KAFKA_TOPIC_STREAM_OUTPUT=stream-processor-output
41+
42+
# mkdir -p src/main/java/io/confluent/developer
43+
# code src/main/java/io/confluent/developer/Util.java
44+
# code src/main/java/io/confluent/developer/KafkaStreamsApplication.java
45+
46+
# gradle -Dhttp.proxyHost=localhost -Dhttp.proxyPort=48157 -Dhttps.proxyHost=localhost -Dhttps.proxyPort=48157 build
47+
# gradle --refresh-dependencies build
48+
gradle shadowJar
49+
java -jar build/libs/creating-first-apache-kafka-streams-application-*.jar configuration/dev.properties
50+
```
51+
52+
## trainings / education
53+
54+
* kafka streams
55+
56+
* [official kafka streams](https://kafka.apache.org/documentation/streams/)
57+
* [write application](https://kafka.apache.org/37/documentation/streams/tutorial)
58+
* [quarkus kafka stream](https://quarkus.io/guides/kafka-streams)
59+
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Build script for the first-Kafka-Streams-application example.
// Produces a runnable fat jar via the Shadow plugin (`gradle shadowJar`).

plugins {
    id "java"
    id "idea"
    id "eclipse"
    id "application"
    // Shadow 7.x is resolved from the Gradle plugin portal via the plugins DSL.
    // NOTE(review): the previous `buildscript` classpath pointed at the legacy
    // `com.github.jengelman.gradle.plugins:shadow` coordinates, which do not
    // provide a 7.x release for the `com.github.johnrengelman.shadow` id.
    id "com.github.johnrengelman.shadow" version "7.1.2"
}

version = "0.0.1"
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17

application {
    mainClass.set("io.confluent.developer.KafkaStreamsApplication")
}

repositories {
    mavenCentral()

    // Confluent-hosted artifacts (e.g. kafka-streams-test-utils) live here.
    maven {
        url "https://packages.confluent.io/maven"
    }
    // NOTE(review): https://mvnrepository.com/artifact was removed — it is a
    // search front-end, not a Maven repository, and cannot serve artifacts.
}

dependencies {
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation "org.apache.kafka:kafka-streams:3.4.0"
    implementation group: "org.json", name: "json", version: "20240303"
    // Pin kafka-clients to exactly the version kafka-streams was built
    // against, to avoid a mixed-version client classpath.
    implementation("org.apache.kafka:kafka-clients") {
        version {
            strictly "3.4.0"
        }
    }
    implementation "com.github.javafaker:javafaker:1.0.2"
    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.4.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation "org.hamcrest:hamcrest:2.2"
}

test {
    testLogging {
        outputs.upToDateWhen { false }  // always re-run, never "UP-TO-DATE"
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
    manifest {
        attributes(
            "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
            "Main-Class": "io.confluent.developer.KafkaStreamsApplication"
        )
    }
}

shadowJar {
    archivesBaseName = "creating-first-apache-kafka-streams-application-standalone"
    archiveClassifier = ''  // no "-all" suffix on the fat jar
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Kafka Streams configuration for the stream-processor example
# (loaded by KafkaStreamsApplication from the path given as argv[0]).

application.id=stream-processor
bootstrap.servers=localhost:9092

# Topic names read by KafkaStreamsApplication.buildTopology(...)
input.topic.name=stream-processor-input
output.topic.name=stream-processor-output

# NOTE(review): key.serializer / value.serializer are *producer* configs and
# are ignored by Kafka Streams; the topology already wires String serdes
# explicitly. The Streams-level equivalents would be
# default.key.serde / default.value.serde. Kept for reference only.
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.confluent.developer;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.json.JSONArray;
import org.json.JSONObject;

/**
 * Minimal Kafka Streams application: reads String records from an input
 * topic, removes "every second" property from each JSON payload, and writes
 * the transformed value to an output topic.
 *
 * <p>Usage: {@code java -jar app.jar path/to/config.properties} where the
 * properties file supplies the Streams config plus {@code input.topic.name}
 * and {@code output.topic.name}.
 */
public class KafkaStreamsApplication {

    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApplication.class);

    /**
     * Builds the processing topology: input -> log -> transform -> log -> output.
     * Keys and values are both plain Strings.
     *
     * @param inputTopic  topic to consume from
     * @param outputTopic topic the transformed values are written to
     */
    static Topology buildTopology(String inputTopic, String outputTopic) {
        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();

        builder
            .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
            .peek((k, v) -> logger.info("Observed event: {}", v))
            .mapValues(KafkaStreamsApplication::transformMessage)
            .peek((k, v) -> logger.info("Transformed event: {}", v))
            .to(outputTopic, Produced.with(stringSerde, stringSerde));

        return builder.build();
    }

    /**
     * Best-effort transformation: parses the payload as a JSON object and
     * removes every second property. Non-JSON payloads (or any other failure)
     * are forwarded unchanged.
     */
    private static String transformMessage(String jsonContent) {
        try {
            JSONObject jsonObject = new JSONObject(jsonContent);
            removeEverySecondProperty(jsonObject);
            return jsonObject.toString();
        } catch (Exception ex) {
            // Deliberate pass-through for malformed input, but make the skip
            // visible instead of silently swallowing the error.
            logger.warn("Could not transform message, forwarding unchanged: {}", ex.toString());
            return jsonContent;
        }
    }

    /**
     * Removes the properties at odd positions of {@code names()}.
     *
     * <p>NOTE(review): {@link JSONObject} does not guarantee property order,
     * so *which* keys count as "every second" is unspecified — confirm that
     * nondeterministic key selection is acceptable for this example.
     * Iteration is over the {@code names()} snapshot array, so removing from
     * the object while looping is safe.
     */
    private static void removeEverySecondProperty(JSONObject jsonObject) {
        JSONArray keys = jsonObject.names();
        if (keys != null) {
            for (int i = 0; i < keys.length(); i++) {
                if (i % 2 != 0) {
                    jsonObject.remove(keys.getString(i));
                }
            }
        }
    }

    /**
     * Entry point. Expects one argument: the path of the properties file
     * holding the Streams configuration and the topic names.
     *
     * @throws IllegalArgumentException if no configuration path was given
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Configuration file is expected as a first argument");
        }

        Properties props = readProperties(args);

        KafkaStreams kafkaStreams = new KafkaStreams(
            buildTopology(
                props.getProperty("input.topic.name"),
                props.getProperty("output.topic.name")),
            props);

        // Close the Streams runtime (flushing state) on SIGTERM / Ctrl-C.
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
        kafkaStreams.start();
    }

    /** Loads the configuration file named by {@code args[0]}. */
    private static Properties readProperties(String[] args) throws IOException {
        Properties props = new Properties();
        try (InputStream inputStream = new FileInputStream(args[0])) {
            props.load(inputStream);
        }
        return props;
    }
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package io.confluent.developer;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
 * Demo helpers for the stream-processor example: creates topics via the
 * Admin API and runs a background producer that publishes one random fact
 * every few seconds.
 */
public class Util implements AutoCloseable {

    private final Logger logger = LoggerFactory.getLogger(Util.class);
    private ExecutorService executorService = Executors.newFixedThreadPool(1);

    /**
     * Background task producing one random Chuck Norris fact to a topic
     * every 5 seconds until {@link #close()} is called.
     */
    public class Randomizer implements AutoCloseable, Runnable {
        private final Properties props;
        private final String topic;
        // volatile: written by the closing thread, read by the producer
        // thread inside run(); without it the loop may never observe close().
        private volatile boolean closed;

        public Randomizer(Properties producerProps, String topic) {
            this.closed = false;
            this.topic = topic;
            this.props = producerProps;
            this.props.setProperty("client.id", "faker");
        }

        @Override
        public void run() {
            // Typed try-with-resources local (the previous raw-typed variable
            // also shadowed an unused private field of the same name).
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                Faker faker = new Faker();
                while (!closed) {
                    try {
                        // Synchronous send: surface broker errors immediately.
                        producer.send(new ProducerRecord<>(this.topic, faker.chuckNorris().fact())).get();
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop producing.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } catch (Exception ex) {
                logger.error("Randomizer failed", ex);
            }
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Starts a new {@link Randomizer} on the internal executor and returns it. */
    public Randomizer startNewRandomizer(Properties producerProps, String topic) {
        Randomizer rv = new Randomizer(producerProps, topic);
        executorService.submit(rv);
        return rv;
    }

    /**
     * Creates the given topics (failures per topic — typically
     * "already exists" on re-runs — are only logged) and then logs each
     * topic's description as reported by the cluster.
     *
     * @throws TimeoutException if the cluster does not answer the describe
     *                          request within 10 seconds
     */
    public void createTopics(final Properties allProps, List<NewTopic> topics)
            throws InterruptedException, ExecutionException, TimeoutException {
        try (final AdminClient client = AdminClient.create(allProps)) {
            logger.info("Creating topics");

            client.createTopics(topics).values().forEach((topic, future) -> {
                try {
                    future.get();
                } catch (Exception ex) {
                    // Benign in this demo (e.g. TopicExistsException).
                    logger.info(ex.toString());
                }
            });

            Collection<String> topicNames = topics
                .stream()
                .map(NewTopic::name)
                .collect(Collectors.toCollection(LinkedList::new));

            logger.info("Asking cluster for topic descriptions");
            client
                .describeTopics(topicNames)
                .allTopicNames()
                .get(10, TimeUnit.SECONDS)
                .forEach((name, description) -> logger.info("Topic Description: {}", description.toString()));
        }
    }

    /** Stops the background producer task(s). */
    @Override
    public void close() {
        if (executorService != null) {
            executorService.shutdownNow();
            executorService = null;
        }
    }
}

0 commit comments

Comments
 (0)