diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 1a6521f57e..d6e861fb0f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -377,9 +377,12 @@ private void startInBatchMode() { p.getPartitionId(), p.getPartitionName())) .collect(Collectors.toList()); - splits = this.initPartitionedSplits(partitions); + // Use log-only splits to avoid generating mixed split + // types (HybridSnapshotLogSplit + LogSplit) for + // primary-key tables, which is not supported. + splits = this.initLogTablePartitionSplits(partitions); } else { - splits = this.initNonPartitionedSplits(); + splits = this.getLogSplit(null, null); } } return splits;