@@ -18,11 +18,59 @@ import (
1818
1919var (
2020 DefaultBufferSize = 10
21+ MaxBatchSize = 500000 // size in bytes
22+ MaxBatchCount = 100
23+ BatchSendFreq = 1 * time .Second
2124 DefaultLogTimeout = 1 * time .Second
2225)
2326
27+ // Batch defines a log batch that handles the size in bytes of the logs
28+ type Batch struct {
29+ addr string
30+ logs []server.Log
31+ size uint64
32+ }
33+
34+ func NewBatch (addr string ) Batch {
35+ return Batch {
36+ addr : addr ,
37+ }
38+ }
39+
40+ func (b * Batch ) Add (log server.Log ) {
41+ b .logs = append (b .logs , log )
42+ b .size += uint64 (len (log .LogData ))
43+ }
44+
45+ func (b * Batch ) Size () uint64 {
46+ return b .size
47+ }
48+
49+ func (b * Batch ) Count () int {
50+ return len (b .logs )
51+ }
52+
53+ // PostAndReset makes a post request sending hh.batch and reseting the batch
54+ func (b * Batch ) PostAndReset () error {
55+ logJson , err := json .Marshal (b .logs )
56+ if err != nil {
57+ return fmt .Errorf ("Marshal Err: %v" , err )
58+ }
59+ requestBody := bytes .NewBuffer (logJson )
60+ _ , err = http .Post (b .addr , "application/json" , requestBody )
61+ if err != nil {
62+ return fmt .Errorf ("Http Logger Err: %v" , err )
63+ }
64+
65+ b .logs = nil
66+ b .size = 0
67+ return nil
68+ }
69+
2470type HttpHook struct {
25- Addr string
71+ batch Batch
72+ batchTicker * time.Ticker
73+
2674 logChan chan server.Log
2775 closeChan chan struct {}
2876}
@@ -36,9 +84,10 @@ func NewHttpHook(addr string) (*HttpHook, error) {
3684 url .Path = path .Join (url .Path , "log" )
3785
3886 hh := HttpHook {
39- Addr : url .String (),
40- logChan : make (chan server.Log , DefaultBufferSize ),
41- closeChan : make (chan struct {}),
87+ batch : NewBatch (url .String ()),
88+ batchTicker : time .NewTicker (BatchSendFreq ),
89+ logChan : make (chan server.Log , DefaultBufferSize ),
90+ closeChan : make (chan struct {}),
4291 }
4392
4493 go hh .logHandler ()
@@ -90,16 +139,32 @@ func (hh *HttpHook) logHandler() {
90139 for {
91140 select {
92141 case log := <- hh .logChan :
93- logJson , err := json .Marshal (log )
94- if err != nil {
95- fmt .Fprintf (os .Stderr , "Marshal Err: %v" , err )
142+ hh .batch .Add (log )
143+ if hh .batch .Count () > MaxBatchCount || hh .batch .Size () > uint64 (MaxBatchSize ) {
144+ err := hh .batch .PostAndReset ()
145+ if err != nil {
146+ fmt .Fprintf (os .Stderr , "Send Batch failed: %v" , err )
147+ break
148+ }
149+ // if the batch is sent
150+ // to avoid ticking on an empty batch
151+ hh .batchTicker .Reset (BatchSendFreq )
96152 }
97- requestBody := bytes .NewBuffer (logJson )
98- _ , err = http .Post (hh .Addr , "application/json" , requestBody )
99- if err != nil {
100- fmt .Fprintf (os .Stderr , "Http Logger Err: %v" , err )
153+ case <- hh .batchTicker .C :
154+ if hh .batch .Size () > 0 {
155+ err := hh .batch .PostAndReset ()
156+ if err != nil {
157+ fmt .Fprintf (os .Stderr , "Send Batch failed: %v" , err )
158+ }
101159 }
102160 case <- hh .closeChan :
161+ // if there are logs in the buffered batch, send them
162+ if hh .batch .Count () > 0 {
163+ err := hh .batch .PostAndReset ()
164+ if err != nil {
165+ fmt .Fprintf (os .Stderr , "Send Batch failed: %v" , err )
166+ }
167+ }
103168 fmt .Fprintf (os .Stderr , "Closing http logger" )
104169 return
105170 }
@@ -109,6 +174,7 @@ func (hh *HttpHook) logHandler() {
109174// Close ends the logHandler goroutine
110175func (hh * HttpHook ) Close () {
111176 hh .closeChan <- struct {}{}
177+ hh .batchTicker .Stop ()
112178 // to mark further Close as no-op
113179 hh .closeChan = nil
114180}
0 commit comments