文网站建设广告制作公司
背景xxx,关键字 binder stream ,解决多中间件通信及切换问题
直接主菜:
spring cloud stream 架构
中间件 --- binder --- channel --- sink --- (处理)---source ---channel ---binder ---中间件
springcloudstream已自己集成了kafka、rabbitmq ,其他厂商也集成了一些。在官网有说明 https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/index.html
但是有时候还需自己实现,官方也给出了响应步骤
https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-custom-binder-impl
自定义实现
定义xxBinder
cp了一网友的项目,我换成了maven,
https://github.com/yangyongdehao30/spring-cloud-stream-binder-mqtt/tree/yangyongdehao30-maven
具体实现如下:
设置config类
import com.sheunglaili.binder.mqtt.MqttMessageChannelBinder;
import com.sheunglaili.binder.mqtt.MqttProvisioningProvider;
import com.sheunglaili.binder.mqtt.properties.MqttBinderConfigurationProperties;
import com.sheunglaili.binder.mqtt.properties.MqttBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.config.BindingHandlerAdvise;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.ObjectUtils;/*** Mqtt binder configuration class* @author Alex , Li Sheung Lai*/
@Configuration
@EnableConfigurationProperties({MqttExtendedBindingProperties.class})
public class MqttBinderConfiguration {@Autowiredprivate MqttExtendedBindingProperties mqttExtendedBindingProperties;@Beanpublic MqttBinderConfigurationProperties configurationProperties(){return new MqttBinderConfigurationProperties();}@Beanpublic MqttProvisioningProvider provisioningProvider(MqttBinderConfigurationProperties configurationProperties){return new MqttProvisioningProvider();}@Beanpublic MqttPahoClientFactory mqttPahoClientFactory(MqttBinderConfigurationProperties configurationProperties) {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(configurationProperties.getUrl());options.setUserName(configurationProperties.getUsername());options.setPassword(configurationProperties.getPassword().toCharArray());options.setCleanSession(configurationProperties.isCleanSession());options.setConnectionTimeout(configurationProperties.getConnectionTimeout());options.setKeepAliveInterval(configurationProperties.getKeepAliveInterval());DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(options);if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "file")) {factory.setPersistence(new MqttDefaultFilePersistence(configurationProperties.getPersistenceDirectory()));}else if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "memory")) {factory.setPersistence(new MemoryPersistence());}return factory;}@Beanpublic MqttMessageChannelBinder mqttMessageChannelBinder(MqttPahoClientFactory mqttPahoClientFactory,MqttProvisioningProvider provisioningProvider){MqttMessageChannelBinder mqttMessageChannelBinder = new MqttMessageChannelBinder(mqttPahoClientFactory,provisioningProvider);return mqttMessageChannelBinder;}
配置properties
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.config.BinderProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.util.Assert;
import org.springframework.validation.annotation.Validated;import javax.validation.constraints.Size;/*** Configuration properties for the Mqtt binder . The properties in the class* are prefixed with <b>spring.cloud.stream.mqtt.binder</b>* @author Alex , Li Sheung Lai*/
@Data
@Validated
@ConfigurationProperties(prefix = "spring.cloud.stream.mqtt")
public class MqttBinderConfigurationProperties {/*** location of the mqtt broker(s) (comma-delimited list)*/@Size(min = 1)private String[] url = new String[] { "tcp://localhost:1883" };/*** the username to use when connecting to the broker*/private String username = "guest";/*** the password to use when connecting to the broker*/private String password = "guest";/*** whether the client and server should remember state across restarts and reconnects*/private boolean cleanSession = true;/*** the connection timeout in seconds*/private int connectionTimeout = 30;/*** the ping interval in seconds*/private int keepAliveInterval = 60;/*** 'memory' or 'file'*/private String persistence = "memory";/*** Persistence directory*/private String persistenceDirectory = "/tmp/paho";public MqttBinderConfigurationProperties() {}public String[] getUrl() {return url;}public void setUrl(String[] url) {this.url = url;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public boolean isCleanSession() {return cleanSession;}public void setCleanSession(boolean cleanSession) {this.cleanSession = cleanSession;}public int getConnectionTimeout() {return connectionTimeout;}public void setConnectionTimeout(int connectionTimeout) {this.connectionTimeout = connectionTimeout;}public int getKeepAliveInterval() {return keepAliveInterval;}public void setKeepAliveInterval(int keepAliveInterval) {this.keepAliveInterval = keepAliveInterval;}public String getPersistence() {return persistence;}public void setPersistence(String persistence) {this.persistence = persistence;}public String getPersistenceDirectory() {return persistenceDirectory;}public void setPersistenceDirectory(String persistenceDirectory) {this.persistenceDirectory = persistenceDirectory;}
//注,和本properties同文件夹的还有几个类,具体在 git中 ,可下载拷贝
实现一个channel binder
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;public class MqttMessageChannelBinderextends AbstractMessageChannelBinder<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>, MqttProvisioningProvider>implements ExtendedPropertiesBinder<MessageChannel, MqttSourceProperties, MqttSinkProperties> {private MqttExtendedBindingProperties extendedBindingProperties = new MqttExtendedBindingProperties();private MqttPahoClientFactory mqttPahoClientFactory;public void setMqttPahoClientFactory(MqttPahoClientFactory mqttPahoClientFactory) {this.mqttPahoClientFactory = mqttPahoClientFactory;}public MqttMessageChannelBinder(MqttPahoClientFactory factory,MqttProvisioningProvider provisioningProvider) {super(BinderHeaders.STANDARD_HEADERS, provisioningProvider);this.mqttPahoClientFactory = factory;}@Overrideprotected MessageHandler createProducerMessageHandler(ProducerDestination destination,ExtendedProducerProperties<MqttSinkProperties> producerProperties,MessageChannel errorChannel) throws Exception {MqttSinkProperties sinkProperties = producerProperties.getExtension();DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(sinkProperties.getQos(),sinkProperties.isRetained(),sinkProperties.getCharset());MqttPahoMessageHandler handler = new MqttPahoMessageHandler(sinkProperties.getClientId(),this.mqttPahoClientFactory);handler.setAsync(sinkProperties.isAsync());handler.setDefaultTopic(sinkProperties.getTopic());handler.setConverter(converter);return handler;}@Overrideprotected MessageProducer createConsumerEndpoint(ConsumerDestination destination,String group,ExtendedConsumerProperties<MqttSourceProperties> properties) throws Exception {MqttSourceProperties sourceProperties = properties.getExtension();DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(sourceProperties.getCharset());converter.setPayloadAsBytes(sourceProperties.isBinary());MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(sourceProperties.getClientId(),this.mqttPahoClientFactory,sourceProperties.getTopics());adapter.setBeanFactory(this.getBeanFactory());adapter.setQos(sourceProperties.getQos());adapter.setConverter(converter);adapter.setOutputChannelName(destination.getName());return adapter;}public void setExtendedBindingProperties(MqttExtendedBindingProperties extendedBindingProperties) {this.extendedBindingProperties = extendedBindingProperties;}@Overridepublic MqttSourceProperties getExtendedConsumerProperties(String channelName) {return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);}@Overridepublic MqttSinkProperties getExtendedProducerProperties(String channelName) {return this.extendedBindingProperties.getExtendedProducerProperties(channelName);}@Overridepublic String getDefaultsPrefix() {return this.extendedBindingProperties.getDefaultsPrefix();}@Overridepublic Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {return this.extendedBindingProperties.getExtendedPropertiesEntryClass();}}
实现一个Provider
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;public class MqttProvisioningProvider implementsProvisioningProvider<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>> {@Overridepublic ProducerDestination provisionProducerDestination(String name,ExtendedProducerProperties<MqttSinkProperties> properties) throws ProvisioningException {return new MqttTopicDestination(name);}@Overridepublic ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<MqttSourceProperties> properties) throws ProvisioningException {return new MqttTopicDestination(name);}@RequiredArgsConstructorprivate class MqttTopicDestination implements ProducerDestination , ConsumerDestination{private final String destination;@Overridepublic String getName() {return this.destination.trim();}@Overridepublic String getNameForPartition(int partition) {throw new UnsupportedOperationException("Partitioning is not implemented for mqtt");}}
}
配置 spring.binders
mqtt:\
com.sheunglaili.binder.mqtt.config.MqttBinderConfiguration
配置如下:
spring.cloud.stream.binders.mqtt1.type=mqtt
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.url=tcp://localhost:1883
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.username=admin
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.password=admin
记得,不要扫描到BinderConfiguration,xxBinderConfiguration 是在binderService动态配置的,具体构建Binder在这,如果扫描到BinderConfiguration类,此处binders.size就不是0了
