Skip to content

Commit c72ca0d

Browse files
authored
Improve Geyser Timelock backup worker with a getProgramAccounts RPC approach (#176)
1 parent 7049f2c commit c72ca0d

7 files changed

Lines changed: 91 additions & 120 deletions

File tree

ocp/data/blockchain.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type BlockchainData interface {
3535
GetBlockchainTokenAccountsByOwner(ctx context.Context, account string) ([]ed25519.PublicKey, error)
3636
GetBlockchainTransaction(ctx context.Context, sig string, commitment solana.Commitment) (*solana.ConfirmedTransaction, error)
3737
GetBlockchainTransactionTokenBalances(ctx context.Context, sig string) (*solana.TransactionTokenBalances, error)
38-
GetBlockchainFilteredProgramAccounts(ctx context.Context, program string, offset uint, filterValue []byte) ([]string, uint64, error)
38+
GetBlockchainFilteredProgramAccounts(ctx context.Context, program string, offset uint, filterValue []byte) ([]solana.ProgramAccount, uint64, error)
3939
}
4040

4141
type BlockchainProvider struct {
@@ -320,7 +320,7 @@ func (dp *BlockchainProvider) GetBlockchainTransactionTokenBalances(ctx context.
320320
return &res, nil
321321
}
322322

323-
func (dp *BlockchainProvider) GetBlockchainFilteredProgramAccounts(ctx context.Context, program string, offset uint, filterValue []byte) ([]string, uint64, error) {
323+
func (dp *BlockchainProvider) GetBlockchainFilteredProgramAccounts(ctx context.Context, program string, offset uint, filterValue []byte) ([]solana.ProgramAccount, uint64, error) {
324324
tracer := metrics.TraceMethodCall(ctx, blockchainProviderMetricsName, "GetBlockchainFilteredProgramAccounts")
325325
defer tracer.End()
326326

@@ -329,10 +329,10 @@ func (dp *BlockchainProvider) GetBlockchainFilteredProgramAccounts(ctx context.C
329329
return nil, 0, err
330330
}
331331

332-
addresses, slot, err := dp.sc.GetFilteredProgramAccounts(programId, offset, filterValue)
332+
accounts, slot, err := dp.sc.GetFilteredProgramAccounts(programId, offset, filterValue)
333333
if err != nil {
334334
tracer.OnError(err)
335335
return nil, 0, err
336336
}
337-
return addresses, slot, err
337+
return accounts, slot, err
338338
}

ocp/worker/geyser/backup.go

Lines changed: 42 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import (
55
"sync"
66
"time"
77

8+
"github.com/mr-tron/base58"
89
"go.uber.org/zap"
910

10-
"github.com/code-payments/ocp-server/database/query"
1111
"github.com/code-payments/ocp-server/metrics"
1212
"github.com/code-payments/ocp-server/ocp/common"
1313
"github.com/code-payments/ocp-server/ocp/data/account"
1414
"github.com/code-payments/ocp-server/ocp/data/timelock"
15-
timelock_token "github.com/code-payments/ocp-server/solana/timelock/v1"
15+
"github.com/code-payments/ocp-server/solana/vm"
1616
)
1717

1818
// Backup system workers can be found here. This is necessary because we can't rely
@@ -21,7 +21,7 @@ import (
2121
// the case? Real time updates. Backup workers likely won't be able to guarantee
2222
// real time (or near real time) updates at scale.
2323

24-
func (p *runtime) backupTimelockStateWorker(runtimeCtx context.Context, state timelock_token.TimelockState, interval time.Duration) error {
24+
func (p *runtime) backupTimelockStateWorker(runtimeCtx context.Context, interval time.Duration) error {
2525
log := p.log.With(zap.String("method", "backupTimelockStateWorker"))
2626
log.Debug("worker started")
2727

@@ -36,76 +36,74 @@ func (p *runtime) backupTimelockStateWorker(runtimeCtx context.Context, state ti
3636
log.Debug("worker stopped")
3737
}()
3838

39-
start := time.Now()
40-
cursor := query.EmptyCursor
4139
delay := 0 * time.Second // Initially no delay, so we can run right after a deploy
4240
for {
4341
select {
4442
case <-time.After(delay):
45-
batchStart := time.Now()
43+
start := time.Now()
4644

4745
func() {
4846
provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider)
4947
trace := provider.StartTrace("geyser_consumer_runtime__backup_timelock_state_worker")
5048
defer trace.End()
5149
tracedCtx := metrics.NewContext(runtimeCtx, trace)
5250

53-
timelockRecords, err := p.data.GetAllTimelocksByState(
51+
// todo: Also filter on unlock at if the result set gets too large
52+
programAccounts, slot, err := p.data.GetBlockchainFilteredProgramAccounts(
5453
tracedCtx,
55-
state,
56-
query.WithDirection(query.Ascending),
57-
query.WithCursor(cursor),
58-
query.WithLimit(p.conf.backupTimelockWorkerBatchSize.Get(runtimeCtx)),
54+
base58.Encode(vm.PROGRAM_ID),
55+
0,
56+
vm.UnlockStateAccountDiscriminator,
5957
)
60-
if err == timelock.ErrTimelockNotFound {
61-
p.metricStatusLock.Lock()
62-
duration := time.Since(start)
63-
if p.backupTimelockStateWorkerDuration == nil || *p.backupTimelockStateWorkerDuration < duration {
64-
p.backupTimelockStateWorkerDuration = &duration
65-
}
66-
p.metricStatusLock.Unlock()
67-
68-
start = time.Now()
69-
cursor = query.EmptyCursor
70-
return
71-
} else if err != nil {
72-
log.With(zap.Error(err)).Warn("failed to get timelock records")
58+
if err != nil {
59+
log.With(zap.Error(err)).Warn("failed to get unlock state program accounts")
7360
return
7461
}
7562

7663
reprocessDelay := p.conf.backupTimelockWorkerReprocessDelay.Get(runtimeCtx)
7764

78-
var wg sync.WaitGroup
79-
for _, timelockRecord := range timelockRecords {
80-
if lastProcessed, ok := p.backupTimelockProcessedCache.Load(timelockRecord.Address); ok {
65+
for _, programAccount := range programAccounts {
66+
var unlockState vm.UnlockStateAccount
67+
if err := unlockState.Unmarshal(programAccount.Data); err != nil {
68+
log.With(zap.Error(err)).Warn("failed to unmarshal unlock state account")
69+
continue
70+
}
71+
72+
stateAddress := base58.Encode(unlockState.Address)
73+
log := log.With(zap.String("timelock", stateAddress))
74+
75+
if lastProcessed, ok := p.backupTimelockProcessedCache.Load(stateAddress); ok {
8176
if time.Since(lastProcessed.(time.Time)) < reprocessDelay {
8277
continue
8378
}
8479
}
8580

86-
wg.Add(1)
87-
88-
go func(timelockRecord *timelock.Record) {
89-
defer wg.Done()
90-
91-
log := log.With(zap.String("timelock", timelockRecord.Address))
81+
timelockRecord, err := p.data.GetTimelockByAddress(tracedCtx, stateAddress)
82+
if err == timelock.ErrTimelockNotFound {
83+
p.backupTimelockProcessedCache.Store(stateAddress, time.Now())
84+
continue
85+
} else if err != nil {
86+
log.With(zap.Error(err)).Warn("failed to get timelock record")
87+
continue
88+
}
9289

93-
err := updateTimelockAccountRecord(tracedCtx, p.data, timelockRecord)
94-
if err != nil {
95-
log.With(zap.Error(err)).Warn("failed to update timelock account")
96-
return
97-
}
90+
if err := updateTimelockAccountRecord(tracedCtx, p.data, timelockRecord, &unlockState, slot); err != nil && err != timelock.ErrStaleTimelockState {
91+
log.With(zap.Error(err)).Warn("failed to update timelock account")
92+
continue
93+
}
9894

99-
p.backupTimelockProcessedCache.Store(timelockRecord.Address, time.Now())
100-
}(timelockRecord)
95+
p.backupTimelockProcessedCache.Store(stateAddress, time.Now())
10196
}
10297

103-
wg.Wait()
104-
105-
cursor = query.ToCursor(timelockRecords[len(timelockRecords)-1].Id)
98+
p.metricStatusLock.Lock()
99+
duration := time.Since(start)
100+
if p.backupTimelockStateWorkerDuration == nil || *p.backupTimelockStateWorkerDuration < duration {
101+
p.backupTimelockStateWorkerDuration = &duration
102+
}
103+
p.metricStatusLock.Unlock()
106104
}()
107105

108-
delay = interval - time.Since(batchStart)
106+
delay = interval - time.Since(start)
109107
case <-runtimeCtx.Done():
110108
return runtimeCtx.Err()
111109
}

ocp/worker/geyser/runtime.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/code-payments/ocp-server/ocp/integration"
1111
"github.com/code-payments/ocp-server/ocp/worker"
1212
geyserpb "github.com/code-payments/ocp-server/ocp/worker/geyser/api/gen"
13-
timelock_token "github.com/code-payments/ocp-server/solana/timelock/v1"
1413

1514
ocp_data "github.com/code-payments/ocp-server/ocp/data"
1615
)
@@ -62,13 +61,7 @@ func New(log *zap.Logger, data ocp_data.Provider, integration integration.Geyser
6261
func (p *runtime) Start(ctx context.Context, _ time.Duration) error {
6362
// Start backup workers to catch missed events
6463
go func() {
65-
err := p.backupTimelockStateWorker(ctx, timelock_token.StateLocked, p.conf.backupTimelockWorkerInterval.Get(ctx))
66-
if err != nil && err != context.Canceled {
67-
p.log.With(zap.Error(err)).Warn("timelock backup worker terminated unexpectedly")
68-
}
69-
}()
70-
go func() {
71-
err := p.backupTimelockStateWorker(ctx, timelock_token.StateUnknown, p.conf.backupTimelockWorkerInterval.Get(ctx))
64+
err := p.backupTimelockStateWorker(ctx, p.conf.backupTimelockWorkerInterval.Get(ctx))
7265
if err != nil && err != context.Canceled {
7366
p.log.With(zap.Error(err)).Warn("timelock backup worker terminated unexpectedly")
7467
}

ocp/worker/geyser/timelock.go

Lines changed: 11 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -4,74 +4,30 @@ import (
44
"context"
55
"time"
66

7-
"github.com/code-payments/ocp-server/ocp/common"
87
ocp_data "github.com/code-payments/ocp-server/ocp/data"
98
"github.com/code-payments/ocp-server/ocp/data/timelock"
10-
"github.com/code-payments/ocp-server/solana"
9+
"github.com/code-payments/ocp-server/pointer"
1110
timelock_token "github.com/code-payments/ocp-server/solana/timelock/v1"
1211
"github.com/code-payments/ocp-server/solana/vm"
1312
)
1413

15-
func updateTimelockAccountRecord(ctx context.Context, data ocp_data.Provider, timelockRecord *timelock.Record) error {
16-
unlockState, slot, err := getTimelockUnlockState(ctx, data, timelockRecord)
17-
if err != nil {
18-
return err
14+
func updateTimelockAccountRecord(ctx context.Context, data ocp_data.Provider, timelockRecord *timelock.Record, unlockState *vm.UnlockStateAccount, slot uint64) error {
15+
if unlockState == nil {
16+
return nil
1917
}
2018

21-
if unlockState != nil {
22-
timelockRecord.VaultState = timelock_token.StateWaitingForTimeout
23-
if unlockState.IsUnlocked() {
24-
timelockRecord.VaultState = timelock_token.StateUnlocked
25-
}
19+
expectedState := timelock_token.StateWaitingForTimeout
20+
if unlockState.IsUnlocked() {
21+
expectedState = timelock_token.StateUnlocked
22+
}
2623

27-
unlockAt := uint64(unlockState.UnlockAt)
28-
timelockRecord.UnlockAt = &unlockAt
29-
} else {
24+
if timelockRecord.VaultState == expectedState {
3025
return nil
3126
}
3227

28+
timelockRecord.VaultState = expectedState
29+
timelockRecord.UnlockAt = pointer.Uint64(uint64(unlockState.UnlockAt))
3330
timelockRecord.Block = slot
3431
timelockRecord.LastUpdatedAt = time.Now()
3532
return data.SaveTimelock(ctx, timelockRecord)
3633
}
37-
38-
func getTimelockUnlockState(ctx context.Context, data ocp_data.Provider, timelockRecord *timelock.Record) (*vm.UnlockStateAccount, uint64, error) {
39-
accountInfoRecord, err := data.GetAccountInfoByTokenAddress(ctx, timelockRecord.VaultAddress)
40-
if err != nil {
41-
return nil, 0, err
42-
}
43-
44-
vaultOwnerAccount, err := common.NewAccountFromPublicKeyString(timelockRecord.VaultOwner)
45-
if err != nil {
46-
return nil, 0, err
47-
}
48-
49-
mintAccount, err := common.NewAccountFromPublicKeyString(accountInfoRecord.MintAccount)
50-
if err != nil {
51-
return nil, 0, err
52-
}
53-
54-
vmConfig, err := common.GetVmConfigForMint(ctx, data, mintAccount)
55-
if err != nil {
56-
return nil, 0, err
57-
}
58-
59-
timelockAccounts, err := vaultOwnerAccount.GetTimelockAccounts(vmConfig)
60-
if err != nil {
61-
return nil, 0, err
62-
}
63-
64-
ai, slot, err := data.GetBlockchainAccountInfo(ctx, timelockAccounts.Unlock.PublicKey().ToBase58(), solana.CommitmentFinalized)
65-
switch err {
66-
case nil:
67-
var unlockState vm.UnlockStateAccount
68-
if err = unlockState.Unmarshal(ai.Data); err != nil {
69-
return nil, 0, err
70-
}
71-
return &unlockState, slot, nil
72-
case solana.ErrNoAccountInfo:
73-
return nil, slot, nil
74-
default:
75-
return nil, 0, err
76-
}
77-
}

solana/client.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,11 @@ type TransactionSignature struct {
164164
Memo *string
165165
}
166166

167+
type ProgramAccount struct {
168+
PublicKey ed25519.PublicKey
169+
Data []byte
170+
}
171+
167172
// Client provides an interaction with the Solana JSON RPC API.
168173
//
169174
// Reference: https://docs.solana.com/apps/jsonrpc-api
@@ -177,7 +182,7 @@ type Client interface {
177182
GetConfirmationStatus(Signature, Commitment) (bool, error)
178183
GetConfirmedBlock(slot uint64) (*Block, error)
179184
GetConfirmedBlocksWithLimit(start, limit uint64) ([]uint64, error)
180-
GetFilteredProgramAccounts(program ed25519.PublicKey, offset uint, filterValue []byte) ([]string, uint64, error)
185+
GetFilteredProgramAccounts(program ed25519.PublicKey, offset uint, filterValue []byte) ([]ProgramAccount, uint64, error)
181186
GetLatestBlockhash() (Blockhash, error)
182187
GetMinimumBalanceForRentExemption(size uint64) (lamports uint64, err error)
183188
GetSignatureStatus(Signature, Commitment) (*SignatureStatus, error)
@@ -1118,7 +1123,7 @@ func (c *client) GetTokenAccountsByOwner(owner, mint ed25519.PublicKey) ([]ed255
11181123
return keys, nil
11191124
}
11201125

1121-
func (c *client) GetFilteredProgramAccounts(program ed25519.PublicKey, offset uint, filterValue []byte) ([]string, uint64, error) {
1126+
func (c *client) GetFilteredProgramAccounts(program ed25519.PublicKey, offset uint, filterValue []byte) ([]ProgramAccount, uint64, error) {
11221127
type memcmpFilter struct {
11231128
Offset uint `json:"offset"`
11241129
Bytes string `json:"bytes"`
@@ -1152,16 +1157,35 @@ func (c *client) GetFilteredProgramAccounts(program ed25519.PublicKey, offset ui
11521157
Slot int64 `json:"slot"`
11531158
} `json:"context"`
11541159
Value []struct {
1155-
PubKey string `json:"pubkey"`
1160+
PubKey string `json:"pubkey"`
1161+
Account struct {
1162+
Data []string `json:"data"`
1163+
} `json:"account"`
11561164
} `json:"value"`
11571165
}
11581166
if err := c.call(&resp, "getProgramAccounts", base58.Encode(program), config); err != nil {
11591167
return nil, 0, err
11601168
}
11611169

1162-
var res []string
1170+
res := make([]ProgramAccount, 0, len(resp.Value))
11631171
for _, result := range resp.Value {
1164-
res = append(res, result.PubKey)
1172+
pubkey, err := base58.Decode(result.PubKey)
1173+
if err != nil {
1174+
return nil, 0, errors.Wrap(err, "invalid base58 encoded pubkey")
1175+
}
1176+
1177+
var data []byte
1178+
if len(result.Account.Data) > 0 {
1179+
data, err = base64.StdEncoding.DecodeString(result.Account.Data[0])
1180+
if err != nil {
1181+
return nil, 0, errors.Wrap(err, "invalid base64 encoded account data")
1182+
}
1183+
}
1184+
1185+
res = append(res, ProgramAccount{
1186+
PublicKey: pubkey,
1187+
Data: data,
1188+
})
11651189
}
11661190
return res, uint64(resp.Context.Slot), nil
11671191
}

solana/client_with_fallback.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,12 @@ func (c *clientWithFallback) GetConfirmedBlocksWithLimit(start, limit uint64) ([
140140
)
141141
}
142142

143-
func (c *clientWithFallback) GetFilteredProgramAccounts(program ed25519.PublicKey, offset uint, filterValue []byte) ([]string, uint64, error) {
143+
func (c *clientWithFallback) GetFilteredProgramAccounts(program ed25519.PublicKey, offset uint, filterValue []byte) ([]ProgramAccount, uint64, error) {
144144
return withFallback2(
145-
func() ([]string, uint64, error) {
145+
func() ([]ProgramAccount, uint64, error) {
146146
return c.primary.GetFilteredProgramAccounts(program, offset, filterValue)
147147
},
148-
func() ([]string, uint64, error) {
148+
func() ([]ProgramAccount, uint64, error) {
149149
return c.fallback.GetFilteredProgramAccounts(program, offset, filterValue)
150150
},
151151
)

solana/client_with_fallback_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (m *mockClient) GetConfirmedBlocksWithLimit(uint64, uint64) ([]uint64, erro
9595
m.callCount++
9696
return nil, nil
9797
}
98-
func (m *mockClient) GetFilteredProgramAccounts(ed25519.PublicKey, uint, []byte) ([]string, uint64, error) {
98+
func (m *mockClient) GetFilteredProgramAccounts(ed25519.PublicKey, uint, []byte) ([]ProgramAccount, uint64, error) {
9999
m.callCount++
100100
return nil, 0, nil
101101
}

0 commit comments

Comments
 (0)