Skip to content

Commit 376b210

Browse files
Merge pull request #8039 from ShawnMcKee/fix/firefly-sci-tag-boundary-fqan-fallback
Fix TransferLifeCycle SciTag boundary and FQAN fallback mapping
1 parent 019bc90 commit 376b210

4 files changed

Lines changed: 242 additions & 20 deletions

File tree

modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1700,6 +1700,13 @@ private class HttpTransfer extends RedirectedTransfer<String> {
17001700
private final String _requestPath;
17011701
private String _transferTag = "";
17021702

1703+
private static final String HEADER_SCITAG = "SciTag";
1704+
private static final String HEADER_TRANSFER_HEADER_SCITAG = "TransferHeaderSciTag";
1705+
private static final String[] SCITAG_HEADERS = {
1706+
HEADER_SCITAG,
1707+
HEADER_TRANSFER_HEADER_SCITAG
1708+
};
1709+
17031710
public HttpTransfer(PnfsHandler pnfs, Subject subject,
17041711
Restriction restriction, FsPath path) throws URISyntaxException {
17051712
super(pnfs, subject, restriction, path);
@@ -1709,7 +1716,38 @@ public HttpTransfer(PnfsHandler pnfs, Subject subject,
17091716
var request = ServletRequest.getRequest();
17101717
request.setAttribute(TRANSACTION_ATTRIBUTE, getTransaction());
17111718
_requestPath = Requests.stripToPath(request.getRequestURL().toString());
1712-
_transferTag = request.getHeader("SciTag");
1719+
_transferTag = readTransferTag(request);
1720+
}
1721+
1722+
private String readTransferTag(HttpServletRequest request) {
1723+
// SciTag takes precedence because it is checked first.
1724+
for (String header : SCITAG_HEADERS) {
1725+
String transferTag = request.getHeader(header);
1726+
if (transferTag != null && !transferTag.isBlank()) {
1727+
String trimmed = transferTag.trim();
1728+
if (LOGGER.isDebugEnabled()) {
1729+
LOGGER.debug("{} header found: {} (from client={})",
1730+
header, trimmed, request.getRemoteAddr());
1731+
}
1732+
return trimmed;
1733+
}
1734+
}
1735+
1736+
String flowFromQuery = request.getParameter("scitag.flow");
1737+
if (flowFromQuery != null && !flowFromQuery.isBlank()) {
1738+
String trimmed = flowFromQuery.trim();
1739+
if (LOGGER.isDebugEnabled()) {
1740+
LOGGER.debug("scitag.flow query parameter found: {} (from client={})",
1741+
trimmed, request.getRemoteAddr());
1742+
}
1743+
return trimmed;
1744+
}
1745+
1746+
if (LOGGER.isDebugEnabled()) {
1747+
LOGGER.debug("No SciTag header/parameter found in request (client={})",
1748+
request.getRemoteAddr());
1749+
}
1750+
return "";
17131751
}
17141752

17151753
protected ProtocolInfo createProtocolInfo(InetSocketAddress address) {
@@ -1729,6 +1767,10 @@ protected ProtocolInfo createProtocolInfo(InetSocketAddress address) {
17291767
wantedChecksums);
17301768
protocolInfo.setSessionId((int) getId());
17311769
protocolInfo.setTransferTag(_transferTag);
1770+
if (LOGGER.isDebugEnabled()) {
1771+
LOGGER.debug("ProtocolInfo created with transferTag='{}' for path={}",
1772+
_transferTag, _requestPath);
1773+
}
17321774
return protocolInfo;
17331775
}
17341776

modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020
import org.dcache.xrootd.protocol.XrootdProtocol;
2121
import org.dcache.xrootd.tpc.XrootdTpcInfo;
2222
import org.dcache.xrootd.util.ParseException;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2325

2426
public class XrootdTransfer extends RedirectedTransfer<InetSocketAddress> {
2527

28+
private static final Logger LOGGER = LoggerFactory.getLogger(XrootdTransfer.class);
29+
2630
private UUID _uuid;
2731
private InetSocketAddress _doorAddress;
2832
private InetSocketAddress _internalAddress;
@@ -39,6 +43,15 @@ public XrootdTransfer(PnfsHandler pnfs, Subject subject,
3943
this.restriction = requireNonNull(restriction);
4044
tpcInfo = new XrootdTpcInfo(opaque);
4145
_transferTag = opaque.getOrDefault("scitag.flow", "");
46+
if (LOGGER.isDebugEnabled()) {
47+
if (!_transferTag.isEmpty()) {
48+
LOGGER.debug("scitag.flow parameter found: {}", _transferTag);
49+
} else if (opaque.containsKey("scitag.flow")) {
50+
LOGGER.debug("scitag.flow parameter found but empty");
51+
} else {
52+
LOGGER.debug("No scitag.flow parameter in this request");
53+
}
54+
}
4255
try {
4356
tpcInfo.setUid(Subjects.getUid(subject));
4457
} catch (NoSuchElementException e) {
@@ -121,7 +134,11 @@ private XrootdProtocolInfo createXrootdProtocolInfo() {
121134
_uuid,
122135
_doorAddress);
123136

124-
protocolInfo.setTransferTag(_transferTag);
137+
protocolInfo.setTransferTag(_transferTag);
138+
if (LOGGER.isDebugEnabled()) {
139+
LOGGER.debug("XrootdProtocolInfo created with transferTag='{}' for pnfs={}",
140+
_transferTag, getPnfsId());
141+
}
125142
return protocolInfo;
126143
}
127144

modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.google.common.base.Preconditions.checkArgument;
2121
import static com.google.common.net.InetAddresses.forString;
2222

23+
import com.google.common.base.Splitter;
2324
import com.google.common.net.HostAndPort;
2425

2526
import diskCacheV111.vehicles.MoverInfoMessage;
@@ -49,6 +50,15 @@
4950
public class TransferLifeCycle {
5051

5152
private final static Logger LOGGER = LoggerFactory.getLogger(TransferLifeCycle.class);
53+
private static final int MIN_VALID_TRANSFER_TAG = 64;
54+
private static final int MAX_VALID_TRANSFER_TAG = 65535;
55+
private static final int EXPERIMENT_ID_BIT_SHIFT = 6;
56+
private static final int ACTIVITY_ID_MASK = 0x3F;
57+
private static final int DEFAULT_ACTIVITY_ID = 1;
58+
private static final Splitter FQAN_GROUP_SPLITTER = Splitter.on('/')
59+
.trimResults()
60+
.omitEmptyStrings()
61+
.limit(2);
5262

5363
/**
5464
* The UDP firefly default port as described in
@@ -79,12 +89,12 @@ public class TransferLifeCycle {
7989
*/
8090
public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo protocolInfo,
8191
Subject subject) {
82-
92+
8393
if (!enabled) {
8494
return;
8595
}
8696

87-
if (isLocalTransfer(src)) {
97+
if (isExcludedTransfer(src, dst)) {
8898
return;
8999
}
90100

@@ -122,12 +132,12 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage
122132
ProtocolInfo protocolInfo = mover.getProtocolInfo();
123133
Subject subject = mover.getSubject();
124134

125-
135+
126136
if (!enabled) {
127137
return;
128138
}
129139

130-
if (isLocalTransfer(src)) {
140+
if (isExcludedTransfer(src, dst)) {
131141
return;
132142
}
133143

@@ -200,7 +210,7 @@ public void setStorageStatisticsEnabled(boolean isEnabled) {
200210

201211
/**
202212
* Configures VO (Virtual Organization) to Experiment ID mapping.
203-
*
213+
*
204214
* @param voMap A comma-separated string of VO mapping entries in the format
205215
* "voName:expId".
206216
*/
@@ -214,7 +224,7 @@ public void setVoMapping(String voMap) {
214224
voToExpId.put(parts[0].trim().toLowerCase(), Integer.parseInt(parts[1].trim()));
215225
} catch (NumberFormatException e) {
216226
LOGGER.warn("Invalid VO mapping entry: {}", entry);
217-
}
227+
}
218228
} else {
219229
LOGGER.warn("Invalid VO mapping entry: {}", entry);
220230
}
@@ -273,8 +283,8 @@ private String getApplication(ProtocolInfo protocolInfo) {
273283
}
274284

275285
/**
276-
* Determine experiment ID, initially from the ProtocolInfo (xroot/http),
277-
* if that fails then fallback to the Subject's primary FQAN.
286+
* Determine experiment ID, initially from the ProtocolInfo (xroot/http),
287+
* if that fails then fallback to the Subject's primary FQAN.
278288
*
279289
* @param protocolInfo the ProtocolInfo object containing transfer-related metadata
280290
* @param subject the Subject representing the user or entity associated with the transfer
@@ -284,12 +294,12 @@ private OptionalInt getExperimentId(ProtocolInfo protocolInfo, Subject subject)
284294
if (protocolInfo.getTransferTag() != null && !protocolInfo.getTransferTag().isEmpty()) {
285295
try {
286296
int transferTag = Integer.parseInt(protocolInfo.getTransferTag());
287-
if (transferTag <= 64 || transferTag >= 65536) {
297+
if (transferTag < MIN_VALID_TRANSFER_TAG || transferTag > MAX_VALID_TRANSFER_TAG) {
288298
LOGGER.warn("Invalid integer range for transfer tag: {}", protocolInfo.getTransferTag());
289299
return OptionalInt.empty();
290300
}
291301
// scitag = exp_id << 6 | act_id
292-
return OptionalInt.of(transferTag >> 6);
302+
return OptionalInt.of(transferTag >> EXPERIMENT_ID_BIT_SHIFT);
293303
} catch (NumberFormatException e) {
294304
LOGGER.warn("Invalid transfer tag: {}", protocolInfo.getTransferTag());
295305
return OptionalInt.empty();
@@ -301,23 +311,33 @@ private OptionalInt getExperimentId(ProtocolInfo protocolInfo, Subject subject)
301311
return OptionalInt.empty();
302312
}
303313

304-
return voToExpId.containsKey(vo.getGroup().toLowerCase())
305-
? OptionalInt.of(voToExpId.get(vo.getGroup().toLowerCase()))
314+
String groupPath = vo.getGroup();
315+
if (groupPath == null || groupPath.isBlank()) {
316+
return OptionalInt.empty();
317+
}
318+
319+
groupPath = groupPath.toLowerCase();
320+
String voName = FQAN_GROUP_SPLITTER.splitToList(groupPath).get(0);
321+
322+
return voToExpId.containsKey(voName)
323+
? OptionalInt.of(voToExpId.get(voName))
306324
: OptionalInt.empty();
307325
}
308326

309-
private boolean isLocalTransfer(InetSocketAddress dst) {
310-
InetAddress addr = dst.getAddress();
311-
return localSubnet.test(addr);
327+
private boolean isExcludedTransfer(InetSocketAddress src, InetSocketAddress dst) {
328+
InetAddress srcAddress = src.getAddress();
329+
InetAddress dstAddress = dst.getAddress();
330+
return srcAddress != null && dstAddress != null
331+
&& localSubnet.test(srcAddress)
332+
&& localSubnet.test(dstAddress);
312333
}
313334

314335
private int getActivity(ProtocolInfo protocolInfo) {
315336
if (!protocolInfo.getTransferTag().isEmpty()) {
316337
// scitag = exp_id << 6 | act_id
317-
return Integer.parseInt(protocolInfo.getTransferTag()) & 0x3F;
338+
return Integer.parseInt(protocolInfo.getTransferTag()) & ACTIVITY_ID_MASK;
318339
} else {
319-
// default activity id = 1
320-
return 1;
340+
return DEFAULT_ACTIVITY_ID;
321341
}
322342
}
323343

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package org.dcache.pool.movers;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertFalse;
5+
import static org.junit.Assert.assertTrue;
6+
7+
import diskCacheV111.vehicles.ProtocolInfo;
8+
import java.net.DatagramPacket;
9+
import java.net.DatagramSocket;
10+
import java.net.InetAddress;
11+
import java.net.InetSocketAddress;
12+
import java.net.SocketTimeoutException;
13+
import java.lang.reflect.Method;
14+
import java.util.OptionalInt;
15+
import javax.security.auth.Subject;
16+
import org.dcache.auth.FQANPrincipal;
17+
import org.junit.Before;
18+
import org.junit.Test;
19+
20+
public class TransferLifeCycleTest {
21+
22+
private Method getExperimentId;
23+
private TransferLifeCycle transferLifeCycle;
24+
25+
@Before
26+
public void setup() throws Exception {
27+
transferLifeCycle = new TransferLifeCycle();
28+
transferLifeCycle.setVoMapping("atlas:2,cms:3");
29+
30+
getExperimentId = TransferLifeCycle.class.getDeclaredMethod(
31+
"getExperimentId", ProtocolInfo.class, Subject.class);
32+
getExperimentId.setAccessible(true);
33+
}
34+
35+
@Test
36+
public void shouldAcceptMinimumValidSciTagValue() throws Exception {
37+
OptionalInt experimentId = resolveExperimentId("64", new Subject());
38+
39+
assertTrue(experimentId.isPresent());
40+
assertEquals(1, experimentId.getAsInt());
41+
}
42+
43+
@Test
44+
public void shouldRejectSciTagValueBelowValidRange() throws Exception {
45+
OptionalInt experimentId = resolveExperimentId("63", new Subject());
46+
47+
assertFalse(experimentId.isPresent());
48+
}
49+
50+
@Test
51+
public void shouldMapSlashPrefixedFqanToVoName() throws Exception {
52+
Subject subject = new Subject();
53+
subject.getPrincipals().add(new FQANPrincipal("/atlas/usatlas", true));
54+
55+
OptionalInt experimentId = resolveExperimentId("", subject);
56+
57+
assertTrue(experimentId.isPresent());
58+
assertEquals(2, experimentId.getAsInt());
59+
}
60+
61+
@Test
62+
public void shouldSuppressMarkerWhenBothEndpointsAreExcluded() throws Exception {
63+
assertFalse(sendsStartMarker("10.10.10.10", "10.20.20.20", "10.0.0.0/8"));
64+
}
65+
66+
@Test
67+
public void shouldNotSuppressMarkerWhenOnlySourceIsExcluded() throws Exception {
68+
assertTrue(sendsStartMarker("10.10.10.10", "203.0.113.20", "10.0.0.0/8"));
69+
}
70+
71+
@Test
72+
public void shouldNotSuppressMarkerWhenOnlyDestinationIsExcluded() throws Exception {
73+
assertTrue(sendsStartMarker("203.0.113.20", "10.20.20.20", "10.0.0.0/8"));
74+
}
75+
76+
private OptionalInt resolveExperimentId(String transferTag, Subject subject) throws Exception {
77+
return (OptionalInt) getExperimentId.invoke(transferLifeCycle,
78+
new TestProtocolInfo("xrootd", transferTag), subject);
79+
}
80+
81+
private boolean sendsStartMarker(String srcIp, String dstIp, String excludes) throws Exception {
82+
try (DatagramSocket socket = new DatagramSocket(0, InetAddress.getByName("127.0.0.1"))) {
83+
socket.setSoTimeout(700);
84+
85+
TransferLifeCycle lifecycle = new TransferLifeCycle();
86+
lifecycle.setEnabled(true);
87+
lifecycle.setVoMapping("atlas:2");
88+
lifecycle.setExcludes(new String[]{excludes});
89+
lifecycle.setFireflyDestination("127.0.0.1:" + socket.getLocalPort());
90+
91+
lifecycle.onStart(
92+
new InetSocketAddress(srcIp, 40000),
93+
new InetSocketAddress(dstIp, 20066),
94+
new TestProtocolInfo("xrootd", "129"),
95+
new Subject());
96+
97+
var packet = new DatagramPacket(new byte[4096], 4096);
98+
try {
99+
socket.receive(packet);
100+
return true;
101+
} catch (SocketTimeoutException ignored) {
102+
return false;
103+
}
104+
}
105+
}
106+
107+
private static class TestProtocolInfo implements ProtocolInfo {
108+
109+
private static final long serialVersionUID = 1L;
110+
private final String protocol;
111+
private final String transferTag;
112+
113+
private TestProtocolInfo(String protocol, String transferTag) {
114+
this.protocol = protocol;
115+
this.transferTag = transferTag;
116+
}
117+
118+
@Override
119+
public String getProtocol() {
120+
return protocol;
121+
}
122+
123+
@Override
124+
public int getMinorVersion() {
125+
return 0;
126+
}
127+
128+
@Override
129+
public int getMajorVersion() {
130+
return 0;
131+
}
132+
133+
@Override
134+
public String getVersionString() {
135+
return "test";
136+
}
137+
138+
@Override
139+
public String getTransferTag() {
140+
return transferTag;
141+
}
142+
}
143+
}

0 commit comments

Comments
 (0)