Skip to content

Commit 40d68fa

Browse files
Introduce broadcast channel retransmission strategies
So far, the broadcast channel used a simple retransmission strategy that triggered a new retransmission on every new tick. Such an aggressive approach turned to be not suitable for all use cases. Specifically, this approach tends to cause network congestion if a big amount of messages is retransmitted for a long time. To make the retransmission mechanism more flexible, here we introduce the way to customize it through strategies. We are also add an implementation of a backoff strategy that decreases its retransmission frequency over time in an exponential way.
1 parent 7ee8811 commit 40d68fa

7 files changed

Lines changed: 223 additions & 14 deletions

File tree

pkg/net/libp2p/channel.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,11 @@ func (c *channel) Name() string {
8888
return c.name
8989
}
9090

91-
func (c *channel) Send(ctx context.Context, message net.TaggedMarshaler) error {
91+
func (c *channel) Send(
92+
ctx context.Context,
93+
message net.TaggedMarshaler,
94+
strategy ...net.RetransmissionStrategy,
95+
) error {
9296
messageProto, err := c.messageProto(message)
9397
if err != nil {
9498
return err
@@ -100,7 +104,21 @@ func (c *channel) Send(ctx context.Context, message net.TaggedMarshaler) error {
100104
return c.publish(messageProto)
101105
}
102106

103-
retransmission.ScheduleRetransmissions(ctx, logger, c.retransmissionTicker, doSend)
107+
var selectedStrategy net.RetransmissionStrategy
108+
switch len(strategy) {
109+
case 1:
110+
selectedStrategy = strategy[0]
111+
default:
112+
selectedStrategy = net.StandardRetransmissionStrategy
113+
}
114+
115+
retransmission.ScheduleRetransmissions(
116+
ctx,
117+
logger,
118+
c.retransmissionTicker,
119+
doSend,
120+
retransmission.WithStrategy(selectedStrategy),
121+
)
104122

105123
return doSend()
106124
}

pkg/net/local/broadcast_channel.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,11 @@ func (lc *localChannel) Name() string {
4040
return lc.name
4141
}
4242

43-
func (lc *localChannel) Send(ctx context.Context, message net.TaggedMarshaler) error {
43+
func (lc *localChannel) Send(
44+
ctx context.Context,
45+
message net.TaggedMarshaler,
46+
strategy ...net.RetransmissionStrategy,
47+
) error {
4448
bytes, err := message.Marshal()
4549
if err != nil {
4650
return err
@@ -67,13 +71,22 @@ func (lc *localChannel) Send(ctx context.Context, message net.TaggedMarshaler) e
6771
lc.nextSeqno(),
6872
)
6973

74+
var selectedStrategy net.RetransmissionStrategy
75+
switch len(strategy) {
76+
case 1:
77+
selectedStrategy = strategy[0]
78+
default:
79+
selectedStrategy = net.StandardRetransmissionStrategy
80+
}
81+
7082
retransmission.ScheduleRetransmissions(
7183
ctx,
7284
logger,
7385
lc.retransmissionTicker,
7486
func() error {
7587
return broadcastMessage(lc.name, netMessage)
7688
},
89+
retransmission.WithStrategy(selectedStrategy),
7790
)
7891

7992
return broadcastMessage(lc.name, netMessage)

pkg/net/net.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,23 @@ package net
22

33
import (
44
"context"
5-
65
"github.com/keep-network/keep-core/pkg/internal/pb"
76
"github.com/keep-network/keep-core/pkg/operator"
87
)
98

9+
// RetransmissionStrategy represents a specific retransmission strategy.
10+
type RetransmissionStrategy int
11+
12+
const (
13+
// StandardRetransmissionStrategy is the default retransmission strategy
14+
// that retransmit the message with a constant frequency.
15+
StandardRetransmissionStrategy RetransmissionStrategy = iota
16+
// BackoffRetransmissionStrategy is a retransmission strategy that
17+
// retransmit the message with an exponentially increasing delay between
18+
// subsequent retransmissions.
19+
BackoffRetransmissionStrategy
20+
)
21+
1022
// TransportIdentifier represents a protocol-level identifier. It is an opaque
1123
// type to the network layer.
1224
type TransportIdentifier interface {
@@ -93,10 +105,18 @@ type TaggedUnmarshaler interface {
93105
type BroadcastChannel interface {
94106
// Name returns the name of this broadcast channel.
95107
Name() string
96-
// Send function publishes a message m to the channel. Message m needs to
108+
// Send function publishes a message to the channel. Message needs to
97109
// conform to the marshalling interface. Message will be periodically
98-
// retransmitted by the channel for the lifetime of the provided context.
99-
Send(ctx context.Context, m TaggedMarshaler) error
110+
// retransmitted by the channel for the lifetime of the provided context
111+
// according to the default StandardRetransmissionStrategy. Retransmission
112+
// strategy can be set through the `strategy` vararg. If the vararg is
113+
// given more than one value, the first value is used as the valid
114+
// strategy.
115+
Send(
116+
ctx context.Context,
117+
message TaggedMarshaler,
118+
strategy ...RetransmissionStrategy,
119+
) error
100120
// Recv installs a message handler that will receive messages from the
101121
// channel for the entire lifetime of the provided context.
102122
// When the context is done, handler is automatically unregistered and

pkg/net/retransmission/retransmission.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,25 @@ import (
1414
"github.com/keep-network/keep-core/pkg/net"
1515
)
1616

17-
// ScheduleRetransmissions takes the provided message and retransmits it
18-
// for every new tick received from the provided Ticker for the entire lifetime
19-
// of the Context calling the provided retransmit function. The retransmit
20-
// function has to guarantee that every call from this function sends a message
21-
// with the same sequence number.
17+
// RetransmitFn represents a retransmission routine.
18+
type RetransmitFn func() error
19+
20+
// ScheduleRetransmissions uses the given Strategy to decide whether to call
21+
// the provided RetransmitFn for every new tick received from the provided
22+
// Ticker for the entire lifetime of the Context. The RetransmitFn function has
23+
// to guarantee that every call from this function sends a message with the
24+
// same sequence number.
2225
func ScheduleRetransmissions(
2326
ctx context.Context,
2427
logger log.StandardLogger,
2528
ticker *Ticker,
26-
retransmit func() error,
29+
retransmit RetransmitFn,
30+
strategy Strategy,
2731
) {
2832
go func() {
2933
ticker.onTick(ctx, func() {
3034
go func() {
31-
if err := retransmit(); err != nil {
35+
if err := strategy.Tick(retransmit); err != nil {
3236
logger.Errorf("could not retransmit message: [%v]", err)
3337
}
3438
}()

pkg/net/retransmission/retransmission_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func TestRetransmitExpectedNumberOfTimes(t *testing.T) {
2323
atomic.AddUint64(&retransmissionsCount, 1)
2424
return nil
2525
},
26+
WithStandardStrategy(),
2627
)
2728

2829
<-ctx.Done()

pkg/net/retransmission/strategy.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package retransmission
2+
3+
import "github.com/keep-network/keep-core/pkg/net"
4+
5+
// Strategy represents a specific retransmission strategy.
6+
type Strategy interface {
7+
// Tick asks the strategy to run the provided retransmission routine.
8+
// The strategy uses their internal state and logic to decide whether to
9+
// call the retransmission function or not.
10+
Tick(retransmitFn RetransmitFn) error
11+
}
12+
13+
// WithStrategy is a strategy factory function that returns the requested
14+
// strategy instance.
15+
func WithStrategy(strategy net.RetransmissionStrategy) Strategy {
16+
switch strategy {
17+
case net.StandardRetransmissionStrategy:
18+
return WithStandardStrategy()
19+
case net.BackoffRetransmissionStrategy:
20+
return WithBackoffStrategy()
21+
default:
22+
panic("retransmission strategy not implemented")
23+
}
24+
}
25+
26+
// StandardStrategy is the basic retransmission strategy that triggers the
27+
// retransmission routine on every tick.
28+
type StandardStrategy struct{}
29+
30+
// WithStandardStrategy uses the StandardStrategy as the retransmission
31+
// strategy.
32+
func WithStandardStrategy() *StandardStrategy {
33+
return &StandardStrategy{}
34+
}
35+
36+
// Tick implements the Strategy.Tick function.
37+
func (ss *StandardStrategy) Tick(retransmitFn RetransmitFn) error {
38+
return retransmitFn()
39+
}
40+
41+
// BackoffStrategy is a retransmission strategy that triggers the retransmission
42+
// routine with an exponentially increasing delay. That is, the delay between
43+
// first and second retransmission is 1 tick, between second and third is 2
44+
// ticks, between third and fourth is 4 ticks and so on. Graphically, the
45+
// schedule looks as follows: R _ R _ _ R _ _ _ _ R _ _ _ _ _ _ _ _ R
46+
type BackoffStrategy struct {
47+
tickCounter uint64
48+
delay uint64
49+
retransmitTick uint64
50+
}
51+
52+
// WithBackoffStrategy uses the BackoffStrategy as the retransmission
53+
// strategy.
54+
func WithBackoffStrategy() *BackoffStrategy {
55+
return &BackoffStrategy{
56+
tickCounter: 0,
57+
delay: 1,
58+
retransmitTick: 1,
59+
}
60+
}
61+
62+
// Tick implements the Strategy.Tick function.
63+
func (bos *BackoffStrategy) Tick(retransmitFn RetransmitFn) error {
64+
bos.tickCounter++
65+
66+
if bos.tickCounter == bos.retransmitTick {
67+
bos.retransmitTick += bos.delay + 1
68+
bos.delay *= 2
69+
70+
return retransmitFn()
71+
}
72+
73+
return nil
74+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package retransmission
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
)
7+
8+
func TestStandardStrategy(t *testing.T) {
9+
strategy := WithStandardStrategy()
10+
11+
retransmitInvocations := make(map[int]bool)
12+
13+
for i := 1; i <= 10; i++ {
14+
err := strategy.Tick(func() error {
15+
retransmitInvocations[i] = true
16+
return nil
17+
})
18+
if err != nil {
19+
t.Fatal(err)
20+
}
21+
}
22+
23+
expectedRetransmitInvocations := map[int]bool{
24+
1: true,
25+
2: true,
26+
3: true,
27+
4: true,
28+
5: true,
29+
6: true,
30+
7: true,
31+
8: true,
32+
9: true,
33+
10: true,
34+
}
35+
if !reflect.DeepEqual(expectedRetransmitInvocations, retransmitInvocations) {
36+
t.Errorf(
37+
"unexpected invocations\n"+
38+
"expected: [%v]\n"+
39+
"actual: [%v]",
40+
expectedRetransmitInvocations,
41+
retransmitInvocations,
42+
)
43+
}
44+
}
45+
46+
func TestBackoffStrategy(t *testing.T) {
47+
strategy := WithBackoffStrategy()
48+
49+
retransmitInvocations := make(map[int]bool)
50+
51+
for i := 1; i <= 100; i++ {
52+
err := strategy.Tick(func() error {
53+
retransmitInvocations[i] = true
54+
return nil
55+
})
56+
if err != nil {
57+
t.Fatal(err)
58+
}
59+
}
60+
61+
expectedRetransmitInvocations := map[int]bool{
62+
1: true,
63+
3: true,
64+
6: true,
65+
11: true,
66+
20: true,
67+
37: true,
68+
70: true,
69+
}
70+
if !reflect.DeepEqual(expectedRetransmitInvocations, retransmitInvocations) {
71+
t.Errorf(
72+
"unexpected invocations\n"+
73+
"expected: [%v]\n"+
74+
"actual: [%v]",
75+
expectedRetransmitInvocations,
76+
retransmitInvocations,
77+
)
78+
}
79+
}

0 commit comments

Comments
 (0)