Skip to content

Commit 5380ee9

Browse files
authored
Merge pull request #8055 from ShawnMcKee/fix/firefly-tpc-onstart-marker
pool: add firefly onStart marker for MoverProtocol-based transfers
2 parents b656f9a + 07fb840 commit 5380ee9

3 files changed

Lines changed: 55 additions & 3 deletions

File tree

modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) {
7373
_transferLifeCycle = transferLifeCycle;
7474
}
7575

76+
protected TransferLifeCycle getTransferLifeCycle() {
77+
return _transferLifeCycle;
78+
}
79+
7680
@Override
7781
public Mover<?> createMover(ReplicaDescriptor handle, PoolIoFileMessage message,
7882
CellPath pathToDoor)

modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.common.base.Splitter;
2121
import diskCacheV111.util.CacheException;
22+
import diskCacheV111.vehicles.PoolIoFileMessage;
2223
import diskCacheV111.vehicles.ProtocolInfo;
2324
import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo;
2425
import diskCacheV111.vehicles.RemoteHttpsDataTransferProtocolInfo;
@@ -58,8 +59,11 @@
5859
import org.apache.http.impl.client.HttpClients;
5960
import org.apache.http.protocol.HttpContext;
6061
import org.apache.http.protocol.HttpRequestExecutor;
62+
import org.dcache.pool.movers.Mover;
6163
import org.dcache.pool.movers.MoverProtocol;
64+
import org.dcache.pool.movers.MoverProtocolMover;
6265
import org.dcache.pool.movers.RemoteHttpDataTransferProtocol;
66+
import org.dcache.pool.repository.ReplicaDescriptor;
6367
import org.dcache.security.trust.AggregateX509TrustManager;
6468
import org.dcache.util.Version;
6569
import org.slf4j.Logger;
@@ -69,6 +73,8 @@
6973
import static com.google.common.base.Preconditions.checkArgument;
7074
import static dmg.util.Exceptions.meaningfulMessage;
7175

76+
import dmg.cells.nucleus.CellPath;
77+
7278
public class RemoteHttpTransferService extends SecureRemoteTransferService {
7379

7480
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteHttpTransferService.class);
@@ -128,6 +134,15 @@ public HttpUriRequest getRedirect(final HttpRequest request,
128134
private X509TrustManager trustManager;
129135
private CloseableHttpClient sharedClient;
130136

137+
@Override
138+
public Mover<?> createMover(ReplicaDescriptor handle, PoolIoFileMessage message,
139+
CellPath pathToDoor) throws CacheException {
140+
Mover<?> mover = super.createMover(handle, message, pathToDoor);
141+
MoverProtocolMover mpm = (MoverProtocolMover) mover;
142+
((RemoteHttpDataTransferProtocol) mpm.getMover()).setSubject(message.getSubject());
143+
return mover;
144+
}
145+
131146
@Override
132147
protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception {
133148
if (!(info instanceof RemoteHttpDataTransferProtocolInfo)) {
@@ -145,7 +160,7 @@ protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception
145160
SSLContext context = buildSSLContext(credential.getKeyManager());
146161
CloseableHttpClient client = createClient(context);
147162

148-
return new RemoteHttpDataTransferProtocol(client) {
163+
return new RemoteHttpDataTransferProtocol(client, getTransferLifeCycle()) {
149164
@Override
150165
protected void afterTransfer() {
151166
super.afterTransfer();
@@ -159,7 +174,7 @@ protected void afterTransfer() {
159174
}
160175
}
161176

162-
return new RemoteHttpDataTransferProtocol(sharedClient);
177+
return new RemoteHttpDataTransferProtocol(sharedClient, getTransferLifeCycle());
163178
}
164179

165180
@PostConstruct

modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import diskCacheV111.util.CacheException;
1818
import diskCacheV111.util.ThirdPartyTransferFailedCacheException;
19+
import diskCacheV111.vehicles.IpProtocolInfo;
1920
import diskCacheV111.vehicles.ProtocolInfo;
2021
import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo;
2122
import java.io.IOException;
@@ -39,6 +40,7 @@
3940
import java.util.function.Consumer;
4041
import java.util.stream.Collectors;
4142
import javax.annotation.concurrent.GuardedBy;
43+
import javax.security.auth.Subject;
4244
import org.apache.http.Header;
4345
import org.apache.http.HttpEntity;
4446
import org.apache.http.HttpInetConnection;
@@ -213,9 +215,18 @@ private enum HeaderFlags {
213215
private Long _expectedTransferSize;
214216

215217
private InetSocketAddress _localEndpoint;
218+
private final TransferLifeCycle _transferLifeCycle;
219+
private Subject _subject;
220+
private boolean _startMarkerSent;
216221

217-
public RemoteHttpDataTransferProtocol(CloseableHttpClient client) {
222+
public RemoteHttpDataTransferProtocol(CloseableHttpClient client,
223+
TransferLifeCycle transferLifeCycle) {
218224
_client = requireNonNull(client);
225+
_transferLifeCycle = requireNonNull(transferLifeCycle);
226+
}
227+
228+
public void setSubject(Subject subject) {
229+
_subject = subject;
219230
}
220231

221232
private static void checkThat(boolean isOk, String message) throws CacheException {
@@ -540,6 +551,7 @@ private CloseableHttpResponse doGet(final RemoteHttpDataTransferProtocolInfo inf
540551
CloseableHttpResponse response = _client.execute(get, context);
541552

542553
_localEndpoint = localAddress().orElse(null);
554+
startFlowMarker();
543555

544556
boolean isSuccessful = false;
545557
try {
@@ -605,6 +617,7 @@ private void sendFile(RemoteHttpDataTransferProtocolInfo info)
605617

606618
try (CloseableHttpResponse response = _client.execute(put, context)) {
607619
_localEndpoint = localAddress().orElse(null);
620+
startFlowMarker();
608621
StatusLine status = response.getStatusLine();
609622
switch (status.getStatusCode()) {
610623
case 200: /* OK (not actually a valid response from PUT) */
@@ -981,6 +994,26 @@ public Long getBytesExpected() {
981994
return _expectedTransferSize;
982995
}
983996

997+
private void startFlowMarker() {
998+
if (_startMarkerSent || _localEndpoint == null || _subject == null) {
999+
return;
1000+
}
1001+
1002+
ProtocolInfo protocolInfo = _channel.getProtocolInfo();
1003+
if (!(protocolInfo instanceof IpProtocolInfo ipInfo)) {
1004+
return;
1005+
}
1006+
1007+
InetSocketAddress remoteEndpoint = ipInfo.getSocketAddress();
1008+
if (remoteEndpoint == null) {
1009+
return;
1010+
}
1011+
1012+
_transferLifeCycle.onStart(remoteEndpoint, _localEndpoint,
1013+
protocolInfo, _subject);
1014+
_startMarkerSent = true;
1015+
}
1016+
9841017
@Override
9851018
public Optional<InetSocketAddress> getLocalEndpoint() {
9861019
return Optional.ofNullable(_localEndpoint);

0 commit comments

Comments
 (0)