Skip to content

Adds rpc services and refactors ThriftClientTypes#6311

Open
ddanielr wants to merge 6 commits intoapache:mainfrom
ddanielr:Add-rpc-service
Open

Adds rpc services and refactors ThriftClientTypes#6311
ddanielr wants to merge 6 commits intoapache:mainfrom
ddanielr:Add-rpc-service

Conversation

@ddanielr
Copy link
Copy Markdown
Contributor

This PR is part of the larger refactor work in #6291 and contains the first set of changes needed for #6291.

This PR moves the existing ThriftService Enum type to a new enum class in the rpc package.
This new RpcService enum is used instead of strings in the ThriftClientTypes constructor which links it to the ThriftProcessorTypes class.

This change now links the thrift classes to the enum as well as the ServiceLockData.

The Exec and ExecVoid interfaces are also moved from ThriftClientTypes to TServiceClient to simply that class.

Moves the existing ThriftService enum to a rpc.RpcService class.
Replace hardcoded strings with RpcService references in tests.
Switch from using hardcoded strings to RpcServices in
ThriftClientTypes.

This enforces that all ThriftClientTypes map to an existing rpc service.
Only four ThriftClientType classes used the Exec and ExecVoid interfaces.
These were moved over to the TServerClient as three of the four classes that
used these interfaces also implemented TServerClient.

ManagerThriftClient also used these interfaces, so they were directly
added to that class and the Override annotations were removed.

Moving the interfaces allows ThriftClientTypes to become a concrete class.
@ddanielr ddanielr added this to the 4.0.0 milestone Apr 10, 2026
Fix the missed references caused by merging in the main branch.
Also fix code to eliminate warnings from the ide for code improvements.
Comment thread core/src/main/java/org/apache/accumulo/core/rpc/clients/ThriftClientTypes.java Outdated

public static final TabletManagementClientServiceThriftClient TABLET_MGMT =
new TabletManagementClientServiceThriftClient("tablet");
new TabletManagementClientServiceThriftClient(RpcService.TABLET_MANAGEMENT);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getServiceName is called from ThriftProcessortTypes when setting up the Multiplexed Thrift Server. The name is sent as part of the client request and when the server receives the request it passes the client request to the handler mapped to the name. Just providing some context for the short names ("tablet" vs "TABLET_MANAGEMENT") in the older code. My thought was to use something meaningful, but shorter rather than longer from a bytes-over-the-network perspective.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RpcService enum could have a method on it that gives a short/multiplex name.


ClientServiceThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
ClientServiceThriftClient(RpcService service) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment in ThriftClientTypes regarding the service name and the multiplexed connection. I don't think the constructors for the ThriftClientTypes need to take an argument at this point, there is only one valid argument, RpcService.CLIENT.

The argument was a String in case we needed to change it, but things are working as expected. I think we just want to use something meaningful and short.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it could change to

ClientServiceThriftClient() {
  super(RpcService.CLIENT, new Client.Factory());
}

Added a new Multiplexer that uses a single byte to determine which processor to use.
Modifies RpcService to have a (byte) shortId that can be used by the
Multiplexer.

Adds a new MultiplexedProtocol that uses the RpcService shortId to
encode the processor selection instead of a string.

Adds a test that uses the new AccumuloTMultiplexedProtocol.
Comment thread core/src/main/java/org/apache/accumulo/core/rpc/AccumuloTMultiplexedProtocol.java Outdated
@dlmarion
Copy link
Copy Markdown
Contributor

I feel like the changes in d41bfa7 might be overkill. My previous comments suggested that we keep the string short because it's passed in every message and used for directing traffic on the server side. The current values are several characters and the initial changes in this PR seemed like they could have made them longer. Going to a single byte might be the best in terms of performance, but is it much different than using a single character string? If we used a single character string, then we wouldn't need the new custom Multiplexer classes.

Adds a check to ensure processors are only registered once.

Replaces duplicative RpcService usages with the getService() method.

Removes an unnecessary TMessage object creation.
@ddanielr
Copy link
Copy Markdown
Contributor Author

I feel like the changes in d41bfa7 might be overkill. My previous comments suggested that we keep the string short because it's passed in every message and used for directing traffic on the server side. The current values are several characters and the initial changes in this PR seemed like they could have made them longer. Going to a single byte might be the best in terms of performance, but is it much different than using a single character string? If we used a single character string, then we wouldn't need the new custom Multiplexer classes.

I had another set of changes that reduced the RpcService Enum name lengths down to 5 characters but when I was looking at why we were using strings I examined the multiplexer code and found that it was creating multiple string objects per message that were immediately discarded and could be putting more pressure on GC.

Thrift's Multiplexing Overview

The current TMultiplexedProcessor uses a multiplexer protocol (TMultiplexedProtocol) to craft custom messages that identify which Tprocessor should receive the message.

TMultiplexedProtocol message creation.

The protocol takes in a string on initialization and assigns it to SERVICE_NAME.

When the protocol has to write a Tmessage, if the TMessage type is CALL or ONEWAY it prepends that SERVICE_NAME string and a separator character onto every TMessage.

  public void writeMessageBegin(TMessage tMessage) throws TException {
    if (tMessage.type == TMessageType.CALL || tMessage.type == TMessageType.ONEWAY) {
      super.writeMessageBegin(
          new TMessage(SERVICE_NAME + SEPARATOR + tMessage.name, tMessage.type, tMessage.seqid));
    } else {
      super.writeMessageBegin(tMessage);
    }
  }

The separator character is needed to allow for strings of various lengths to exist.

TMultiplexedProcessor message processing

The TMultiplexedProcessor takes in strings and processors and stores them in a HashMap with Strings used as keys.

For each TMessage the processor needs to process, it has to find the index of the separator character.
int index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR);

Then a new string is created from the message and used as the lookup key in the HashMap.
String serviceName = message.name.substring(0, index);
TProcessor actualProcessor = SERVICE_PROCESSOR_MAP.get(serviceName);

Once the processor is found, a new TMessage object is created with the processor service name removed.

    // Create a new TMessage, removing the service name
    TMessage standardMessage =
        new TMessage(
            message.name.substring(serviceName.length() + TMultiplexedProtocol.SEPARATOR.length()),
            message.type,
            message.seqid);

And that new message is finally set to the specific processor
actualProcessor.process(new StoredMessageProtocol(iprot, standardMessage), oprot);

New Accumulo Modified Multiplexing

The first major change is using a single byte as the processor identifier instead of String values.

By not using Strings we can use an array for registering the processors. We can also be precise with the array size.
private final TProcessor[] PROCESSORS = new TProcessor[RpcService.values().length];

Using bytes also removes the need for a "indexOf" operation on the processor side since thrift supports a readByte operation on the TProtocol.

// Read the rpcServiceShortId from the message body
byte serviceId = iprot.readByte();

This also trades the String.hash call for a Byte.toUnsignedInt call.

The final change was writing the shortId after the writeMessageBegin call so the original message can be passed onto the correct processor.
This removes the need to create a new TMessage based on a substring of the original message.

This was similar to the work done in the AccumuloProtocol with the ClientHeaders and AccumuloProtocolFactory.readAndValidateHeader()

I tested this by writing some jmh tests to see how efficient the new processor was compared to the original thrift processor.
I also wrote test cases that only tested the routing cost to understand if the GC benefits were there.
https://gist.github.com/ddanielr/9b2c65d03ffdae4c098a2fdea41acd2d

Based on these benchmarks, the new processor is significantly faster. However the real benefit is the reduction of GC pressure by creating less, short-lived objects for each TMessage received.

@keith-turner
Copy link
Copy Markdown
Contributor

Based on these benchmarks, the new processor is significantly faster. However the real benefit is the reduction of GC pressure by creating less, short-lived objects for each TMessage received.

This is neat stuff, seems like its doing significantly less work per message.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants