Skip to content

Commit 4e2e64b

Browse files
authored
IGNITE-28539 Rename PartitionReplicaListener and related classes (#7986)
1 parent 1bc2dfb commit 4e2e64b

30 files changed

Lines changed: 162 additions & 146 deletions

File tree

modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
import org.apache.ignite.internal.metastorage.MetaStorageManager;
6767
import org.apache.ignite.internal.metastorage.dsl.Operation;
6868
import org.apache.ignite.internal.partition.replicator.fixtures.Node;
69-
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
69+
import org.apache.ignite.internal.partition.replicator.raft.TablePartitionRaftProcessor;
7070
import org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
7171
import org.apache.ignite.internal.partitiondistribution.Assignment;
7272
import org.apache.ignite.internal.partitiondistribution.Assignments;
@@ -795,7 +795,7 @@ private static boolean raftTableProcessorExists(Node node, int zoneId, int table
795795

796796
var fsm = (JraftServerImpl.DelegatingStateMachine) grp.getRaftNode().getOptions().getFsm();
797797

798-
RaftTableProcessor tableProcessor = ((ZonePartitionRaftListener) fsm.getListener()).tableProcessor(tableId);
798+
TablePartitionRaftProcessor tableProcessor = ((ZonePartitionRaftListener) fsm.getListener()).tableProcessor(tableId);
799799

800800
return tableProcessor != null;
801801
}

modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@
111111
import org.apache.ignite.internal.storage.lease.LeaseInfo;
112112
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
113113
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
114+
import org.apache.ignite.internal.table.distributed.raft.DefaultTablePartitionRaftProcessor;
114115
import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
115-
import org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
116116
import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
117117
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
118118
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -387,7 +387,7 @@ private void stopRaftGroupNode() throws NodeStoppingException {
387387
raftManager.stopRaftNodes(PARTITION_ID);
388388
}
389389

390-
private RaftTableProcessor createTableProcessor(int tableId) {
390+
private TablePartitionRaftProcessor createTableProcessor(int tableId) {
391391
var storage = new SnapshotAwarePartitionDataStorage(
392392
tableId,
393393
mockStorage(tableId).storage,
@@ -405,7 +405,7 @@ private RaftTableProcessor createTableProcessor(int tableId) {
405405
return clock.update(requestTime);
406406
});
407407

408-
return new TablePartitionProcessor(
408+
return new DefaultTablePartitionRaftProcessor(
409409
txManager,
410410
storage,
411411
storageUpdateHandler,

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@
142142
import org.apache.ignite.internal.network.TopologyService;
143143
import org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
144144
import org.apache.ignite.internal.partition.replicator.index.IndexMetasAccess;
145-
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
145+
import org.apache.ignite.internal.partition.replicator.raft.TablePartitionRaftProcessor;
146146
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
147147
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
148148
import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -1948,7 +1948,7 @@ public void loadTableListenerToZoneReplica(
19481948
ZonePartitionId zonePartitionId,
19491949
int tableId,
19501950
TablePartitionReplicaProcessorFactory tablePartitionReplicaProcessorFactory,
1951-
RaftTableProcessor raftTableProcessor,
1951+
TablePartitionRaftProcessor raftTableProcessor,
19521952
PartitionMvStorageAccess partitionMvStorageAccess,
19531953
boolean onNodeRecovery
19541954
) {
@@ -2346,6 +2346,9 @@ public interface TablePartitionReplicaProcessorFactory {
23462346
* @param transactionStateResolver Transaction state resolver.
23472347
* @return Table partition replica processor.
23482348
*/
2349-
ReplicaTableProcessor createProcessor(RaftCommandRunner raftCommandRunner, TransactionStateResolver transactionStateResolver);
2349+
TablePartitionReplicaProcessor createProcessor(
2350+
RaftCommandRunner raftCommandRunner,
2351+
TransactionStateResolver transactionStateResolver
2352+
);
23502353
}
23512354
}

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java renamed to modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TablePartitionReplicaProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
/**
2626
* Processor of replica requests targeted at a particular table.
2727
*/
28-
public interface ReplicaTableProcessor {
28+
public interface TablePartitionReplicaProcessor {
2929
/**
3030
* Processes replica request.
3131
*

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class ZonePartitionReplicaListener implements ReplicaListener {
7575
private static final IgniteLogger LOG = Loggers.forClass(ZonePartitionReplicaListener.class);
7676

7777
// tableId -> tableProcessor.
78-
private final Map<Integer, ReplicaTableProcessor> replicaProcessors = new ConcurrentHashMap<>();
78+
private final Map<Integer, TablePartitionReplicaProcessor> replicaProcessors = new ConcurrentHashMap<>();
7979

8080
/** Raft client. */
8181
private final RaftCommandRunner raftClient;
@@ -260,7 +260,7 @@ private CompletableFuture<ReplicaResult> processTableAwareRequest(
260260
.thenCompose(ignored -> {
261261
int tableId = ((TableAware) request).tableId();
262262

263-
ReplicaTableProcessor replicaProcessor = replicaProcessors.get(tableId);
263+
TablePartitionReplicaProcessor replicaProcessor = replicaProcessors.get(tableId);
264264

265265
if (replicaProcessor == null) {
266266
// Most of the times this condition should be false. This block handles a case when a request got stuck
@@ -334,7 +334,7 @@ boolean areTableReplicaProcessorsEmpty() {
334334
* @return Table replicas listeners.
335335
*/
336336
@VisibleForTesting
337-
public Map<Integer, ReplicaTableProcessor> tableReplicaProcessors() {
337+
public Map<Integer, TablePartitionReplicaProcessor> tableReplicaProcessors() {
338338
return replicaProcessors;
339339
}
340340

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@
4040
import org.apache.ignite.internal.partition.replicator.FuturesCleanupResult;
4141
import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
4242
import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
43-
import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
4443
import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker;
4544
import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
4645
import org.apache.ignite.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
46+
import org.apache.ignite.internal.partition.replicator.TablePartitionReplicaProcessor;
4747
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
4848
import org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
4949
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
@@ -74,7 +74,7 @@ public class WriteIntentSwitchRequestHandler {
7474

7575
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
7676

77-
private final IntFunction<ReplicaTableProcessor> replicaListenerByTableId;
77+
private final IntFunction<TablePartitionReplicaProcessor> replicaListenerByTableId;
7878

7979
private final ClockService clockService;
8080

@@ -89,7 +89,7 @@ public class WriteIntentSwitchRequestHandler {
8989

9090
/** Constructor. */
9191
public WriteIntentSwitchRequestHandler(
92-
IntFunction<ReplicaTableProcessor> replicaListenerByTableId,
92+
IntFunction<TablePartitionReplicaProcessor> replicaListenerByTableId,
9393
ClockService clockService,
9494
SchemaSyncService schemaSyncService,
9595
CatalogService catalogService,
@@ -174,7 +174,7 @@ private CompletableFuture<ReplicaResult> invokeTableWriteIntentSwitchReplicaRequ
174174
// Using empty primacy because the request is not a PrimaryReplicaRequest.
175175
return tableAwareReplicaRequestPreProcessor.preProcessTableAwareRequest(tableSpecificRequest)
176176
.thenCompose(ignored -> {
177-
ReplicaTableProcessor replicaProcessor = replicaTableProcessor(tableId);
177+
TablePartitionReplicaProcessor replicaProcessor = replicaTableProcessor(tableId);
178178

179179
if (replicaProcessor == null) {
180180
// Most of the times this condition should be false. This block handles a case when a request got stuck
@@ -215,7 +215,7 @@ private WriteIntentSwitchReplicatedInfo writeIntentSwitchReplicationInfoFor(Writ
215215
return new WriteIntentSwitchReplicatedInfo(request.txId(), replicationGroupId);
216216
}
217217

218-
private ReplicaTableProcessor replicaTableProcessor(int tableId) {
218+
private TablePartitionReplicaProcessor replicaTableProcessor(int tableId) {
219219
return replicaListenerByTableId.apply(tableId);
220220
}
221221
}

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ public OnSnapshotSaveHandler(TxStatePartitionStorage txStatePartitionStorage, Ex
4242
/**
4343
* Called when {@link RaftGroupListener#onSnapshotSave} is triggered.
4444
*/
45-
public CompletableFuture<Void> onSnapshotSave(PartitionSnapshotInfo snapshotInfo, Collection<RaftTableProcessor> tableProcessors) {
45+
public CompletableFuture<Void> onSnapshotSave(
46+
PartitionSnapshotInfo snapshotInfo,
47+
Collection<TablePartitionRaftProcessor> tableProcessors
48+
) {
4649
// The max index here is required for local recovery and a possible scenario
4750
// of false node failure when we actually have all required data. This might happen because we use the minimal index
4851
// among storages on a node restart.
@@ -63,7 +66,7 @@ public CompletableFuture<Void> onSnapshotSave(PartitionSnapshotInfo snapshotInfo
6366
txStatePartitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
6467

6568
CompletableFuture<?>[] tableStorageFlushFutures = tableProcessors.stream()
66-
.map(RaftTableProcessor::flushStorage)
69+
.map(TablePartitionRaftProcessor::flushStorage)
6770
.toArray(CompletableFuture<?>[]::new);
6871

6972
// Flush the TX state storage last to guarantee that all data is flushed before the snapshot is saved.

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java renamed to modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/TablePartitionRaftProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
/**
2828
* Processor of Raft commands targeted at a particular table.
2929
*/
30-
public interface RaftTableProcessor {
30+
public interface TablePartitionRaftProcessor {
3131
/**
3232
* Processes a Raft command.
3333
*

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public class ZonePartitionRaftListener implements RaftGroupListener {
8181
*
8282
* <p>Concurrent access is guarded by {@link #tableProcessorsStateLock}.
8383
*/
84-
private final Int2ObjectMap<RaftTableProcessor> tableProcessors = new Int2ObjectOpenHashMap<>();
84+
private final Int2ObjectMap<TablePartitionRaftProcessor> tableProcessors = new Int2ObjectOpenHashMap<>();
8585

8686
private final TxStatePartitionStorage txStateStorage;
8787

@@ -313,7 +313,7 @@ private CrossTableCommandResult processCrossTableProcessorsCommand(
313313

314314
boolean wasApplied = false;
315315

316-
for (RaftTableProcessor processor : tableProcessors.values()) {
316+
for (TablePartitionRaftProcessor processor : tableProcessors.values()) {
317317
CommandResult r = processor.processCommand(command, commandIndex, commandTerm, safeTimestamp);
318318

319319
wasApplied = wasApplied || r.wasApplied();
@@ -339,7 +339,7 @@ private CommandResult processTableAwareCommand(
339339
long commandTerm,
340340
@Nullable HybridTimestamp safeTimestamp
341341
) {
342-
RaftTableProcessor tableProcessor = tableProcessors.get(tableId);
342+
TablePartitionRaftProcessor tableProcessor = tableProcessors.get(tableId);
343343

344344
if (tableProcessor == null) {
345345
// Most of the times this condition should be false. This logging message is added in case a Raft command got stuck somewhere
@@ -444,7 +444,7 @@ public void onShutdown() {
444444
cleanupSnapshots();
445445

446446
synchronized (tableProcessorsStateLock) {
447-
tableProcessors.values().forEach(RaftTableProcessor::onShutdown);
447+
tableProcessors.values().forEach(TablePartitionRaftProcessor::onShutdown);
448448
}
449449
}
450450

@@ -454,7 +454,7 @@ public long getPersistedAppliedIndex() {
454454
long result = max(0, txStateStorage.lastAppliedIndex());
455455

456456
synchronized (tableProcessorsStateLock) {
457-
for (RaftTableProcessor processor : tableProcessors.values()) {
457+
for (TablePartitionRaftProcessor processor : tableProcessors.values()) {
458458
result = max(result, processor.lastAppliedIndex());
459459
}
460460
}
@@ -471,15 +471,15 @@ public long getPersistedAppliedIndex() {
471471
* version. Until the Catalog version is updated, commands targeting the table being added will be rejected by an interceptor that
472472
* requires the Catalog version to be equal to a particular value.
473473
*/
474-
public void addTableProcessor(int tableId, RaftTableProcessor processor) {
474+
public void addTableProcessor(int tableId, TablePartitionRaftProcessor processor) {
475475
synchronized (tableProcessorsStateLock) {
476476
RaftGroupConfiguration configuration = raftGroupConfigurationConverter.fromBytes(txStateStorage.committedGroupConfiguration());
477477

478478
LeaseInfo leaseInfo = txStateStorage.leaseInfo();
479479

480480
processor.initialize(configuration, leaseInfo, lastAppliedIndex, lastAppliedTerm);
481481

482-
RaftTableProcessor prev = tableProcessors.put(tableId, processor);
482+
TablePartitionRaftProcessor prev = tableProcessors.put(tableId, processor);
483483

484484
assert prev == null : "Listener for table " + tableId + " already exists";
485485
}
@@ -488,7 +488,7 @@ public void addTableProcessor(int tableId, RaftTableProcessor processor) {
488488
/**
489489
* Adds a given Table Partition-level Raft processor to the set of managed processors during node recovery on startup.
490490
*/
491-
public void addTableProcessorOnRecovery(int tableId, RaftTableProcessor processor) {
491+
public void addTableProcessorOnRecovery(int tableId, TablePartitionRaftProcessor processor) {
492492
synchronized (tableProcessorsStateLock) {
493493
PartitionSnapshotInfo snapshotInfo = snapshotInfo();
494494

@@ -513,7 +513,7 @@ public void addTableProcessorOnRecovery(int tableId, RaftTableProcessor processo
513513
);
514514
}
515515

516-
RaftTableProcessor prev = tableProcessors.put(tableId, processor);
516+
TablePartitionRaftProcessor prev = tableProcessors.put(tableId, processor);
517517

518518
assert prev == null : "Listener for table " + tableId + " already exists";
519519
}
@@ -531,7 +531,7 @@ public void addTableProcessorOnRecovery(int tableId, RaftTableProcessor processo
531531

532532
/** Returns the table processor associated with the given table ID. */
533533
@TestOnly
534-
public @Nullable RaftTableProcessor tableProcessor(int tableId) {
534+
public @Nullable TablePartitionRaftProcessor tableProcessor(int tableId) {
535535
synchronized (tableProcessorsStateLock) {
536536
return tableProcessors.get(tableId);
537537
}

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
2525
import org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommandV2;
2626
import org.apache.ignite.internal.partition.replicator.raft.CommandResult;
27-
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
2827
import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker;
28+
import org.apache.ignite.internal.partition.replicator.raft.TablePartitionRaftProcessor;
2929
import org.apache.ignite.internal.tx.TxManager;
3030
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
3131
import org.jetbrains.annotations.Nullable;
@@ -36,15 +36,15 @@
3636
public class WriteIntentSwitchCommandHandler extends AbstractCommandHandler<WriteIntentSwitchCommand> {
3737
private static final IgniteLogger LOG = Loggers.forClass(WriteIntentSwitchCommandHandler.class);
3838

39-
private final IntFunction<RaftTableProcessor> tableProcessorByTableId;
39+
private final IntFunction<TablePartitionRaftProcessor> tableProcessorByTableId;
4040

4141
private final RaftTxFinishMarker txFinishMarker;
4242

4343
private final TxStatePartitionStorage txStatePartitionStorage;
4444

4545
/** Constructor. */
4646
public WriteIntentSwitchCommandHandler(
47-
IntFunction<RaftTableProcessor> tableProcessorByTableId,
47+
IntFunction<TablePartitionRaftProcessor> tableProcessorByTableId,
4848
TxManager txManager,
4949
TxStatePartitionStorage txStatePartitionStorage
5050
) {
@@ -68,7 +68,7 @@ protected CommandResult handleInternally(
6868
boolean applied = false;
6969
boolean handledByAnyTable = false;
7070
for (int tableId : ((WriteIntentSwitchCommandV2) switchCommand).tableIds()) {
71-
RaftTableProcessor tableProcessor = raftTableProcessor(tableId);
71+
TablePartitionRaftProcessor tableProcessor = raftTableProcessor(tableId);
7272

7373
if (tableProcessor == null) {
7474
// This can only happen if the table in question has already been dropped and destroyed. In such case, we simply
@@ -97,7 +97,7 @@ protected CommandResult handleInternally(
9797
return new CommandResult(null, applied);
9898
}
9999

100-
private @Nullable RaftTableProcessor raftTableProcessor(int tableId) {
100+
private @Nullable TablePartitionRaftProcessor raftTableProcessor(int tableId) {
101101
return tableProcessorByTableId.apply(tableId);
102102
}
103103
}

0 commit comments

Comments
 (0)