Skip to content

Commit 21f3ab9

Browse files
committed
pool: add firefly onStart marker for MoverProtocol-based transfers
AbstractMoverProtocolTransferService, which handles remote/TPC transfers (e.g., RemoteHttpTransferService for FTS third-party copies), never called transferLifeCycle.onStart(). This meant that firefly flow-start UDP markers were never emitted for TPC movers, even though onEnd markers were correctly sent via DefaultPostTransferService. Only NettyTransferService (direct client connections) called onStart(), so pool-side SciTag flow markers for TPC transfers were incomplete: ESnet Stardust dashboards received end markers with correct experiment and activity IDs from transfer tags, but no corresponding start markers. Add a startTransferLifeCycle() call in MoverTask.run() that derives the local endpoint from the remote address using NetworkUtils.getLocalAddress (the same approach used by DefaultPostTransferService.deriveLocalEndpoint for onEnd) and invokes transferLifeCycle.onStart() before I/O begins. Motivation: ATLAS AGLT2 pool logs showed 194 TPC onEnd markers with classifier=transfer-tag and correct activityIds (11, 15, 16) on a typical day, but zero TPC onStart markers. Direct-read start markers (from NettyTransferService) were present but all carried activityId=1 (FQAN fallback) since worker nodes do not send SciTag headers. Signed-off-by: Shawn McKee <smckee@umich.edu>
1 parent 5fe6ec9 commit 21f3ab9

1 file changed

Lines changed: 54 additions & 1 deletion

File tree

modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* dCache - http://www.dcache.org/
22
*
3-
* Copyright (C) 2015 - 2025 Deutsches Elektronen-Synchrotron
3+
* Copyright (C) 2015 - 2026 Deutsches Elektronen-Synchrotron
44
*
55
* This program is free software: you can redistribute it and/or modify
66
* it under the terms of the GNU Affero General Public License as
@@ -20,6 +20,7 @@
2020
import com.google.common.base.Throwables;
2121
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2222
import diskCacheV111.util.CacheException;
23+
import diskCacheV111.vehicles.IpProtocolInfo;
2324
import diskCacheV111.vehicles.PoolIoFileMessage;
2425
import diskCacheV111.vehicles.ProtocolInfo;
2526
import dmg.cells.nucleus.AbstractCellComponent;
@@ -29,6 +30,9 @@
2930
import java.io.InterruptedIOException;
3031
import java.io.SyncFailedException;
3132
import java.lang.reflect.InvocationTargetException;
33+
import java.net.InetAddress;
34+
import java.net.InetSocketAddress;
35+
import java.net.SocketException;
3236
import java.nio.channels.ClosedChannelException;
3337
import java.nio.channels.CompletionHandler;
3438
import java.nio.file.StandardOpenOption;
@@ -43,6 +47,7 @@
4347
import org.dcache.pool.repository.RepositoryChannel;
4448
import org.dcache.util.CDCExecutorServiceDecorator;
4549
import org.dcache.util.Exceptions;
50+
import org.dcache.util.NetworkUtils;
4651
import org.slf4j.Logger;
4752
import org.slf4j.LoggerFactory;
4853
import org.springframework.beans.factory.annotation.Required;
@@ -143,6 +148,7 @@ public MoverTask(MoverProtocolMover mover,
143148
public void run() {
144149
try {
145150
setThread();
151+
startTransferLifeCycle();
146152
RepositoryChannel fileIoChannel = _mover.openChannel();
147153
try {
148154
if (_mover.getIoMode().contains(StandardOpenOption.WRITE)) {
@@ -183,6 +189,53 @@ public void run() {
183189
}
184190
}
185191

192+
private void startTransferLifeCycle() {
193+
if (_transferLifeCycle == null) {
194+
SCITAGS_LOGGER.debug(
195+
"scitags lifecycle=start skip reason=no-transfer-lifecycle protocol={} pnfsid={} transferTag={}",
196+
protocolName(),
197+
_mover.getFileAttributes().getPnfsId(),
198+
transferTag());
199+
return;
200+
}
201+
202+
if (!(_mover.getProtocolInfo() instanceof IpProtocolInfo ipProtocolInfo)) {
203+
SCITAGS_LOGGER.debug(
204+
"scitags lifecycle=start skip reason=non-ip-protocol protocol={} pnfsid={} transferTag={}",
205+
protocolName(),
206+
_mover.getFileAttributes().getPnfsId(),
207+
transferTag());
208+
return;
209+
}
210+
211+
InetSocketAddress remoteEndpoint = ipProtocolInfo.getSocketAddress();
212+
if (remoteEndpoint == null) {
213+
SCITAGS_LOGGER.debug(
214+
"scitags lifecycle=start skip reason=no-remote-endpoint protocol={} pnfsid={} transferTag={}",
215+
protocolName(),
216+
_mover.getFileAttributes().getPnfsId(),
217+
transferTag());
218+
return;
219+
}
220+
221+
try {
222+
InetAddress localAddress =
223+
NetworkUtils.getLocalAddress(remoteEndpoint.getAddress());
224+
InetSocketAddress localEndpoint =
225+
new InetSocketAddress(localAddress, 0);
226+
_transferLifeCycle.onStart(remoteEndpoint, localEndpoint,
227+
_mover.getProtocolInfo(), _mover.getSubject());
228+
} catch (SocketException e) {
229+
SCITAGS_LOGGER.debug(
230+
"scitags lifecycle=start skip reason=local-endpoint-derivation-failed protocol={} pnfsid={} remote={} transferTag={} message={}",
231+
protocolName(),
232+
_mover.getFileAttributes().getPnfsId(),
233+
remoteEndpoint,
234+
transferTag(),
235+
e.getMessage());
236+
}
237+
}
238+
186239
private void handleChecksumMover() {
187240
MoverProtocol mover = _mover.getMover();
188241

0 commit comments

Comments
 (0)