-
Notifications
You must be signed in to change notification settings - Fork 260
Expand file tree
/
Copy pathraft_retriever.go
More file actions
149 lines (129 loc) · 3.97 KB
/
raft_retriever.go
File metadata and controls
149 lines (129 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package syncing
import (
"context"
"errors"
"fmt"
"sync"
"github.com/rs/zerolog"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/raft"
"github.com/evstack/ev-node/types"
)
// raftStatePreProcessor is a hook that receives a raft block state before the
// retriever decodes it. consumeRaftBlock invokes it first and, when it returns
// a non-nil error, stops and returns that error to its caller.
type raftStatePreProcessor func(ctx context.Context, state *raft.RaftBlockState) error
// raftRetriever receives block states applied by the raft node, decodes them
// into header/data events and pipes those events into the event sink.
type raftRetriever struct {
// raftNode is where the apply channel is registered and the state is read from.
raftNode common.RaftNode
// wg tracks the apply-loop goroutine so that Stop can wait for it to exit.
wg sync.WaitGroup
logger zerolog.Logger
// genesis is passed to the expected-proposer check on every decoded header.
genesis genesis.Genesis
// eventSink receives one event per block that passes validation.
eventSink common.EventSink
// raftBlockPreProcessor runs on every raft block state before it is decoded.
raftBlockPreProcessor raftStatePreProcessor
// mtx is held by Start and Stop while they read or write cancel.
mtx sync.Mutex
// cancel stops the apply loop; Start sets it and Stop resets it to nil, so a
// non-nil value means the retriever has been started.
cancel context.CancelFunc
}
// newRaftRetriever builds a raftRetriever from its collaborators.
//
// The returned retriever is idle: no goroutine is started and nothing is
// registered with the raft node until Start is called.
func newRaftRetriever(
	raftNode common.RaftNode,
	genesis genesis.Genesis,
	logger zerolog.Logger,
	eventSink common.EventSink,
	raftBlockPreProcessor raftStatePreProcessor,
) *raftRetriever {
	retriever := new(raftRetriever)
	retriever.raftNode = raftNode
	retriever.genesis = genesis
	retriever.logger = logger
	retriever.eventSink = eventSink
	retriever.raftBlockPreProcessor = raftBlockPreProcessor
	return retriever
}
// Start launches the raft apply loop in a background goroutine.
//
// It derives a cancellable context from ctx, registers a buffered channel with
// the raft node as its apply callback and hands that channel to raftApplyLoop.
// An error is returned when the retriever is already running, i.e. when a
// previous Start has not yet been followed by Stop.
func (r *raftRetriever) Start(ctx context.Context) error {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	// A non-nil cancel func marks the retriever as running.
	if r.cancel != nil {
		return errors.New("syncer already started")
	}

	loopCtx, stop := context.WithCancel(ctx)
	r.cancel = stop

	msgs := make(chan raft.RaftApplyMsg, 100)
	r.raftNode.SetApplyCallback(msgs)

	r.wg.Go(func() {
		r.raftApplyLoop(loopCtx, msgs)
	})
	return nil
}
// Stop shuts the retriever down: it cancels the apply loop's context if the
// retriever was started, waits for the loop goroutine to return and finally
// clears the apply callback on the raft node.
func (r *raftRetriever) Stop() {
	// Only the cancel bookkeeping happens under the lock; waiting for the
	// goroutine is done after the lock has been released.
	func() {
		r.mtx.Lock()
		defer r.mtx.Unlock()

		if r.cancel == nil {
			return
		}
		r.cancel()
		r.cancel = nil
	}()

	r.wg.Wait()
	r.raftNode.SetApplyCallback(nil)
}
// raftApplyLoop receives messages from applyCh and hands each block state to
// consumeRaftBlock until ctx is cancelled or applyCh is closed.
//
// An error from consumeRaftBlock is logged together with the block height and
// the loop continues with the next message. A message that carries no state is
// logged and skipped, because both consumeRaftBlock and the error log below
// dereference the state and would otherwise panic this goroutine.
func (r *raftRetriever) raftApplyLoop(ctx context.Context, applyCh <-chan raft.RaftApplyMsg) {
	r.logger.Info().Msg("starting raft apply loop")
	defer r.logger.Info().Msg("raft apply loop stopped")

	for {
		select {
		case <-ctx.Done():
			return
		case msg, ok := <-applyCh:
			if !ok {
				// A closed channel yields zero-value messages forever; leave
				// the loop instead of spinning on them.
				return
			}
			if msg.State == nil {
				r.logger.Warn().Msg("skipping raft apply message without state")
				continue
			}
			if err := r.consumeRaftBlock(ctx, msg.State); err != nil {
				r.logger.Error().Err(err).Uint64("height", msg.State.Height).Msg("failed to apply raft block")
			}
		}
	}
}
// consumeRaftBlock converts a raft block state into a DAHeightEvent and pipes
// it into the event sink.
//
// The pre-processor runs first and its error is returned unchanged. The header
// and data payloads are then decoded, and decode failures are returned wrapped.
// A header that fails basic validation, or whose proposer is not the expected
// one, is logged at debug level and dropped without reporting an error.
func (r *raftRetriever) consumeRaftBlock(ctx context.Context, state *raft.RaftBlockState) error {
	r.logger.Debug().
		Uint64("height", state.Height).
		Hex("raft_hash", state.Hash).
		Uint64("timestamp", state.Timestamp).
		Msg("consuming raft block")

	preErr := r.raftBlockPreProcessor(ctx, state)
	if preErr != nil {
		return preErr
	}

	// Decode the signed header carried by the raft state.
	signedHeader := new(types.SignedHeader)
	if decodeErr := signedHeader.UnmarshalBinary(state.Header); decodeErr != nil {
		return fmt.Errorf("unmarshal header: %w", decodeErr)
	}

	// Emit both hashes side by side so that a mismatch between the raft state
	// hash and the decoded header hash is visible in the logs.
	r.logger.Debug().
		Uint64("height", state.Height).
		Hex("raft_hash", state.Hash).
		Hex("header_hash", signedHeader.Hash()).
		Str("chain_id", signedHeader.ChainID()).
		Hex("data_hash", signedHeader.DataHash).
		Hex("app_hash", signedHeader.AppHash).
		Msg("raft block hash comparison")

	// Blocks that fail either check below are dropped silently (debug log only).
	if validateErr := signedHeader.Header.ValidateBasic(); validateErr != nil {
		r.logger.Debug().Err(validateErr).Msg("invalid header structure")
		return nil
	}
	if proposerErr := assertExpectedProposer(r.genesis, signedHeader.ProposerAddress); proposerErr != nil {
		r.logger.Debug().Err(proposerErr).Msg("unexpected proposer")
		return nil
	}

	// The data payload is only decoded once the header has been accepted.
	blockData := new(types.Data)
	if decodeErr := blockData.UnmarshalBinary(state.Data); decodeErr != nil {
		return fmt.Errorf("unmarshal data: %w", decodeErr)
	}

	return r.eventSink.PipeEvent(ctx, common.DAHeightEvent{
		Header: signedHeader,
		Data:   blockData,
		// Raft events carry no DA height context yet, because DA submission
		// is asynchronous.
		DaHeight: 0,
	})
}
// Height reports the height recorded in the raft node's current state.
func (r *raftRetriever) Height() uint64 {
	current := r.raftNode.GetState()
	return current.Height
}