Skip to content

Commit a35d649

Browse files
KAFKA-19851; Delete dynamic configs that were removed by Kafka (#21053)
When upgrading from Kafka 3.x to 4.0, the metadata log may contain dynamic configurations that were removed in 4.0 (e.g., message.format.version per KIP-724). These removed configs cause InvalidConfigurationException when users attempt to modify any configuration, because validation checks all existing configs including the removed ones. Adds filtering to prevent unsupported or invalid configurations from being applied during metadata replay. The filtering is implemented using a SupportedConfigChecker interface that is injected via dependency injection through Builder patterns. When a ConfigRecord is replayed, the checker validates whether the configuration name is supported for the given resource type. Unsupported configurations are silently ignored during replay, ensuring that only valid configurations enter the in-memory state. The SupportedConfigChecker interface provides a default TRUE implementation that accepts all configurations. The actual filtering logic is implemented by DefaultSupportedConfigChecker, which maintains a whitelist of valid configuration names per resource type (TOPIC, CLIENT_METRICS, GROUP) based on the actual config definitions. The filtering occurs in both ConfigurationDelta#replay and ConfigurationControlManager#replay methods. Added unit tests to ensure: - Removed configurations are filtered during the replay operations - Only supported configurations appear in the resulting metadata images - The filtering works correctly for all resource types (TOPIC, BROKER, CLIENT_METRICS, GROUP) - DefaultSupportedConfigChecker correctly identifies supported vs unsupported configurations for each resource type Reviewers: José Armando García Sancio <jsancio@apache.org>, Jun Rao <junrao@apache.org>, Alyssa Huang <ahuang@confluent.io>, Kevin Wu <kevin.wu2412@gmail.com>, Andrew Grant <agrant@confluent.io>
1 parent b347f4b commit a35d649

37 files changed

Lines changed: 690 additions & 78 deletions

File tree

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2655,6 +2655,7 @@ project(':shell') {
26552655
implementation project(':core')
26562656
implementation project(':metadata')
26572657
implementation project(':raft')
2658+
implementation project(':server')
26582659

26592660
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
26602661
implementation libs.jacksonJakartarsJsonProvider

checkstyle/import-control.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@
275275
<allow pkg="org.apache.kafka.queue"/>
276276
<allow pkg="org.apache.kafka.raft"/>
277277
<allow pkg="org.apache.kafka.server.common" />
278+
<allow pkg="org.apache.kafka.server.config" />
278279
<allow pkg="org.apache.kafka.server.fault" />
279280
<allow pkg="org.apache.kafka.server.util" />
280281
<allow pkg="org.apache.kafka.shell"/>

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,13 @@ public Optional<TopicMetadata> topicMetadata(String topicName) {
7676

7777
@Override
7878
public CoordinatorMetadataDelta emptyDelta() {
79-
return new KRaftCoordinatorMetadataDelta(new MetadataDelta(metadataImage));
79+
// Note: supportedConfigChecker is not set because CoordinatorMetadataDelta only exposes topic-related methods.
80+
// No ConfigRecord replay happens through this path, so the checker is never invoked.
81+
return new KRaftCoordinatorMetadataDelta(
82+
new MetadataDelta.Builder()
83+
.setImage(metadataImage)
84+
.build()
85+
);
8086
}
8187

8288
@Override

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2229,7 +2229,11 @@ public void testOnMetadataUpdate() {
22292229
verify(coordinator0).onLoaded(CoordinatorMetadataImage.EMPTY);
22302230

22312231
// Publish a new image.
2232-
CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(new MetadataDelta(MetadataImage.EMPTY));
2232+
CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(
2233+
new MetadataDelta.Builder()
2234+
.setImage(MetadataImage.EMPTY)
2235+
.build()
2236+
);
22332237
CoordinatorMetadataImage newImage = CoordinatorMetadataImage.EMPTY;
22342238
runtime.onMetadataUpdate(delta, newImage);
22352239

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ public void testKRaftCoordinatorDelta() {
5757
.addTopic(deletedTopicId, deletedTopicName, 1)
5858
.addTopic(changedTopicId, changedTopicName, 1)
5959
.build();
60-
MetadataDelta delta = new MetadataDelta(image);
60+
MetadataDelta delta = new MetadataDelta.Builder()
61+
.setImage(image)
62+
.build();
6163
delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
6264
delta.replay(new TopicRecord().setTopicId(topicId2).setName(topicName2));
6365
delta.replay(new RemoveTopicRecord().setTopicId(deletedTopicId));
@@ -107,14 +109,18 @@ public void testEqualsAndHashcode() {
107109
Uuid topicId3 = Uuid.randomUuid();
108110
String topicName3 = "test-topic3";
109111

110-
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
112+
MetadataDelta delta = new MetadataDelta.Builder()
113+
.setImage(MetadataImage.EMPTY)
114+
.build();
111115
delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
112116
delta.replay(new TopicRecord().setTopicId(topicId2).setName(topicName2));
113117

114118
KRaftCoordinatorMetadataDelta coordinatorDelta = new KRaftCoordinatorMetadataDelta(delta);
115119
KRaftCoordinatorMetadataDelta coordinatorDeltaCopy = new KRaftCoordinatorMetadataDelta(delta);
116120

117-
MetadataDelta delta2 = new MetadataDelta(MetadataImage.EMPTY);
121+
MetadataDelta delta2 = new MetadataDelta.Builder()
122+
.setImage(MetadataImage.EMPTY)
123+
.build();
118124
delta.replay(new TopicRecord().setTopicId(topicId3).setName(topicName3));
119125
KRaftCoordinatorMetadataDelta coordinatorDelta2 = new KRaftCoordinatorMetadataDelta(delta2);
120126

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MetadataImageBuilder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ public MetadataImageBuilder() {
3434
}
3535

3636
public MetadataImageBuilder(MetadataImage image) {
37-
this.delta = new MetadataDelta(image);
37+
this.delta = new MetadataDelta.Builder()
38+
.setImage(image)
39+
.build();
3840
}
3941

4042
public MetadataImageBuilder addTopic(

core/src/main/scala/kafka/server/ControllerServer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ class ControllerServer(
249249
setCreateTopicPolicy(createTopicPolicy.toJava).
250250
setAlterConfigPolicy(alterConfigPolicy.toJava).
251251
setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).
252+
setSupportedConfigChecker(sharedServer.supportedConfigChecker).
252253
setStaticConfig(config.originals).
253254
setBootstrapMetadata(bootstrapMetadata).
254255
setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler).

core/src/main/scala/kafka/server/SharedServer.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ import org.apache.kafka.image.loader.MetadataLoader
3030
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
3131
import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
3232
import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
33-
import org.apache.kafka.metadata.ListenerInfo
34-
import org.apache.kafka.metadata.MetadataRecordSerde
33+
import org.apache.kafka.metadata.{SupportedConfigChecker, ListenerInfo, MetadataRecordSerde}
3534
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
3635
import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics}
3736
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
37+
import org.apache.kafka.server.config.DefaultSupportedConfigChecker
3838
import org.apache.kafka.server.common.ApiMessageAndVersion
3939
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
4040
import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics, NodeMetrics}
@@ -112,6 +112,7 @@ class SharedServer(
112112
private var usedByController: Boolean = false
113113
val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
114114
val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
115+
val supportedConfigChecker: SupportedConfigChecker = new DefaultSupportedConfigChecker()
115116

116117
// Factory for creating request handler pools with shared aggregate thread counter
117118
val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()
@@ -326,7 +327,8 @@ class SharedServer(
326327
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
327328
setFaultHandler(metadataLoaderFaultHandler).
328329
setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
329-
setMetrics(metadataLoaderMetrics)
330+
setMetrics(metadataLoaderMetrics).
331+
setSupportedConfigChecker(supportedConfigChecker)
330332
loader = loaderBuilder.build()
331333
snapshotEmitter = new SnapshotEmitter.Builder().
332334
setNodeId(nodeId).

core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ class LocalLeaderEndPointTest extends Logging {
8989
alterPartitionManager = alterPartitionManager
9090
)
9191

92-
val delta = new MetadataDelta(MetadataImage.EMPTY)
92+
val delta = new MetadataDelta.Builder()
93+
.setImage(MetadataImage.EMPTY)
94+
.build()
9395
delta.replay(new FeatureLevelRecord()
9496
.setName(MetadataVersion.FEATURE_NAME)
9597
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -410,7 +412,9 @@ class LocalLeaderEndPointTest extends Logging {
410412
}
411413

412414
private def bumpLeaderEpoch(): Unit = {
413-
val delta = new MetadataDelta(image)
415+
val delta = new MetadataDelta.Builder()
416+
.setImage(image)
417+
.build()
414418
delta.replay(new PartitionChangeRecord()
415419
.setTopicId(topicId)
416420
.setPartitionId(partition)

core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ class DefaultApiVersionManagerTest {
3838
private val brokerFeatures = BrokerFeatures.createDefault(true)
3939
private val metadataCache = {
4040
val cache = new KRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
41-
val delta = new MetadataDelta(MetadataImage.EMPTY)
41+
val delta = new MetadataDelta.Builder()
42+
.setImage(MetadataImage.EMPTY)
43+
.build()
4244
delta.replay(new FeatureLevelRecord()
4345
.setName(MetadataVersion.FEATURE_NAME)
4446
.setFeatureLevel(MetadataVersion.latestProduction().featureLevel())

0 commit comments

Comments
 (0)