Skip to content

Commit b338f32

Browse files
PatStilesentropidelicMarcosNicolauuri-99
authored
refactor(aggregator): Aggregator retries (#1304)
Co-authored-by: Mariano Nicolini <mariano.nicolini.91@gmail.com> Co-authored-by: Marcos Nicolau <marcosnicolau@lambdaclass.com> Co-authored-by: Urix <43704209+uri-99@users.noreply.github.com>
1 parent 5151da8 commit b338f32

18 files changed

Lines changed: 1472 additions & 173 deletions

.github/workflows/build-and-test-go.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,3 @@ jobs:
3838
run: go build operator/cmd/main.go
3939
- name: Build aggregator
4040
run: go build aggregator/cmd/main.go
41-
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
name: test-go-retries
2+
3+
on:
4+
push:
5+
branches: [main]
6+
pull_request:
7+
branches: ["*"]
8+
paths:
9+
- 'core/**'
10+
- '.github/workflows/test-go-retries.yml'
11+
12+
jobs:
13+
test:
14+
runs-on: ubuntu-latest
15+
steps:
16+
- name: Clear device space
17+
run: |
18+
sudo rm -rf "$AGENT_TOOLSDIRECTORY"
19+
sudo rm -rf /usr/local/lib/android
20+
sudo rm -rf /opt/ghc
21+
sudo rm -rf /usr/local/.ghcup
22+
sudo rm -rf /usr/share/dotnet
23+
sudo rm -rf /opt/ghc
24+
sudo rm -rf "/usr/local/share/boost"
25+
- uses: actions/checkout@v4
26+
- uses: actions/setup-go@v5
27+
with:
28+
go-version: '1.22'
29+
cache: false
30+
- uses: actions-rs/toolchain@v1
31+
with:
32+
toolchain: stable
33+
- name: foundry-toolchain
34+
uses: foundry-rs/foundry-toolchain@v1.2.0
35+
- name: Test go Retry Functions
36+
run: make test_go_retries

.github/workflows/test-sp1-old.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
- uses: actions-rs/toolchain@v1
2222
with:
2323
toolchain: stable
24-
- name: Test Old SP1 Rust
25-
run: make test_sp1_rust_ffi_old
2624
- name: Test Old SP1 go bindings
2725
run: make test_sp1_go_bindings_linux_old
26+
- name: Test Old SP1 Rust
27+
run: make test_sp1_rust_ffi_old

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ aggregator_send_dummy_responses:
143143
@echo "Sending dummy responses to Aggregator..."
144144
@cd aggregator && go run dummy/submit_task_responses.go
145145

146+
test_go_retries:
147+
@cd core/ && \
148+
go test -v -timeout 15m
146149

147150
__OPERATOR__:
148151

@@ -192,7 +195,7 @@ bindings:
192195
cd contracts && ./generate-go-bindings.sh
193196

194197
test:
195-
go test ./...
198+
go test ./... -timeout 15m
196199

197200

198201
get_delegation_manager_address:

aggregator/cmd/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"os"
88

99
"github.com/urfave/cli/v2"
10-
"github.com/yetanotherco/aligned_layer/aggregator/internal/pkg"
10+
"github.com/yetanotherco/aligned_layer/aggregator/pkg"
1111
"github.com/yetanotherco/aligned_layer/core/config"
1212
)
1313

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ import (
44
"context"
55
"encoding/hex"
66
"fmt"
7+
"strings"
78
"sync"
89
"time"
910

1011
gethtypes "github.com/ethereum/go-ethereum/core/types"
1112

1213
"github.com/prometheus/client_golang/prometheus"
14+
retry "github.com/yetanotherco/aligned_layer/core"
1315
"github.com/yetanotherco/aligned_layer/metrics"
1416

1517
sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
@@ -269,19 +271,14 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
269271

270272
agg.logger.Info("Sending aggregated response onchain", "taskIndex", blsAggServiceResp.TaskIndex,
271273
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
272-
for i := 0; i < MaxSentTxRetries; i++ {
273-
receipt, err := agg.sendAggregatedResponse(batchIdentifierHash, batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
274-
if err == nil {
275-
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, receipt.TxHash.String())
276-
agg.logger.Info("Aggregator successfully responded to task",
277-
"taskIndex", blsAggServiceResp.TaskIndex,
278-
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
279-
280-
return
281-
}
274+
receipt, err := agg.sendAggregatedResponse(batchIdentifierHash, batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
275+
if err == nil {
276+
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, receipt.TxHash.String())
277+
agg.logger.Info("Aggregator successfully responded to task",
278+
"taskIndex", blsAggServiceResp.TaskIndex,
279+
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
282280

283-
// Sleep for a bit before retrying
284-
time.Sleep(2 * time.Second)
281+
return
285282
}
286283

287284
agg.logger.Error("Aggregator failed to respond to task, this batch will be lost",
@@ -385,6 +382,30 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
385382
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
386383
}
387384

385+
// |---RETRYABLE---|
386+
387+
/*
388+
- Errors:
389+
Permanent:
390+
- TaskAlreadyInitializedError (Permanent): Task is already initialized in the BLS Aggregation service (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L27).
391+
Transient:
392+
- All others.
393+
- Retry times (3 retries): 12 sec (1 Block), 24 sec (2 Blocks), 48 sec (4 Blocks)
394+
*/
395+
// InitializeNewTask forwards its arguments unchanged to
// agg.blsAggregationService.InitializeNewTask and runs that call through
// retry.Retry using the chain-oriented retry parameters (MinDelayChain,
// RetryFactor, NumRetries, MaxIntervalChain, MaxElapsedTime).
// It returns whatever retry.Retry returns.
func (agg *Aggregator) InitializeNewTask(batchIndex uint32, taskCreatedBlock uint32, quorumNums eigentypes.QuorumNums, quorumThresholdPercentages eigentypes.QuorumThresholdPercentages, timeToExpiry time.Duration) error {
396+
initilizeNewTask_func := func() error {
397+
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, timeToExpiry)
398+
if err != nil {
399+
// The task is already initialized: detected by matching the error text, then
// wrapped as retry.PermanentError (presumably so retry.Retry stops retrying — confirm in core/retry).
400+
if strings.Contains(err.Error(), "already initialized") {
401+
err = retry.PermanentError{Inner: err}
402+
}
403+
}
404+
return err // nil on success; any non-wrapped error is returned as-is
405+
}
406+
return retry.Retry(initilizeNewTask_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
407+
}
408+
388409
// Long-lived goroutine that periodically checks and removes old Tasks from stored Maps
389410
// It runs every GarbageCollectorPeriod and removes all tasks older than GarbageCollectorTasksAge
390411
// This was added because each task occupies memory in the maps, and we need to free it to avoid a memory leak
Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,18 @@ package pkg
33
import (
44
"context"
55
"encoding/hex"
6+
"fmt"
67
"net/http"
78
"net/rpc"
9+
"strings"
810
"time"
911

12+
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
13+
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
14+
retry "github.com/yetanotherco/aligned_layer/core"
1015
"github.com/yetanotherco/aligned_layer/core/types"
1116
)
1217

13-
const waitForEventRetries = 50
14-
const waitForEventSleepSeconds = 4 * time.Second
15-
1618
func (agg *Aggregator) ServeOperators() error {
1719
// Registers a new RPC server
1820
err := rpc.Register(agg)
@@ -50,22 +52,10 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
5052
"BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
5153
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
5254
taskIndex := uint32(0)
53-
ok := false
5455

55-
for i := 0; i < waitForEventRetries; i++ {
56-
agg.taskMutex.Lock()
57-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
58-
taskIndex, ok = agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash]
59-
if !ok {
60-
agg.taskMutex.Unlock()
61-
agg.logger.Info("- Unlocked Resources: Task not found in the internal map")
62-
time.Sleep(waitForEventSleepSeconds)
63-
} else {
64-
break
65-
}
66-
}
56+
taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
6757

68-
if !ok {
58+
if err != nil {
6959
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
7060
*reply = 1
7161
return nil
@@ -82,7 +72,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
8272

8373
agg.logger.Info("Starting bls signature process")
8474
go func() {
85-
err := agg.blsAggregationService.ProcessNewSignature(
75+
err := agg.ProcessNewSignature(
8676
context.Background(), taskIndex, signedTaskResponse.BatchIdentifierHash,
8777
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
8878
)
@@ -121,3 +111,48 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {
121111
*reply = 1
122112
return nil
123113
}
114+
115+
// |---RETRYABLE---|
116+
117+
/*
118+
- Errors:
119+
Permanent:
120+
- SignatureVerificationError: Verification of the signature within the BLS Aggregation Service failed. (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L42).
121+
Transient:
122+
- All others.
123+
- Retry times (3 retries): 12 sec (1 Block), 24 sec (2 Blocks), 48 sec (4 Blocks)
124+
- NOTE: TaskNotFound errors from the BLS Aggregation service are Transient errors, as block reorgs may lead to these errors being thrown.
125+
*/
126+
// ProcessNewSignature forwards its arguments unchanged to
// agg.blsAggregationService.ProcessNewSignature and runs that call through
// retry.Retry using the chain-oriented retry parameters (MinDelayChain,
// RetryFactor, NumRetries, MaxIntervalChain, MaxElapsedTime).
// It returns whatever retry.Retry returns.
func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error {
127+
processNewSignature_func := func() error {
128+
err := agg.blsAggregationService.ProcessNewSignature(
129+
ctx, taskIndex, taskResponse,
130+
blsSignature, operatorId,
131+
)
132+
if err != nil {
133+
// A signature-verification failure is detected by matching the error text and wrapped as
// retry.PermanentError (presumably so retry.Retry stops retrying — confirm in core/retry).
if strings.Contains(err.Error(), "Failed to verify signature") {
134+
err = retry.PermanentError{Inner: err}
135+
}
136+
}
137+
return err // nil on success; any non-wrapped error is returned as-is
138+
}
139+
140+
return retry.Retry(processNewSignature_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
141+
}
142+
143+
// GetTaskIndex looks up batchIdentifierHash in agg.batchesIdxByIdentifierHash
// and returns the stored task index. The lookup is run through
// retry.RetryWithData (MinDelay, RetryFactor, NumRetries, MaxInterval,
// MaxElapsedTime), so a missing entry is reported as an error per attempt.
//
// NOTE(review): on a successful lookup this function does not release
// agg.taskMutex; only the not-found path unlocks it. Presumably the caller is
// expected to unlock after it finishes using the task state — confirm against
// ProcessOperatorSignedTaskResponseV2.
func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) {
144+
getTaskIndex_func := func() (uint32, error) {
145+
agg.taskMutex.Lock() // taken on every attempt
146+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
147+
taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
148+
if !ok {
149+
agg.taskMutex.Unlock() // release before reporting the miss, so the next attempt can lock again
150+
agg.logger.Info("- Unlocked Resources: Task not found in the internal map")
151+
return taskIndex, fmt.Errorf("Task not found in the internal map")
152+
} else {
153+
return taskIndex, nil // taskMutex is still locked on this path
154+
}
155+
}
156+
157+
return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
158+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Common variables for all the services
2+
# 'production' only prints info and above. 'development' also prints debug
3+
environment: "production"
4+
aligned_layer_deployment_config_file_path: "../contracts/script/output/devnet/alignedlayer_deployment_output.json"
5+
eigen_layer_deployment_config_file_path: "../contracts/script/output/devnet/eigenlayer_deployment_output.json"
6+
eth_rpc_url: "http://localhost:8545"
7+
eth_rpc_url_fallback: "http://localhost:8545"
8+
eth_ws_url: "ws://localhost:8545"
9+
eth_ws_url_fallback: "ws://localhost:8545"
10+
eigen_metrics_ip_port_address: "localhost:9090"
11+
12+
## ECDSA Configurations
13+
ecdsa:
14+
private_key_store_path: "../config-files/anvil.aggregator.ecdsa.key.json"
15+
private_key_store_password: ""
16+
17+
## BLS Configurations
18+
bls:
19+
private_key_store_path: "../config-files/anvil.aggregator.bls.key.json"
20+
private_key_store_password: ""
21+
22+
## Aggregator Configurations
23+
aggregator:
24+
server_ip_port_address: localhost:8090
25+
bls_public_key_compendium_address: 0x322813Fd9A801c5507c9de605d63CEA4f2CE6c44
26+
avs_service_manager_address: 0xc3e53F4d16Ae77Db1c982e75a937B9f60FE63690
27+
enable_metrics: false
28+
metrics_ip_port_address: localhost:9091
29+
telemetry_ip_port_address: localhost:4001

0 commit comments

Comments
 (0)