
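Background: xxx. Keywords: binder, stream. Goal: talk to several middleware systems and switch between them.

Straight to the main course:

  1. Spring Cloud Stream architecture

middleware --- binder --- channel --- sink --- (processing) --- source --- channel --- binder --- middleware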
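The chain above can be sketched in plain Java without any Spring dependency. This is only an illustration of the idea that the binder is the single piece that knows the middleware, while the processing step sees plain messages. `InMemoryBroker`, `SketchBinder` and `wire` are hypothetical names invented for this sketch and are not Spring Cloud Stream APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class BinderPipelineSketch {

    /** Hypothetical stand-in for a broker (Kafka, RabbitMQ, MQTT, ...). */
    static class InMemoryBroker {
        final List<String> published = new ArrayList<>();
        private Consumer<String> subscriber = msg -> { };

        void subscribe(Consumer<String> subscriber) { this.subscriber = subscriber; }

        /** Broker pushes a message towards the application. */
        void deliver(String msg) { subscriber.accept(msg); }

        /** Application pushes a message towards the broker. */
        void publish(String msg) { published.add(msg); }
    }

    /** Hypothetical binder: the only class that touches the broker API. */
    static class SketchBinder {
        private final InMemoryBroker broker;

        SketchBinder(InMemoryBroker broker) { this.broker = broker; }

        /** Connects the broker to an input channel (middleware -> binder -> channel). */
        void bindConsumer(Consumer<String> inputChannel) { broker.subscribe(inputChannel); }

        /** Returns an output channel that ends at the broker (channel -> binder -> middleware). */
        Consumer<String> bindProducer() { return broker::publish; }
    }

    /** Wires input channel -> processing -> output channel. */
    static void wire(SketchBinder in, SketchBinder out, Function<String, String> handler) {
        Consumer<String> outputChannel = out.bindProducer();
        in.bindConsumer(msg -> outputChannel.accept(handler.apply(msg)));
    }

    public static void main(String[] args) {
        InMemoryBroker source = new InMemoryBroker();
        InMemoryBroker target = new InMemoryBroker();
        wire(new SketchBinder(source), new SketchBinder(target), String::toUpperCase);
        source.deliver("hello");
        System.out.println(target.published); // prints [HELLO]
    }
}
```

Swapping the middleware in this sketch means passing a different binder to `wire`; the processing function does not change, which is the property the real binder abstraction is meant to provide.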

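Spring Cloud Stream ships with its own binders for Kafka and RabbitMQ, and other vendors have contributed binders as well. The official documentation lists them: https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/index.html

Sometimes you still have to implement a binder yourself, and the official documentation gives the corresponding steps: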

https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-custom-binder-impl

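  2. Custom implementation

Define xxBinder

I copied a fellow developer's project and converted it to Maven:

https://github.com/yangyongdehao30/spring-cloud-stream-binder-mqtt/tree/yangyongdehao30-maven

The implementation is as follows.

Set up the config class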


package com.sheunglaili.binder.mqtt.config;

import com.sheunglaili.binder.mqtt.MqttMessageChannelBinder;
import com.sheunglaili.binder.mqtt.MqttProvisioningProvider;
import com.sheunglaili.binder.mqtt.properties.MqttBinderConfigurationProperties;
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.ObjectUtils;

/**
 * Mqtt binder configuration class
 * @author Alex , Li Sheung Lai
 */
@Configuration
@EnableConfigurationProperties({MqttExtendedBindingProperties.class})
public class MqttBinderConfiguration {

    @Autowired
    private MqttExtendedBindingProperties mqttExtendedBindingProperties;

    @Bean
    public MqttBinderConfigurationProperties configurationProperties() {
        return new MqttBinderConfigurationProperties();
    }

    @Bean
    public MqttProvisioningProvider provisioningProvider(
            MqttBinderConfigurationProperties configurationProperties) {
        return new MqttProvisioningProvider();
    }

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(
            MqttBinderConfigurationProperties configurationProperties) {
        // Copy the binder-level settings into the Paho connection options.
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(configurationProperties.getUrl());
        options.setUserName(configurationProperties.getUsername());
        options.setPassword(configurationProperties.getPassword().toCharArray());
        options.setCleanSession(configurationProperties.isCleanSession());
        options.setConnectionTimeout(configurationProperties.getConnectionTimeout());
        options.setKeepAliveInterval(configurationProperties.getKeepAliveInterval());

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);

        // Persistence is either file-based or in-memory, depending on configuration.
        if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "file")) {
            factory.setPersistence(
                    new MqttDefaultFilePersistence(configurationProperties.getPersistenceDirectory()));
        } else if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "memory")) {
            factory.setPersistence(new MemoryPersistence());
        }
        return factory;
    }

    @Bean
    public MqttMessageChannelBinder mqttMessageChannelBinder(
            MqttPahoClientFactory mqttPahoClientFactory,
            MqttProvisioningProvider provisioningProvider) {
        MqttMessageChannelBinder mqttMessageChannelBinder =
                new MqttMessageChannelBinder(mqttPahoClientFactory, provisioningProvider);
        // Hand the bound per-binding properties to the binder; without this call
        // the binder keeps its empty default MqttExtendedBindingProperties instance.
        mqttMessageChannelBinder.setExtendedBindingProperties(this.mqttExtendedBindingProperties);
        return mqttMessageChannelBinder;
    }
}

Configure the properties


package com.sheunglaili.binder.mqtt.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.Size;

/**
 * Configuration properties for the Mqtt binder. The properties in the class
 * are prefixed with <b>spring.cloud.stream.mqtt</b>
 * @author Alex , Li Sheung Lai
 */
@Data
@Validated
@ConfigurationProperties(prefix = "spring.cloud.stream.mqtt")
public class MqttBinderConfigurationProperties {

    /**
     * location of the mqtt broker(s) (comma-delimited list)
     */
    @Size(min = 1)
    private String[] url = new String[] { "tcp://localhost:1883" };

    /**
     * the username to use when connecting to the broker
     */
    private String username = "guest";

    /**
     * the password to use when connecting to the broker
     */
    private String password = "guest";

    /**
     * whether the client and server should remember state across restarts and reconnects
     */
    private boolean cleanSession = true;

    /**
     * the connection timeout in seconds
     */
    private int connectionTimeout = 30;

    /**
     * the ping interval in seconds
     */
    private int keepAliveInterval = 60;

    /**
     * 'memory' or 'file'
     */
    private String persistence = "memory";

    /**
     * Persistence directory
     */
    private String persistenceDirectory = "/tmp/paho";

    public MqttBinderConfigurationProperties() {
    }

    public String[] getUrl() {
        return url;
    }

    public void setUrl(String[] url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public boolean isCleanSession() {
        return cleanSession;
    }

    public void setCleanSession(boolean cleanSession) {
        this.cleanSession = cleanSession;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public int getKeepAliveInterval() {
        return keepAliveInterval;
    }

    public void setKeepAliveInterval(int keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }

    public String getPersistence() {
        return persistence;
    }

    public void setPersistence(String persistence) {
        this.persistence = persistence;
    }

    public String getPersistenceDirectory() {
        return persistenceDirectory;
    }

    public void setPersistenceDirectory(String persistenceDirectory) {
        this.persistenceDirectory = persistenceDirectory;
    }
}

Note: several other classes sit in the same package as this properties class. They are in the Git repository linked above and can be downloaded or copied from there.

Implement a channel binder


package com.sheunglaili.binder.mqtt;

import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

public class MqttMessageChannelBinder
        extends AbstractMessageChannelBinder<ExtendedConsumerProperties<MqttSourceProperties>,
                ExtendedProducerProperties<MqttSinkProperties>, MqttProvisioningProvider>
        implements ExtendedPropertiesBinder<MessageChannel, MqttSourceProperties, MqttSinkProperties> {

    private MqttExtendedBindingProperties extendedBindingProperties = new MqttExtendedBindingProperties();

    private MqttPahoClientFactory mqttPahoClientFactory;

    public void setMqttPahoClientFactory(MqttPahoClientFactory mqttPahoClientFactory) {
        this.mqttPahoClientFactory = mqttPahoClientFactory;
    }

    public MqttMessageChannelBinder(MqttPahoClientFactory factory,
                                    MqttProvisioningProvider provisioningProvider) {
        super(BinderHeaders.STANDARD_HEADERS, provisioningProvider);
        this.mqttPahoClientFactory = factory;
    }

    // Producer side: messages from the output channel are published to MQTT.
    @Override
    protected MessageHandler createProducerMessageHandler(
            ProducerDestination destination,
            ExtendedProducerProperties<MqttSinkProperties> producerProperties,
            MessageChannel errorChannel) throws Exception {
        MqttSinkProperties sinkProperties = producerProperties.getExtension();
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(
                sinkProperties.getQos(),
                sinkProperties.isRetained(),
                sinkProperties.getCharset());
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                sinkProperties.getClientId(),
                this.mqttPahoClientFactory);
        handler.setAsync(sinkProperties.isAsync());
        handler.setDefaultTopic(sinkProperties.getTopic());
        handler.setConverter(converter);
        return handler;
    }

    // Consumer side: messages received from MQTT topics are sent to the input channel.
    @Override
    protected MessageProducer createConsumerEndpoint(
            ConsumerDestination destination,
            String group,
            ExtendedConsumerProperties<MqttSourceProperties> properties) throws Exception {
        MqttSourceProperties sourceProperties = properties.getExtension();
        DefaultPahoMessageConverter converter =
                new DefaultPahoMessageConverter(sourceProperties.getCharset());
        converter.setPayloadAsBytes(sourceProperties.isBinary());
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                sourceProperties.getClientId(),
                this.mqttPahoClientFactory,
                sourceProperties.getTopics());
        adapter.setBeanFactory(this.getBeanFactory());
        adapter.setQos(sourceProperties.getQos());
        adapter.setConverter(converter);
        adapter.setOutputChannelName(destination.getName());
        return adapter;
    }

    public void setExtendedBindingProperties(MqttExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    @Override
    public MqttSourceProperties getExtendedConsumerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    @Override
    public MqttSinkProperties getExtendedProducerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    @Override
    public String getDefaultsPrefix() {
        return this.extendedBindingProperties.getDefaultsPrefix();
    }

    @Override
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
    }
}

Implement a Provider


package com.sheunglaili.binder.mqtt;

import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;

public class MqttProvisioningProvider implements
        ProvisioningProvider<ExtendedConsumerProperties<MqttSourceProperties>,
                ExtendedProducerProperties<MqttSinkProperties>> {

    @Override
    public ProducerDestination provisionProducerDestination(
            String name,
            ExtendedProducerProperties<MqttSinkProperties> properties) throws ProvisioningException {
        return new MqttTopicDestination(name);
    }

    @Override
    public ConsumerDestination provisionConsumerDestination(
            String name,
            String group,
            ExtendedConsumerProperties<MqttSourceProperties> properties) throws ProvisioningException {
        return new MqttTopicDestination(name);
    }

    // One destination type serves both producers and consumers: it only wraps the topic name.
    @RequiredArgsConstructor
    private class MqttTopicDestination implements ProducerDestination, ConsumerDestination {

        private final String destination;

        @Override
        public String getName() {
            return this.destination.trim();
        }

        @Override
        public String getNameForPartition(int partition) {
            throw new UnsupportedOperationException("Partitioning is not implemented for mqtt");
        }
    }
}

Configure META-INF/spring.binders

mqtt:\
com.sheunglaili.binder.mqtt.config.MqttBinderConfiguration

The application configuration is as follows:

spring.cloud.stream.binders.mqtt1.type=mqtt
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.url=tcp://localhost:1883
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.username=admin
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.password=admin
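How the two files relate: the key in spring.binders (mqtt) is the binder type, and spring.cloud.stream.binders.mqtt1.type refers to that key, so the named binder mqtt1 ends up built from MqttBinderConfiguration. The lookup can be sketched in plain Java as below. This is a simplified illustration only, not the framework's actual resolution code; `BinderLookupSketch`, `parse` and `resolveConfigurationClass` are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class BinderLookupSketch {

    /** Parses properties-file text (the same syntax spring.binders uses). */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** Resolves a named binder to the configuration class registered for its type. */
    static String resolveConfigurationClass(Properties springBinders,
                                            Properties appConfig,
                                            String binderName) {
        String type = appConfig.getProperty("spring.cloud.stream.binders." + binderName + ".type");
        if (type == null) {
            throw new IllegalArgumentException("No binder named " + binderName);
        }
        String configClass = springBinders.getProperty(type);
        if (configClass == null) {
            throw new IllegalArgumentException("No binder type registered: " + type);
        }
        return configClass;
    }

    public static void main(String[] args) {
        // Same content as the spring.binders file above, including the line continuation.
        Properties springBinders = parse(
                "mqtt:\\\ncom.sheunglaili.binder.mqtt.config.MqttBinderConfiguration\n");
        Properties appConfig = parse("spring.cloud.stream.binders.mqtt1.type=mqtt\n");
        System.out.println(resolveConfigurationClass(springBinders, appConfig, "mqtt1"));
    }
}
```

Under this reading, switching middleware for a binder name comes down to changing its type value to another key that is registered in some spring.binders file on the classpath.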

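Remember: do not let component scanning pick up the BinderConfiguration class. The xxBinderConfiguration is configured dynamically in binderService, which is where the Binder is actually built. If the BinderConfiguration class gets scanned, binders.size at that point is no longer 0.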
