Skip to content

Commit dcab5b5

Browse files
authored
Cache dataTasks() result in NodeFileScanTask to avoid repeated list creation (#4146)
* perf: cache dataTasks() result in NodeFileScanTask to avoid repeated list creation dataTasks() was creating a new List via Stream.concat().collect(toList()) on every call. Since the result is immutable after construction (or after addFile() calls complete), cache it lazily and invalidate the cache in addFile() when baseTasks or insertTasks are mutated. Signed-off-by: Jiwon Park <jpark92@outlook.kr> * Add tests for dataTasks() lazy caching in NodeFileScanTask Signed-off-by: Jiwon Park <jpark92@outlook.kr> * Fix spotless format violations in TestNodeFileScanTask Signed-off-by: Jiwon Park <jpark92@outlook.kr> --------- Signed-off-by: Jiwon Park <jpark92@outlook.kr>
1 parent b7f7de3 commit dcab5b5

2 files changed

Lines changed: 160 additions & 2 deletions

File tree

amoro-format-iceberg/src/main/java/org/apache/amoro/scan/NodeFileScanTask.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.slf4j.LoggerFactory;
2828

2929
import java.util.ArrayList;
30+
import java.util.Collections;
3031
import java.util.List;
3132
import java.util.stream.Collectors;
3233
import java.util.stream.Stream;
@@ -38,6 +39,7 @@ public class NodeFileScanTask implements KeyedTableScanTask {
3839
private List<MixedFileScanTask> baseTasks = new ArrayList<>();
3940
private List<MixedFileScanTask> insertTasks = new ArrayList<>();
4041
private List<MixedFileScanTask> deleteFiles = new ArrayList<>();
42+
private List<MixedFileScanTask> cachedDataTasks = null;
4143
private long cost = 0;
4244
private final long openFileCost = Long.valueOf(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
4345
private DataTreeNode treeNode;
@@ -100,11 +102,16 @@ public List<MixedFileScanTask> mixedEquityDeletes() {
100102

101103
@Override
102104
public List<MixedFileScanTask> dataTasks() {
103-
return Stream.concat(baseTasks.stream(), insertTasks.stream()).collect(Collectors.toList());
105+
if (cachedDataTasks == null) {
106+
cachedDataTasks =
107+
Collections.unmodifiableList(
108+
Stream.concat(baseTasks.stream(), insertTasks.stream()).collect(Collectors.toList()));
109+
}
110+
return cachedDataTasks;
104111
}
105112

106113
public void addFile(MixedFileScanTask task) {
107-
114+
cachedDataTasks = null;
108115
DataFileType fileType = task.fileType();
109116
if (fileType == null) {
110117
LOG.warn("file type is null");
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.scan;
20+
21+
import org.apache.amoro.IcebergFileEntry;
22+
import org.apache.amoro.data.DefaultKeyedFile;
23+
import org.apache.amoro.utils.ManifestEntryFields;
24+
import org.apache.iceberg.DataFile;
25+
import org.apache.iceberg.DataFiles;
26+
import org.apache.iceberg.PartitionSpec;
27+
import org.junit.Assert;
28+
import org.junit.Test;
29+
30+
import java.util.Arrays;
31+
import java.util.List;
32+
33+
/** Unit tests for lazy caching behavior of {@link NodeFileScanTask#dataTasks()}. */
34+
public class TestNodeFileScanTask {
35+
36+
// File name pattern: {nodeId}-{type}-{transactionId}-{partitionId}-{taskId}-{operationId}-{count}
37+
// BASE_FILE: nodeId-B-transactionId-...
38+
// INSERT_FILE: nodeId-I-transactionId-...
39+
// EQ_DELETE_FILE: nodeId-ED-transactionId-...
40+
private static MixedFileScanTask createBaseTask(String path) {
41+
DataFile dataFile =
42+
DataFiles.builder(PartitionSpec.unpartitioned())
43+
.withPath(path)
44+
.withFileSizeInBytes(100L)
45+
.withRecordCount(10L)
46+
.build();
47+
DefaultKeyedFile keyedFile = DefaultKeyedFile.parseBase(dataFile);
48+
return new BasicMixedFileScanTask(keyedFile, null, PartitionSpec.unpartitioned(), null);
49+
}
50+
51+
private static MixedFileScanTask createInsertTask(String path) {
52+
IcebergFileEntry entry =
53+
new IcebergFileEntry(
54+
1L,
55+
2L,
56+
ManifestEntryFields.Status.ADDED,
57+
DataFiles.builder(PartitionSpec.unpartitioned())
58+
.withPath(path)
59+
.withFileSizeInBytes(100L)
60+
.withRecordCount(10L)
61+
.build());
62+
DefaultKeyedFile keyedFile = DefaultKeyedFile.parseChange((DataFile) entry.getFile());
63+
return new BasicMixedFileScanTask(keyedFile, null, PartitionSpec.unpartitioned(), null);
64+
}
65+
66+
@Test
67+
public void testDataTasksReturnsSameInstanceOnConsecutiveCalls() {
68+
MixedFileScanTask baseTask =
69+
createBaseTask("/tmp/1-B-2-00000-0-9009257362994691056-00001.parquet");
70+
MixedFileScanTask insertTask =
71+
createInsertTask("/tmp/1-I-2-00000-0-9009257362994691056-00002.parquet");
72+
73+
NodeFileScanTask nodeTask = new NodeFileScanTask(Arrays.asList(baseTask, insertTask));
74+
75+
List<MixedFileScanTask> first = nodeTask.dataTasks();
76+
List<MixedFileScanTask> second = nodeTask.dataTasks();
77+
78+
Assert.assertSame("dataTasks() should return the same cached instance", first, second);
79+
}
80+
81+
@Test
82+
public void testDataTasksCacheIsInvalidatedAfterAddFile() {
83+
MixedFileScanTask baseTask =
84+
createBaseTask("/tmp/1-B-2-00000-0-9009257362994691056-00001.parquet");
85+
86+
NodeFileScanTask nodeTask = new NodeFileScanTask();
87+
nodeTask.addFile(baseTask);
88+
89+
List<MixedFileScanTask> before = nodeTask.dataTasks();
90+
Assert.assertEquals("Before addFile: should have 1 data task", 1, before.size());
91+
92+
MixedFileScanTask insertTask =
93+
createInsertTask("/tmp/1-I-3-00000-0-9009257362994691056-00002.parquet");
94+
nodeTask.addFile(insertTask);
95+
96+
List<MixedFileScanTask> after = nodeTask.dataTasks();
97+
Assert.assertEquals("After addFile: should have 2 data tasks", 2, after.size());
98+
Assert.assertNotSame(
99+
"dataTasks() should return a new instance after addFile invalidates cache", before, after);
100+
}
101+
102+
@Test
103+
public void testDataTasksIsUnmodifiable() {
104+
MixedFileScanTask baseTask =
105+
createBaseTask("/tmp/1-B-2-00000-0-9009257362994691056-00001.parquet");
106+
NodeFileScanTask nodeTask = new NodeFileScanTask(Arrays.asList(baseTask));
107+
108+
List<MixedFileScanTask> tasks = nodeTask.dataTasks();
109+
try {
110+
tasks.add(baseTask);
111+
Assert.fail("dataTasks() should return an unmodifiable list");
112+
} catch (UnsupportedOperationException e) {
113+
// expected
114+
}
115+
}
116+
117+
@Test
118+
public void testDataTasksContainsBothBaseAndInsertTasks() {
119+
MixedFileScanTask baseTask =
120+
createBaseTask("/tmp/1-B-2-00000-0-9009257362994691056-00001.parquet");
121+
MixedFileScanTask insertTask =
122+
createInsertTask("/tmp/1-I-3-00000-0-9009257362994691056-00002.parquet");
123+
124+
NodeFileScanTask nodeTask = new NodeFileScanTask();
125+
nodeTask.addFile(baseTask);
126+
nodeTask.addFile(insertTask);
127+
128+
List<MixedFileScanTask> tasks = nodeTask.dataTasks();
129+
Assert.assertEquals(
130+
"dataTasks() should contain both baseTasks and insertTasks", 2, tasks.size());
131+
Assert.assertTrue("dataTasks() should include the base task", tasks.contains(baseTask));
132+
Assert.assertTrue("dataTasks() should include the insert task", tasks.contains(insertTask));
133+
}
134+
135+
@Test
136+
public void testDataTasksDoesNotContainEqDeleteFiles() {
137+
MixedFileScanTask baseTask =
138+
createBaseTask("/tmp/1-B-2-00000-0-9009257362994691056-00001.parquet");
139+
// EQ_DELETE_FILE: path with "ED" type identifier
140+
MixedFileScanTask deleteTask =
141+
createInsertTask("/tmp/1-ED-3-00000-0-9009257362994691056-00003.parquet");
142+
143+
NodeFileScanTask nodeTask = new NodeFileScanTask();
144+
nodeTask.addFile(baseTask);
145+
nodeTask.addFile(deleteTask);
146+
147+
List<MixedFileScanTask> tasks = nodeTask.dataTasks();
148+
Assert.assertEquals("dataTasks() should not include EQ_DELETE_FILE tasks", 1, tasks.size());
149+
Assert.assertTrue("dataTasks() should include only the base task", tasks.contains(baseTask));
150+
}
151+
}

0 commit comments

Comments
 (0)