Skip to content

Commit 8a419ff

Browse files
committed
Address PR feedback.
Adds a check to ensure processors are only registered once. Replaces duplicative RpcService usages with the getService() method. Removes an unnecessary TMessage object creation.
1 parent d41bfa7 commit 8a419ff

5 files changed

Lines changed: 11 additions & 5 deletions

File tree

core/src/main/java/org/apache/accumulo/core/rpc/AccumuloTMultiplexedProcessor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.thrift.protocol.TProtocolDecorator;
3030
import org.apache.thrift.protocol.TProtocolException;
3131

32+
import com.google.common.base.Preconditions;
33+
3234
/**
3335
* AccumuloTMultiplexedProcessor is a {@link TMultiplexedProcessor} variant that uses the
3436
* {@link AccumuloTMultiplexedProtocol} to register additional processors and then forward messages
@@ -40,7 +42,11 @@ public class AccumuloTMultiplexedProcessor extends TMultiplexedProcessor {
4042
private final TProcessor[] PROCESSORS = new TProcessor[RpcService.values().length];
4143

4244
public void registerProcessor(RpcService service, TProcessor processor) {
43-
PROCESSORS[Byte.toUnsignedInt(service.getShortId())] = Objects.requireNonNull(processor,
45+
int serviceID = Byte.toUnsignedInt(service.getShortId());
46+
// Ensure that TProcessors are only registered once.
47+
Preconditions.checkState(PROCESSORS[serviceID] == null,
48+
"processor already exists with id " + serviceID);
49+
PROCESSORS[serviceID] = Objects.requireNonNull(processor,
4450
"processor must not be null for RPCService: " + service.name());
4551
}
4652

core/src/main/java/org/apache/accumulo/core/rpc/AccumuloTMultiplexedProtocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public AccumuloTMultiplexedProtocol(TProtocol protocol, RpcService rpcService) {
4949
@Override
5050
public void writeMessageBegin(TMessage tMessage) throws TException {
5151
if (tMessage.type == TMessageType.CALL || tMessage.type == TMessageType.ONEWAY) {
52-
super.writeMessageBegin(new TMessage(tMessage.name, tMessage.type, tMessage.seqid));
52+
super.writeMessageBegin(tMessage);
5353
super.writeByte(this.rpcService.getShortId());
5454
} else {
5555
super.writeMessageBegin(tMessage);

core/src/main/java/org/apache/accumulo/core/rpc/clients/ClientServiceThriftClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class ClientServiceThriftClient extends ThriftClientTypes<Client>
4444
public Pair<String,Client> getThriftServerConnection(ClientContext context,
4545
boolean preferCachedConnections) throws TTransportException {
4646
return getThriftServerConnection(LOG, this, context, preferCachedConnections,
47-
warnedAboutTServersBeingDown, RpcService.CLIENT);
47+
warnedAboutTServersBeingDown, getService());
4848
}
4949

5050
@Override

core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletManagementClientServiceThriftClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public TabletManagementClientServiceThriftClient(RpcService service) {
4949
public Pair<String,Client> getThriftServerConnection(ClientContext context,
5050
boolean preferCachedConnections) throws TTransportException {
5151
return getThriftServerConnection(LOG, this, context, preferCachedConnections,
52-
warnedAboutTServersBeingDown, RpcService.TABLET_MANAGEMENT);
52+
warnedAboutTServersBeingDown, getService());
5353
}
5454

5555
@Override

core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletServerThriftClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class TabletServerThriftClient extends ThriftClientTypes<Client>
4848
public Pair<String,Client> getThriftServerConnection(ClientContext context,
4949
boolean preferCachedConnections) throws TTransportException {
5050
return getThriftServerConnection(LOG, this, context, preferCachedConnections,
51-
warnedAboutTServersBeingDown, RpcService.TSERV);
51+
warnedAboutTServersBeingDown, getService());
5252
}
5353

5454
@Override

0 commit comments

Comments
 (0)