@@ -6,159 +6,95 @@ import (
66 "errors"
77 "fmt"
88 "log"
9- "os"
10- "os/signal"
119 "sync"
12- "syscall"
1310
1411 "github.com/IBM/sarama"
12+ "github.com/Leo7Deng/ChatApp/cassandra"
1513 "github.com/Leo7Deng/ChatApp/models"
1614 "github.com/Leo7Deng/ChatApp/websockets"
17- // "github.com/gocql/gocql"
15+ "github.com/gocql/gocql"
1816)
1917
2018var hub * websockets.Hub
19+ var cassandraSession * gocql.Session
2120
22- // StartConsumer continuously logs incoming messages
23- func StartConsumer (ctx context.Context ) {
24- config := sarama .NewConfig ()
25- config .Consumer .Return .Errors = true
26-
27- client , err := sarama .NewConsumer (brokers , config )
28- if err != nil {
29- log .Fatalf ("Failed to start Kafka consumer: %v" , err )
30- }
31- defer client .Close ()
32-
33- partitionConsumer , err := client .ConsumePartition (topic , 0 , sarama .OffsetNewest )
34- if err != nil {
35- log .Fatalf ("Failed to consume Kafka topic: %v" , err )
36- }
37- defer partitionConsumer .Close ()
38-
39- fmt .Println ("Kafka Consumer started..." )
40-
41- for {
42- select {
43- case msg := <- partitionConsumer .Messages ():
44- fmt .Printf ("Kafka message: %s\n " , string (msg .Value ))
45- var message models.Message
46- err := json .Unmarshal (msg .Value , & message )
47- if err != nil {
48- fmt .Printf ("Failed to unmarshal message: %v\n " , err )
49- }
21+ func WebsocketConsumer (ctx context.Context , websocketHub * websockets.Hub ) {
22+ hub = websocketHub
23+ groupID := "websocket-group"
24+ handler := & WebsocketConsumerHandler {hub : websocketHub }
25+ runConsumer (ctx , groupID , handler )
26+ }
5027
51- case <- ctx . Done ():
52- fmt . Println ( "Consumer shutting down..." )
53- return
54- }
55- }
28+ func CassandraConsumer ( ctx context. Context , session * gocql. Session ) {
29+ cassandraSession = session
30+ groupID := "cassandra-group"
31+ handler := & CassandraConsumerHandler { }
32+ runConsumer ( ctx , groupID , handler )
5633}
5734
58- func WebsocketConsumer (ctx context.Context , websocketHub * websockets.Hub ) {
59- hub = websocketHub
60- keepRunning := true
35+ func runConsumer (ctx context.Context , groupID string , handler sarama.ConsumerGroupHandler ) {
6136 config := sarama .NewConfig ()
6237 config .Consumer .Return .Errors = true
6338 config .Consumer .Group .Rebalance .GroupStrategies = []sarama.BalanceStrategy {sarama .NewBalanceStrategyRoundRobin ()}
39+ config .Consumer .Offsets .Initial = sarama .OffsetNewest
6440
65-
66- consumer := Consumer {
67- ready : make (chan bool ),
68- }
69-
70- ctx , cancel := context .WithCancel (context .Background ())
71- client , err := sarama .NewConsumerGroup (brokers , "websocket-group" , config )
41+ client , err := sarama .NewConsumerGroup (brokers , groupID , config )
7242 if err != nil {
73- log .Panicf ("Error creating consumer group client : %v" , err )
43+ log .Panicf ("Error creating Kafka consumer group (%s) : %v" , groupID , err )
7444 }
45+ defer client .Close ()
7546
76- consumptionIsPaused := false
7747 wg := & sync.WaitGroup {}
7848 wg .Add (1 )
49+
7950 go func () {
8051 defer wg .Done ()
81- for {
82- err := client .Consume (ctx , []string {topic }, & consumer )
52+ for {
53+ err := client .Consume (ctx , []string {topic }, handler )
8354 if err != nil {
8455 if errors .Is (err , sarama .ErrClosedConsumerGroup ) {
8556 return
8657 }
87- log .Printf ("Error from consumer: %v" , err )
58+ log .Printf ("Error from Kafka consumer (%s) : %v" , groupID , err )
8859 }
8960
9061 // If context is canceled, stop the consumer
9162 if ctx .Err () != nil {
9263 return
9364 }
94-
95- // Reset readiness so we wait for the next session
96- consumer .ready = make (chan bool )
9765 }
9866 }()
9967
100-
101- <- consumer .ready // Await till the consumer has been set up
102- log .Println ("Sarama consumer up and running!..." )
103-
104- sigusr1 := make (chan os.Signal , 1 )
105- signal .Notify (sigusr1 , syscall .SIGUSR1 )
106-
107- sigterm := make (chan os.Signal , 1 )
108- signal .Notify (sigterm , syscall .SIGINT , syscall .SIGTERM )
109-
110- for keepRunning {
111- select {
112- case <- ctx .Done ():
113- log .Println ("terminating: context cancelled" )
114- keepRunning = false
115- case <- sigterm :
116- log .Println ("terminating: via signal" )
117- keepRunning = false
118- case <- sigusr1 :
119- toggleConsumptionFlow (client , & consumptionIsPaused )
120- }
121- }
122- cancel ()
68+ log .Printf ("%s consumer started..." , groupID )
69+ <- ctx .Done ()
70+ log .Printf ("%s consumer shutting down..." , groupID )
12371 wg .Wait ()
124- if err = client .Close (); err != nil {
125- log .Panicf ("Error closing client: %v" , err )
126- }
12772}
12873
129- func toggleConsumptionFlow (client sarama.ConsumerGroup , isPaused * bool ) {
130- if * isPaused {
131- client .ResumeAll ()
132- log .Println ("Resuming consumption" )
133- } else {
134- client .PauseAll ()
135- log .Println ("Pausing consumption" )
136- }
13774
138- * isPaused = ! * isPaused
139- }
140-
141- // Consumer represents a Sarama consumer group consumer
142- type Consumer struct {
143- ready chan bool
144- }
75+ type WebsocketConsumerHandler struct {hub * websockets.Hub }
76+ type CassandraConsumerHandler struct {}
14577
14678// Setup is run at the beginning of a new session, before ConsumeClaim
147- func (consumer * Consumer ) Setup (sarama.ConsumerGroupSession ) error {
148- // Mark the consumer as ready
149- close (consumer .ready )
79+ func (consumer * WebsocketConsumerHandler ) Setup (sarama.ConsumerGroupSession ) error {
80+ return nil
81+ }
82+ func (consumer * CassandraConsumerHandler ) Setup (sarama.ConsumerGroupSession ) error {
15083 return nil
15184}
15285
15386// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
154- func (consumer * Consumer ) Cleanup (sarama.ConsumerGroupSession ) error {
87+ func (consumer * WebsocketConsumerHandler ) Cleanup (sarama.ConsumerGroupSession ) error {
88+ return nil
89+ }
90+ func (consumer * CassandraConsumerHandler ) Cleanup (sarama.ConsumerGroupSession ) error {
15591 return nil
15692}
15793
15894// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
15995// Once the Messages() channel is closed, the Handler must finish its processing
16096// loop and exit.
161- func (consumer * Consumer ) ConsumeClaim (session sarama.ConsumerGroupSession , claim sarama.ConsumerGroupClaim ) error {
97+ func (consumer * WebsocketConsumerHandler ) ConsumeClaim (session sarama.ConsumerGroupSession , claim sarama.ConsumerGroupClaim ) error {
16298 // NOTE:
16399 // Do not move the code below to a goroutine.
164100 // The `ConsumeClaim` itself is called within a goroutine, see:
@@ -171,7 +107,7 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
171107 return nil
172108 }
173109 partition , offset := message .Partition , message .Offset
174- log .Printf ("Kafka websocket consumer: %s | Partition: %d | Offset: %d\n " , message .Value , partition , offset )
110+ log .Printf ("Kafka consumer: %s | Partition: %d | Offset: %d\n " , message .Value , partition , offset )
175111 var websocketMessage models.WebsocketMessage
176112 err := json .Unmarshal (message .Value , & websocketMessage )
177113 if err != nil {
@@ -189,46 +125,38 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
189125 }
190126}
191127
128+ func (consumer * CassandraConsumerHandler ) ConsumeClaim (session sarama.ConsumerGroupSession , claim sarama.ConsumerGroupClaim ) error {
129+ for {
130+ select {
131+ case message , ok := <- claim .Messages ():
132+ if ! ok {
133+ log .Printf ("message channel was closed" )
134+ return nil
135+ }
136+ partition , offset := message .Partition , message .Offset
137+ var websocketMessage models.WebsocketMessage
138+ err := json .Unmarshal (message .Value , & websocketMessage )
139+ if err != nil {
140+ fmt .Printf ("Failed to unmarshal message: %v\n " , err )
141+ }
142+ log .Printf ("Cassandra consumer: %s | Partition: %d | Offset: %d\n " , message .Value , partition , offset )
143+ insertMessage := models.Message {
144+ CircleID : websocketMessage .Message .CircleID ,
145+ AuthorID : websocketMessage .Message .AuthorID ,
146+ Content : websocketMessage .Message .Content ,
147+ CreatedAt : websocketMessage .Message .CreatedAt ,
148+ }
149+ err = cassandra .InsertMessage (cassandraSession , insertMessage )
192150
193-
194-
195-
196-
197-
198-
199-
200-
201-
202-
203-
204-
205- // //////////////////
206- // group, err := sarama.NewConsumerGroup(brokers, "websocket-group", config)
207- // if err != nil {
208- // log.Fatalf("Failed to start Kafka consumer group: %v", err)
209- // }
210- // defer group.Close()
211-
212- // fmt.Println("Kafka Consumer started...")
213-
214- // for {
215- // select {
216- // case msg := <-partitionConsumer.Messages():
217- // var websocketMessage models.WebsocketMessage
218- // fmt.Printf("Kafka consumer viewed: %s\n", string(msg.Value))
219- // err := json.Unmarshal(msg.Value, &websocketMessage)
220- // if err != nil {
221- // fmt.Printf("Failed to unmarshal message: %v\n", err)
222- // }
223- // websocketMessage.Origin = "server"
224- // hub.SendWebsocketMessage(websocketMessage)
225- // case <-ctx.Done():
226- // fmt.Println("Consumer shutting down...")
227- // return
228- // }
229- // }
230- // }
231-
232- // func CassandraConsumer(ctx context.Context, cassandraSession *gocql.Session) {
233-
234- // }
151+ // Handle error on unprocessed message insert into Cassandra
152+ if err != nil {
153+ fmt .Printf ("Failed to insert message: %v\n " , err )
154+ } else {
155+ log .Printf ("Message inserted into Cassandra: %v\n " , insertMessage )
156+ session .MarkMessage (message , "" )
157+ }
158+ case <- session .Context ().Done ():
159+ return nil
160+ }
161+ }
162+ }
0 commit comments