Skip to content

Add Inbound Channel Adapter for MQTT based on hivemq#10989

Open
mjd507 wants to merge 1 commit into
spring-projects:mainfrom
mjd507:hivemq-mqtt
Open

Add Inbound Channel Adapter for MQTT based on hivemq#10989
mjd507 wants to merge 1 commit into
spring-projects:mainfrom
mjd507:hivemq-mqtt

Conversation

@mjd507
Copy link
Copy Markdown
Contributor

@mjd507 mjd507 commented May 6, 2026

Related to: #3102

  • introduce a new spring-integration-hivemq module for MQTT adapters.
  • basically, the inbound message driven adapters are based on the MqttClient(either Mqtt5AsyncClient or Mqtt3AsyncClient) and a topic.
  • abstract common properties setup (topic/qos/manualAck/executor) for both mqtt v3 and v5 from hivemq side.
  • abstract common properties setup (messageConverter/payloadType) for both mqtt v3 and v5 from SI side.
  • for Mqtt5MessageDrivenChannelAdapter, there are some additional properties (noLocal/retainHandling/retainAsPublished) for subscription, also a Mqtt5HeaderMapper is used for mapping v5 specific headers like contentType/responseTopic/correlationData/UserProperties.
  • introduce MqttClientConnectionCoordinator for coordinating MQTT client connections and disconnections. avoid race-induced state exception. these v3 and v5 Coordinators are intent for internal use only.
  • provide setMqttConnect(MqttConnect) in v5 and setMqtt3ConnectView(Mqtt3ConnectView) in v3, if specific setup needed in connect. for example: cleanStart/keepAlive.
  • if connect or subscribe failed, a MqttConnectionFailedEvent will be published. after subscribe success, a MqttSubscribedEvent will be published.
  • if AutomaticReconnect is applied by the mqttClient, after reconnect, topic will be automatic subscribed as well.

GenericContainer<?> HIVEMQ_CONTAINER = new GenericContainer<>("hivemq/hivemq-ce:2024.3")
.withExposedPorts(CONTAINER_PORT)
.withCreateContainerCmdModifier(cmd -> cmd.withHostConfig(new HostConfig()
.withPortBindings(new PortBinding(Ports.Binding.bindPort(MAPPED_PORT), new ExposedPort(CONTAINER_PORT)))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

due to mapped port will be changed after container stop then start,
I haven't found a way to set the new mapped port to the already built MqttClient,
so nows set a fixed mapped port here.

Related to: spring-projects#3102

- introduce a new `spring-integration-hivemq` module for MQTT adapters.
- basically, the inbound message driven adapters are based on the `MqttClient`(either `Mqtt5AsyncClient` or `Mqtt3AsyncClient`) and a `topic`.
- abstract common properties setup (topic/qos/manualAck/executor) for both mqtt v3 and v5 from hivemq side.
- abstract common properties setup (messageConverter/payloadType) for both mqtt v3 and v5 from SI side.
- for Mqtt5MessageDrivenChannelAdapter, there are some additional properties (noLocal/retainHandling/retainAsPublished) for subscription, also a `Mqtt5HeaderMapper` is used for mapping v5 specific headers like contentType/responseTopic/correlationData/UserProperties.
- introduce `MqttClientConnectionCoordinator` for coordinating MQTT client connections and disconnections. avoid race-induced state exception. these v3 and v5 Coordinators are intent for internal use only.
- provide `setMqttConnect(MqttConnect)` in v5 and `setMqtt3ConnectView(Mqtt3ConnectView)` in v3, if specific setup needed in connect. for example: cleanStart/keepAlive.
- if connect or subscribe failed, a `MqttConnectionFailedEvent` will be published. after subscribe success, a `MqttSubscribedEvent` will be published.
- if AutomaticReconnect is applied by the mqttClient, after reconnect, topic will be automatic subscribed as well.

Signed-off-by: Jiandong Ma <jiandong.ma.cn@gmail.com>
@mjd507
Copy link
Copy Markdown
Contributor Author

mjd507 commented May 12, 2026

here is the PR in my own repo for outbound v3/v5 adapters (mjd507#1).
I will modify and raise it once the inbound review completed.

Comment thread build.gradle
}
}

project('spring-integration-hivemq') {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for doing this contribution!
I know that I've asked about this name in the issue, but now when I think about this again it looks like the name is wrong.
This module is the client for MQTT protocol and doesn't matter what is the broker (or peer) is there we are connecting to.
Let's see if we can rename it into spring-integration-mqtt-client!
I am considering to deprecated existing module based on Paho.
Then one day we will remove Paho dependency and rename this new module back to just spring-integration-mqtt.
It is a bit awkward to name it hivemq when it is going to work with any MQTT.
Isn't it?
Sorry for confusion; and I'm open for any other suggestions!

Note: this new feature is aimed at least for the next 7.2 due upcoming November release cycle.
Therefore, bear with us how slow we are responding to your effort.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am ok to go ahead with the name spring-integration-mqtt-client.

Just It might bring a little confusion for end users to adjust the dependency, we might take effort to explain why name as this and finally as that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants