Skip to content

Commit 7bfb16e

Browse files
committed
Create Kafka partitions and able to access which partition event is published to
1 parent 608f877 commit 7bfb16e

5 files changed

Lines changed: 208 additions & 105 deletions

File tree

backend/docker-compose.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ services:
5151
# Required for a single node cluster
5252
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
5353

54-
5554
KAFKA_LOG_DIRS: "/var/lib/kafka/data"
5655
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
5756
KAFKA_LOG_RETENTION_HOURS: 168

backend/kafka/consumer.go

Lines changed: 168 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,23 @@ package kafka
22

33
import (
44
"context"
5+
"encoding/json"
6+
"errors"
57
"fmt"
68
"log"
7-
"encoding/json"
9+
"os"
10+
"os/signal"
11+
"sync"
12+
"syscall"
13+
814
"github.com/IBM/sarama"
915
"github.com/Leo7Deng/ChatApp/models"
1016
"github.com/Leo7Deng/ChatApp/websockets"
17+
// "github.com/gocql/gocql"
1118
)
1219

20+
var hub *websockets.Hub
21+
1322
// StartConsumer continuously logs incoming messages
1423
func StartConsumer(ctx context.Context) {
1524
config := sarama.NewConfig()
@@ -46,38 +55,180 @@ func StartConsumer(ctx context.Context) {
4655
}
4756
}
4857

49-
func WebsocketConsumer(ctx context.Context, hub *websockets.Hub) {
58+
func WebsocketConsumer(ctx context.Context, websocketHub *websockets.Hub) {
59+
hub = websocketHub
60+
keepRunning := true
5061
config := sarama.NewConfig()
5162
config.Consumer.Return.Errors = true
63+
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
5264

53-
client, err := sarama.NewConsumer(brokers, config)
54-
if err != nil {
55-
log.Fatalf("Failed to start Kafka consumer: %v", err)
65+
66+
consumer := Consumer{
67+
ready: make(chan bool),
5668
}
57-
defer client.Close()
5869

59-
partitionConsumer, err := client.ConsumePartition(topic, 0, sarama.OffsetNewest)
70+
ctx, cancel := context.WithCancel(context.Background())
71+
client, err := sarama.NewConsumerGroup(brokers, "websocket-group", config)
6072
if err != nil {
61-
log.Fatalf("Failed to consume Kafka topic: %v", err)
73+
log.Panicf("Error creating consumer group client: %v", err)
6274
}
63-
defer partitionConsumer.Close()
6475

65-
fmt.Println("Kafka Consumer started...")
76+
consumptionIsPaused := false
77+
wg := &sync.WaitGroup{}
78+
wg.Add(1)
79+
go func() {
80+
defer wg.Done()
81+
for {
82+
err := client.Consume(ctx, []string{topic}, &consumer)
83+
if err != nil {
84+
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
85+
return
86+
}
87+
log.Printf("Error from consumer: %v", err)
88+
}
89+
90+
// If context is canceled, stop the consumer
91+
if ctx.Err() != nil {
92+
return
93+
}
94+
95+
// Reset readiness so we wait for the next session
96+
consumer.ready = make(chan bool)
97+
}
98+
}()
99+
100+
101+
<-consumer.ready // Await till the consumer has been set up
102+
log.Println("Sarama consumer up and running!...")
103+
104+
sigusr1 := make(chan os.Signal, 1)
105+
signal.Notify(sigusr1, syscall.SIGUSR1)
106+
107+
sigterm := make(chan os.Signal, 1)
108+
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
109+
110+
for keepRunning {
111+
select {
112+
case <-ctx.Done():
113+
log.Println("terminating: context cancelled")
114+
keepRunning = false
115+
case <-sigterm:
116+
log.Println("terminating: via signal")
117+
keepRunning = false
118+
case <-sigusr1:
119+
toggleConsumptionFlow(client, &consumptionIsPaused)
120+
}
121+
}
122+
cancel()
123+
wg.Wait()
124+
if err = client.Close(); err != nil {
125+
log.Panicf("Error closing client: %v", err)
126+
}
127+
}
128+
129+
func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
130+
if *isPaused {
131+
client.ResumeAll()
132+
log.Println("Resuming consumption")
133+
} else {
134+
client.PauseAll()
135+
log.Println("Pausing consumption")
136+
}
66137

138+
*isPaused = !*isPaused
139+
}
140+
141+
// Consumer represents a Sarama consumer group consumer
142+
type Consumer struct {
143+
ready chan bool
144+
}
145+
146+
// Setup is run at the beginning of a new session, before ConsumeClaim
147+
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
148+
// Mark the consumer as ready
149+
close(consumer.ready)
150+
return nil
151+
}
152+
153+
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
154+
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
155+
return nil
156+
}
157+
158+
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
159+
// Once the Messages() channel is closed, the Handler must finish its processing
160+
// loop and exit.
161+
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
162+
// NOTE:
163+
// Do not move the code below to a goroutine.
164+
// The `ConsumeClaim` itself is called within a goroutine, see:
165+
// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
67166
for {
68167
select {
69-
case msg := <-partitionConsumer.Messages():
168+
case message, ok := <-claim.Messages():
169+
if !ok {
170+
log.Printf("message channel was closed")
171+
return nil
172+
}
173+
partition, offset := message.Partition, message.Offset
174+
log.Printf("Kafka websocket consumer: %s | Partition: %d | Offset: %d\n", message.Value, partition, offset)
70175
var websocketMessage models.WebsocketMessage
71-
fmt.Printf("Kafka consumer viewed: %s\n", string(msg.Value))
72-
err := json.Unmarshal(msg.Value, &websocketMessage)
176+
err := json.Unmarshal(message.Value, &websocketMessage)
73177
if err != nil {
74178
fmt.Printf("Failed to unmarshal message: %v\n", err)
75179
}
76180
websocketMessage.Origin = "server"
77181
hub.SendWebsocketMessage(websocketMessage)
78-
case <-ctx.Done():
79-
fmt.Println("Consumer shutting down...")
80-
return
182+
session.MarkMessage(message, "")
183+
// Should return when `session.Context()` is done.
184+
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
185+
// https://github.com/IBM/sarama/issues/1192
186+
case <-session.Context().Done():
187+
return nil
81188
}
82189
}
83-
}
190+
}
191+
192+
193+
194+
195+
196+
197+
198+
199+
200+
201+
202+
203+
204+
205+
// //////////////////
206+
// group, err := sarama.NewConsumerGroup(brokers, "websocket-group", config)
207+
// if err != nil {
208+
// log.Fatalf("Failed to start Kafka consumer group: %v", err)
209+
// }
210+
// defer group.Close()
211+
212+
// fmt.Println("Kafka Consumer started...")
213+
214+
// for {
215+
// select {
216+
// case msg := <-partitionConsumer.Messages():
217+
// var websocketMessage models.WebsocketMessage
218+
// fmt.Printf("Kafka consumer viewed: %s\n", string(msg.Value))
219+
// err := json.Unmarshal(msg.Value, &websocketMessage)
220+
// if err != nil {
221+
// fmt.Printf("Failed to unmarshal message: %v\n", err)
222+
// }
223+
// websocketMessage.Origin = "server"
224+
// hub.SendWebsocketMessage(websocketMessage)
225+
// case <-ctx.Done():
226+
// fmt.Println("Consumer shutting down...")
227+
// return
228+
// }
229+
// }
230+
// }
231+
232+
// func CassandraConsumer(ctx context.Context, cassandraSession *gocql.Session) {
233+
234+
// }

backend/kafka/kafka.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,23 @@ import (
44
"context"
55
"sync"
66
"github.com/Leo7Deng/ChatApp/websockets"
7+
"github.com/gocql/gocql"
78
)
89

910
// InitKafka starts the Kafka producer and consumer as goroutines
10-
func InitKafka(ctx context.Context, wg *sync.WaitGroup, hub *websockets.Hub) {
11+
func InitKafka(ctx context.Context, wg *sync.WaitGroup, hub *websockets.Hub, cassandraSession *gocql.Session) {
1112
wg.Add(2)
1213

1314
go func() {
1415
defer wg.Done()
15-
WebsocketConsumer(ctx, hub) // Websocket consumer
16+
WebsocketConsumer(ctx, hub)
1617
}()
1718

19+
// go func() {
20+
// defer wg.Done()
21+
// CassandraConsumer(ctx, cassandraSession)
22+
// }()
23+
1824
go func() {
1925
defer wg.Done()
2026
WebsocketProducer(ctx, hub)

backend/kafka/producer.go

Lines changed: 31 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import (
55
"encoding/json"
66
"fmt"
77
"log"
8-
"time"
98

109
"github.com/IBM/sarama"
1110
"github.com/Leo7Deng/ChatApp/models"
1211
"github.com/Leo7Deng/ChatApp/websockets"
1312
)
1413

1514
var topic = "chat"
15+
var partitions = int32(3)
1616
var brokers = []string{"kafka:9093"}
1717
var kafkaProducer sarama.SyncProducer
1818

@@ -29,92 +29,40 @@ func WebsocketProducer(ctx context.Context, hub *websockets.Hub) {
2929

3030
fmt.Println("Kafka Websocket Producer started...")
3131

32-
for {
33-
select {
34-
case message := <-hub.Broadcast:
35-
var websocketMessage models.WebsocketMessage
36-
websocketMessage.Message = &models.Message{}
37-
websocketMessage.Circle = &models.Circle{}
38-
log.Printf("Client sent over websocket: %s\n", message)
39-
err := json.Unmarshal(message, &websocketMessage)
40-
if err != nil {
41-
log.Println("Error decoding JSON:", err)
42-
break
43-
}
44-
if websocketMessage.Origin == "server" {
45-
continue
46-
}
47-
switch websocketMessage.Type {
48-
case "message":
49-
if websocketMessage.Message != nil {
50-
log.Printf("New Message: %s from %s\n", websocketMessage.Message.Content, websocketMessage.Message.AuthorID)
51-
msg := &sarama.ProducerMessage{
52-
Topic: topic,
53-
Value: sarama.StringEncoder(message),
54-
}
55-
_, _, err = kafkaProducer.SendMessage(msg)
56-
if err != nil {
57-
fmt.Printf("Failed to send message: %v\n", err)
58-
} else {
59-
fmt.Println("Produced message successfully!")
60-
}
32+
for message := range hub.Broadcast {
33+
var websocketMessage models.WebsocketMessage
34+
websocketMessage.Message = &models.Message{}
35+
websocketMessage.Circle = &models.Circle{}
36+
log.Printf("Client sent over websocket: %s\n", message)
37+
err := json.Unmarshal(message, &websocketMessage)
38+
if (err != nil) {
39+
log.Println("Error decoding JSON:", err)
40+
continue
41+
}
42+
if websocketMessage.Origin == "server" {
43+
continue
44+
}
45+
switch websocketMessage.Type {
46+
case "message":
47+
if websocketMessage.Message != nil {
48+
// log.Printf("New Message: %s from %s\n", websocketMessage.Message.Content, websocketMessage.Message.AuthorID)
49+
msg := &sarama.ProducerMessage{
50+
Topic: topic,
51+
Value: sarama.StringEncoder(message),
6152
}
62-
case "circle":
63-
if websocketMessage.Circle != nil {
64-
log.Printf("Circle %s %s\n", websocketMessage.Action, websocketMessage.Circle.Name)
53+
partition, offset, err := kafkaProducer.SendMessage(msg)
54+
if err != nil {
55+
fmt.Printf("Failed to send message: %v\n", err)
56+
} else {
57+
fmt.Printf("Websocket producer sent message to partition %d at offset %d\n", partition, offset)
6558
}
66-
default:
67-
log.Println("Unknown message type:", websocketMessage.Type)
6859
}
69-
}
70-
71-
}
72-
}
73-
74-
// TestProducer sends a message to Kafka every 5 seconds
75-
func TestProducer(ctx context.Context) {
76-
config := sarama.NewConfig()
77-
config.Producer.Return.Successes = true
78-
79-
var err error
80-
kafkaProducer, err = sarama.NewSyncProducer(brokers, config)
81-
if err != nil {
82-
log.Fatalf("Failed to start Kafka producer: %v", err)
83-
}
84-
defer kafkaProducer.Close()
85-
86-
fmt.Println("Kafka Producer started...")
87-
88-
for {
89-
select {
90-
case <-ctx.Done():
91-
fmt.Println("Producer shutting down...")
92-
return
93-
default:
94-
message := models.Message{
95-
CircleID: "1",
96-
Content: "Hello from server",
97-
CreatedAt: time.Now().String(),
98-
AuthorID: "1",
99-
}
100-
JsonMessage, err := json.Marshal(message)
101-
if err != nil {
102-
fmt.Printf("Failed to marshal message: %v\n", err)
60+
case "circle":
61+
if websocketMessage.Circle != nil {
62+
log.Printf("Circle %s %s\n", websocketMessage.Action, websocketMessage.Circle.Name)
10363
}
104-
105-
msg := &sarama.ProducerMessage{
106-
Topic: topic,
107-
Value: sarama.StringEncoder(JsonMessage),
108-
}
109-
110-
_, _, err = kafkaProducer.SendMessage(msg)
111-
if err != nil {
112-
fmt.Printf("Failed to send message: %v\n", err)
113-
} else {
114-
fmt.Println("Produced message successfully!")
115-
}
116-
117-
time.Sleep(5 * time.Second)
64+
default:
65+
log.Println("Unknown message type:", websocketMessage.Type)
11866
}
11967
}
12068
}

0 commit comments

Comments
 (0)