Skip to content

Commit d41bfa7

Browse files
committed
Added new Multiplexer
Added a new Multiplexer that uses a single byte to determine which processor to use. Modifies RpcService to have a (byte) shortId that can be used by the Multiplexer. Adds a new MultiplexedProtocol that uses the RpcService shortId to encode the processor selection instead of a string. Adds a test that uses the new AccumuloTMultiplexedProtocol.
1 parent a1ff22e commit d41bfa7

8 files changed

Lines changed: 323 additions & 50 deletions

File tree

core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.accumulo.core.Constants;
2424
import org.apache.accumulo.core.data.InstanceId;
25+
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
2526
import org.apache.accumulo.core.trace.TraceUtil;
2627
import org.apache.thrift.TException;
2728
import org.apache.thrift.protocol.TCompactProtocol;
@@ -43,22 +44,26 @@ public class AccumuloProtocolFactory extends TCompactProtocol.Factory {
4344

4445
private final boolean isClient;
4546
private final InstanceId instanceId;
47+
private final RpcService rpcService;
4648

4749
static class AccumuloProtocol extends TCompactProtocol {
4850

4951
static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII
5052
static final byte PROTOCOL_VERSION = 1; // changes only when the header format changes
5153

54+
private final RpcService rpcService;
5255
private final boolean isClient;
5356
private final InstanceId instanceId;
5457

5558
private Span span = null;
5659
private Scope scope = null;
5760

58-
private AccumuloProtocol(TTransport transport, InstanceId instanceId, boolean isClient) {
61+
private AccumuloProtocol(TTransport transport, InstanceId instanceId, boolean isClient,
62+
RpcService rpcService) {
5963
super(transport);
6064
this.instanceId = instanceId;
6165
this.isClient = isClient;
66+
this.rpcService = rpcService;
6267
}
6368

6469
/**
@@ -205,7 +210,7 @@ private void validateInstanceId(String clientInstanceId) throws TException {
205210

206211
@Override
207212
public AccumuloProtocol getProtocol(TTransport trans) {
208-
return new AccumuloProtocol(requireNonNull(trans), this.instanceId, isClient);
213+
return new AccumuloProtocol(requireNonNull(trans), this.instanceId, isClient, rpcService);
209214
}
210215

211216
/**
@@ -215,22 +220,24 @@ public AccumuloProtocol getProtocol(TTransport trans) {
215220
* @param isClient true if this factory produces protocols for the client side, false for the
216221
* server side
217222
*/
218-
private AccumuloProtocolFactory(InstanceId instanceId, boolean isClient) {
223+
private AccumuloProtocolFactory(InstanceId instanceId, boolean isClient, RpcService rpcService) {
219224
this.isClient = isClient;
220225
this.instanceId = requireNonNull(instanceId);
226+
this.rpcService = rpcService;
221227
}
222228

223229
/**
224230
* Creates a client-side factory for use in clients making RPC calls
225231
*/
226-
public static AccumuloProtocolFactory clientFactory(InstanceId instanceId) {
227-
return new AccumuloProtocolFactory(instanceId, true);
232+
public static AccumuloProtocolFactory clientFactory(InstanceId instanceId,
233+
ThriftClientTypes<?> clientType) {
234+
return new AccumuloProtocolFactory(instanceId, true, clientType.getService());
228235
}
229236

230237
/**
231238
* Creates a server-side factory for use in servers receiving RPC calls
232239
*/
233240
public static AccumuloProtocolFactory serverFactory(InstanceId instanceId) {
234-
return new AccumuloProtocolFactory(instanceId, false);
241+
return new AccumuloProtocolFactory(instanceId, false, RpcService.NONE);
235242
}
236243
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.accumulo.core.rpc;
20+
21+
import java.util.Objects;
22+
23+
import org.apache.thrift.TException;
24+
import org.apache.thrift.TMultiplexedProcessor;
25+
import org.apache.thrift.TProcessor;
26+
import org.apache.thrift.protocol.TMessage;
27+
import org.apache.thrift.protocol.TMessageType;
28+
import org.apache.thrift.protocol.TProtocol;
29+
import org.apache.thrift.protocol.TProtocolDecorator;
30+
import org.apache.thrift.protocol.TProtocolException;
31+
32+
/**
33+
* AccumuloTMultiplexedProcessor is a {@link TMultiplexedProcessor} variant that uses the
34+
* {@link AccumuloTMultiplexedProtocol} to register additional processors and then forward messages
35+
* based on a single byte entry instead of having to create substrings of the original message.
36+
*/
37+
public class AccumuloTMultiplexedProcessor extends TMultiplexedProcessor {
38+
39+
// Support enough slots as RPC service types.
40+
private final TProcessor[] PROCESSORS = new TProcessor[RpcService.values().length];
41+
42+
public void registerProcessor(RpcService service, TProcessor processor) {
43+
PROCESSORS[Byte.toUnsignedInt(service.getShortId())] = Objects.requireNonNull(processor,
44+
"processor must not be null for RPCService: " + service.name());
45+
}
46+
47+
/**
48+
* Default processors are not supported as all AccumuloMultiplexedProcessors will have multiple
49+
* processors.
50+
*
51+
* @param processor the service which will be ignored.
52+
* @throws UnsupportedOperationException because this operation is not supported.
53+
*/
54+
@Override
55+
public void registerDefault(TProcessor processor) {
56+
throw new UnsupportedOperationException(
57+
this.getClass().getSimpleName() + " does not use default processors in any server type");
58+
}
59+
60+
/**
61+
* Registering processors with strings is not supported
62+
*
63+
* @param processor the service which will be ignored.
64+
* @throws UnsupportedOperationException because this operation is not supported.
65+
*/
66+
@Override
67+
public void registerProcessor(String string, TProcessor processor) {
68+
throw new UnsupportedOperationException(this.getClass().getSimpleName()
69+
+ " does not allow processors to be registered via strings");
70+
}
71+
72+
/**
73+
* This implementation of <code>process</code> performs the following steps:
74+
*
75+
* <ol>
76+
* <li>Read the beginning of the message.
77+
* <li>Reads a single byte at the end of the message.
78+
* <li>Uses that byte to locate the appropriate processor.
79+
* <li>Dispatch to the processor, with a decorated instance of TProtocol that allows
80+
* readMessageBegin() to return the original TMessage.
81+
* </ol>
82+
*
83+
* @throws TProtocolException If the message type is not CALL or ONEWAY, if the service name was
84+
* not found in the message, or if the service name was not found in the service map. You
85+
* called {@link AccumuloTMultiplexedProcessor#registerProcessor(RpcService, TProcessor)
86+
* registerProcessor} during initialization, right? :)
87+
*/
88+
@Override
89+
public void process(TProtocol iprot, TProtocol oprot) throws TException {
90+
/*
91+
* Use the actual underlying protocol (e.g. TBinaryProtocol) to read the message header. This
92+
* pulls the message "off the wire", which we'll deal with at the end of this method.
93+
*/
94+
TMessage message = iprot.readMessageBegin();
95+
96+
if (message.type != TMessageType.CALL && message.type != TMessageType.ONEWAY) {
97+
throw new TProtocolException(TProtocolException.NOT_IMPLEMENTED,
98+
"This should not have happened!?");
99+
}
100+
101+
// Read the rpcServiceShortId from the message body
102+
byte serviceId = iprot.readByte();
103+
104+
int index = Byte.toUnsignedInt(serviceId);
105+
TProcessor actualProcessor = PROCESSORS[index];
106+
107+
if (actualProcessor == null) {
108+
throw new TProtocolException(TProtocolException.NOT_IMPLEMENTED,
109+
"RpcService shortId not found after message: " + message.name + ". Did you "
110+
+ "forget to use the " + AccumuloTMultiplexedProtocol.class.getSimpleName()
111+
+ " in your client?");
112+
}
113+
114+
// Dispatch processing to the stored processor
115+
actualProcessor.process(new StoredMessageProtocol(iprot, message), oprot);
116+
}
117+
118+
private static class StoredMessageProtocol extends TProtocolDecorator {
119+
120+
TMessage pendingMessage;
121+
122+
public StoredMessageProtocol(TProtocol delegate, TMessage message) {
123+
super(delegate);
124+
this.pendingMessage = message;
125+
}
126+
127+
@Override
128+
public TMessage readMessageBegin() throws TException {
129+
if (pendingMessage != null) {
130+
TMessage m = pendingMessage;
131+
pendingMessage = null; // Release for GC; subsequent calls fall through
132+
return m;
133+
}
134+
return super.readMessageBegin();
135+
}
136+
}
137+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.accumulo.core.rpc;
20+
21+
import org.apache.thrift.TException;
22+
import org.apache.thrift.protocol.TMessage;
23+
import org.apache.thrift.protocol.TMessageType;
24+
import org.apache.thrift.protocol.TProtocol;
25+
import org.apache.thrift.protocol.TProtocolDecorator;
26+
27+
public class AccumuloTMultiplexedProtocol extends TProtocolDecorator {
28+
private final RpcService rpcService;
29+
30+
/**
31+
* Wrap the specified protocol, allowing it to be used to communicate with a multiplexing server.
32+
* The <code>rpcService</code> is required as it is written to the end of the message so that the
33+
* multiplexing server can broker the function call to the proper service.
34+
*
35+
* @param protocol Your communication protocol of choice, e.g. <code>TBinaryProtocol</code>.
36+
* @param rpcService The RpcService communicating via this protocol.
37+
*/
38+
public AccumuloTMultiplexedProtocol(TProtocol protocol, RpcService rpcService) {
39+
super(protocol);
40+
this.rpcService = rpcService;
41+
}
42+
43+
/**
44+
* Appends the RpcService shortId to the message body.
45+
*
46+
* @param tMessage The original message.
47+
* @throws TException Passed through from wrapped <code>TProtocol</code> instance.
48+
*/
49+
@Override
50+
public void writeMessageBegin(TMessage tMessage) throws TException {
51+
if (tMessage.type == TMessageType.CALL || tMessage.type == TMessageType.ONEWAY) {
52+
super.writeMessageBegin(new TMessage(tMessage.name, tMessage.type, tMessage.seqid));
53+
super.writeByte(this.rpcService.getShortId());
54+
} else {
55+
super.writeMessageBegin(tMessage);
56+
}
57+
}
58+
}

core/src/main/java/org/apache/accumulo/core/rpc/RpcService.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,43 @@
2323

2424
/**
2525
* This is an enum containing all rpc service types used by Thrift. These are used by
26-
* {@link ThriftClientTypes} and {@link ServiceLockData}
26+
* {@link ThriftClientTypes} and {@link ServiceLockData} These services also contain a 1-byte
27+
* identifier to reduce RPC header size.
2728
*/
2829
public enum RpcService {
29-
CLIENT,
30-
COORDINATOR,
31-
COMPACTOR,
32-
FATE_CLIENT,
33-
FATE_WORKER,
34-
GC,
35-
MANAGER,
36-
NONE,
37-
TABLET_INGEST,
38-
TABLET_MANAGEMENT,
39-
TABLET_SCAN,
40-
TSERV,
41-
SERVER_PROCESS
30+
31+
CLIENT((byte) 0),
32+
COORDINATOR((byte) 1),
33+
COMPACTOR((byte) 2),
34+
FATE_CLIENT((byte) 3),
35+
FATE_WORKER((byte) 4),
36+
GC((byte) 5),
37+
MANAGER((byte) 6),
38+
NONE((byte) 7),
39+
TABLET_INGEST((byte) 8),
40+
TABLET_MANAGEMENT((byte) 9),
41+
TABLET_SCAN((byte) 10),
42+
TSERV((byte) 11),
43+
SERVER_PROCESS((byte) 12);
44+
45+
private final byte shortID;
46+
47+
private static final RpcService[] SERVICES = values();
48+
49+
RpcService(byte shortID) {
50+
this.shortID = shortID;
51+
}
52+
53+
public byte getShortId() {
54+
return this.shortID;
55+
}
56+
57+
public static RpcService fromShortId(byte id) {
58+
for (RpcService service : SERVICES) {
59+
if (service.shortID == id) {
60+
return service;
61+
}
62+
}
63+
throw new IllegalArgumentException("Unknown RPC shortId: " + id);
64+
}
4265
}

core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ public class ThriftUtil {
7171
*
7272
* @return The client-side Accumulo TProtocolFactory for RPC
7373
*/
74-
public static TProtocolFactory clientProtocolFactory(InstanceId instanceId) {
75-
return AccumuloProtocolFactory.clientFactory(instanceId);
74+
public static TProtocolFactory clientProtocolFactory(InstanceId instanceId,
75+
ThriftClientTypes<?> type) {
76+
return AccumuloProtocolFactory.clientFactory(instanceId, type);
7677
}
7778

7879
/**
@@ -101,7 +102,7 @@ public static TTransportFactory transportFactory() {
101102
*/
102103
public static <T extends TServiceClient> T createClient(ThriftClientTypes<T> type,
103104
TTransport transport, InstanceId instanceId) {
104-
return type.getClient(clientProtocolFactory(instanceId).getProtocol(transport));
105+
return type.getClient(clientProtocolFactory(instanceId, type).getProtocol(transport));
105106
}
106107

107108
/**

core/src/main/java/org/apache/accumulo/core/rpc/clients/ThriftClientTypes.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323

2424
import org.apache.accumulo.core.clientImpl.ClientContext;
2525
import org.apache.accumulo.core.manager.thrift.FateWorkerService;
26+
import org.apache.accumulo.core.rpc.AccumuloTMultiplexedProtocol;
2627
import org.apache.accumulo.core.rpc.RpcService;
2728
import org.apache.thrift.TServiceClient;
2829
import org.apache.thrift.TServiceClientFactory;
29-
import org.apache.thrift.protocol.TMultiplexedProtocol;
3030
import org.apache.thrift.protocol.TProtocol;
3131

32-
public class ThriftClientTypes<C extends TServiceClient> {
32+
public abstract class ThriftClientTypes<C extends TServiceClient> {
3333

3434
public static final ClientServiceThriftClient CLIENT =
3535
new ClientServiceThriftClient(RpcService.CLIENT);
@@ -87,7 +87,7 @@ public final TServiceClientFactory<C> getClientFactory() {
8787

8888
public C getClient(TProtocol prot) {
8989
// All server side TProcessors are multiplexed. Wrap this protocol.
90-
return getClientFactory().getClient(new TMultiplexedProtocol(prot, getServiceName()));
90+
return getClientFactory().getClient(new AccumuloTMultiplexedProtocol(prot, getService()));
9191
}
9292

9393
public C getConnection(ClientContext context) {

0 commit comments

Comments
 (0)