当前位置: 首页 > news >正文

音乐网站建设目标什么是网络营销公司

音乐网站建设目标,什么是网络营销公司,网站设计客户端,武汉免费建设网站平台文章目录避免重复消费(保证消息幂等性)消息积压上线更多的消费者,进行正常消费惰性队列消息缓存延时队列RabbitMQ如何保证消息的有序性?RabbitMQ消息的可靠性、延时队列如何实现数据库与缓存数据一致?开启消费者多线程消费避免重复消费(保证消…

文章目录

  • 避免重复消费(保证消息幂等性)
  • 消息积压
    • 上线更多的消费者,进行正常消费
    • 惰性队列
    • 消息缓存
    • 延时队列
  • RabbitMQ如何保证消息的有序性?
  • RabbitMQ消息的可靠性、延时队列
  • 如何实现数据库与缓存数据一致?
  • 开启消费者多线程消费

避免重复消费(保证消息幂等性)

  • 方式1: 消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)

  • 方式2: 利用Redis的setnx 命令:给消息分配一个全局ID,只要消费过该消息,将 < id,message>以K-V键值对形式写入redis,消费者开始消费前,先去redis中查询有没消费记录即可

  • 方式3: rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的

在这里插入图片描述

发送消息

	@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息*/public void sendMessage() {// 创建消费对象,并指定 全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)MessageProperties messageProperties = new MessageProperties ();messageProperties.setMessageId (UUID.randomUUID ().toString ());messageProperties.setContentType ("text/plain");messageProperties.setContentEncoding ("utf-8");Message message = new Message ("hello,message idempotent!".getBytes (), messageProperties);System.out.println ("生产消息:" + message.toString ());rabbitTemplate.convertAndSend (EXCHANGE_NAME, ROUTE_KEY, message);}

消费消息

 /*** 消费消息** @param message* @param channel* @throws IOException*/@RabbitHandler//org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。//注意@RabbitListener位置,笔者踩坑,无限报上面的错,还有另外一种解决方案: 配置转换器@RabbitListener(queues = "message_idempotent_queue")@Transactionalpublic void handler(Message message, Channel channel) throws IOException {/*** 发送消息之前,根据消息全局ID去数据库中查询该条消息是否已经消费过,如果已经消费过,则不再进行消费。*/// 获取消息IdString messageId = message.getMessageProperties ().getMessageId ();if (StringUtils.isBlank (messageId)) {logger.info ("获取消费ID为空!");return;}MessageIdempotent messageIdempotent = null;Optional<MessageIdempotent> list = messageIdempotentRepository.findById (messageId);if (list.isPresent ()) {messageIdempotent = list.get ();}// 如果找不到,则进行消费此消息if (null == messageIdempotent) {//获取消费内容String msg = new String (message.getBody (), StandardCharsets.UTF_8);logger.info ("-----获取生产者消息-------------------->" + "messageId:" + messageId + ",消息内容:" + msg);//手动ACKchannel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);//存入到表中,标识该消息已消费MessageIdempotent idempotent = new MessageIdempotent ();idempotent.setMessageId (messageId);idempotent.setMessageContent (msg);messageIdempotentRepository.save (idempotent);} else {//如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;logger.error ("该消息已消费,无须重复消费!");}}

消息积压

在这里插入图片描述

上线更多的消费者,进行正常消费

线上突发问题,要临时扩容,增加消费端的数量

考虑到消费者的处理能力,增加配置!!!

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

simple代表简单队列模型

惰性队列

	//基于@Bean声明lazy-queue@Beanpublic Queue lazyQueue() {return QueueBuilder.durable("lazy.queue").lazy() //开启x-queue-mode为lazy.build();}//基于@RabbitListener声明LazyQueue@RabbitListener(queuesToDeclare = {@Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy"))})public void listenLazyQueue(String msg) {System.out.println("接收到lazy.queue的消息:【" + msg + "】");}

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out性能比较稳定

消息缓存

使用Redis的List或ZSET做接收消息缓存,写一个程序 按照消费者处理时间定时从Redis取消息发送到MQ
在这里插入图片描述

延时队列

设置消息过期时间,过期后转入死信队列,写一个程序 处理死信消息(重新如队列或者 即使处理或记录到数据库延后处理)

在这里插入图片描述

RabbitMQ如何保证消息的有序性?

RabbitMQ是队列存储天然具备先进先出的特点,只要消息的发送是有序的,那么理论上接收也是有序的。不过当一个队列绑定了多个消费者时,可能出现消息轮询投递给消费者的情况,而消费者的处理顺序就无法保证

因此,要保证消息的有序性,需要做的下面几点:

  • 保证消息发送的有序性
  • 保证一组有序的消息都发送到同一个队列
  • 保证一个队列只包含一个消费者
    在这里插入图片描述

这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式消费

RabbitMQ消息的可靠性、延时队列

RabbitMQ消息可靠性、延时队列

如何实现数据库与缓存数据一致?

实现方案有下面几种:

  • 本地缓存同步:当前微服务的数据库数据与缓存数据同步,可以直接在数据库修改时加入对Redis的修改逻辑,保证一致。
  • 跨服务缓存同步:服务A调用了服务B,并对查询结果缓存。服务B数据库修改,可以通过MQ通知服务A服务A修改Redis缓存数据
  • 通用方案:使用Canal框架,伪装成MySQL的salve节点,监听MySQL的binLog变化,然后修改Redis缓存数据

开启消费者多线程消费

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SpringRabbitListener {/*** @RabbitListener:加了该注解的方法表示该方法是一个消费者 concurrency:并发数量。* 其他属性和注解想了解的话,自己按Ctrl点进去看*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "Queue1"),exchange = @Exchange(value = "Exchange1"),key = "key1"),concurrency = "10")public void process1(Message message) throws Exception {System.out.println("Queue1:" + new String(message.getBody()));}}
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configuration
public class RabbitmqConfig {@Bean("batchQueueRabbitListenerContainerFactory")public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory ();factory.setConnectionFactory (connectionFactory);factory.setMessageConverter (new Jackson2JsonMessageConverter ());//确认方式,manual为手动ack.factory.setAcknowledgeMode (AcknowledgeMode.MANUAL);//每次处理数据数量,提高并发量//factory.setPrefetchCount(250);//设置线程数//factory.setConcurrentConsumers(30);//最大线程数//factory.setMaxConcurrentConsumers(50);/* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */factory.setConnectionFactory (connectionFactory);factory.setConcurrentConsumers (1);factory.setPrefetchCount (1);//factory.setDefaultRequeueRejected(true);//使用自定义线程池来启动消费者。factory.setTaskExecutor (taskExecutor ());return factory;}@Bean("correctTaskExecutor")@Primarypublic TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();// 设置核心线程数executor.setCorePoolSize (100);// 设置最大线程数executor.setMaxPoolSize (100);// 设置队列容量executor.setQueueCapacity (0);// 设置线程活跃时间(秒)executor.setKeepAliveSeconds (300);// 设置默认线程名称executor.setThreadNamePrefix ("thread-file-queue");// 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown (true);return executor;}
}
http://www.zhongyajixie.com/news/20161.html

相关文章:

  • 和田哪里有做网站的地方工具刷网站排刷排名软件
  • 西安网站网络营销沧州网站推广优化
  • 做棋牌网站建设多少钱求网址
  • 奉贤做网站制作企业网站建设的步骤
  • 天站网站建设友情链接平台网站
  • 手机网站设计创意说明网络优化app哪个好
  • 天津狐臭在哪里做津门网站I百度怎么发广告
  • 网站建设的开发方式购物链接
  • 网络营销推广的问题结构优化是什么意思
  • 帮人管理网站做淘宝客西安网站快速排名提升
  • 徐州建设工程交易谷歌seo关键词排名优化
  • wordpress 关闭工具栏企业专业搜索引擎优化
  • 网站建设项目策划书格式如何进行搜索引擎优化
  • 做网站要考虑的太原百度搜索排名优化
  • 建网站做淘宝客赚钱吗个人怎么做网络推广
  • 手机网站建站工作室seosem顾问
  • 做网站赌博彩票算犯法吗农大南路网络营销推广优化
  • wordpress缩略图设置seo关键词是什么
  • 广州天美展览公司网站制作网站的公司有哪些
  • 东台专业做网站网站的网络推广
  • 怎么做二级网站如何统计网站访问量
  • 合肥论坛网站建设西安seo代运营
  • 毕业设计购物网站开发的意义裤子seo关键词
  • 企业网站建设需要提供什么材料成都排名推广
  • 渭南市建网站百度竞价推广培训
  • 网站登录按钮怎么做seo门户网站优化
  • 机械设计师网课优化服务是什么意思
  • 外贸英语网站郑州网站制作
  • 个人网站的建设四川二级站seo整站优化排名
  • 开发者应用seo资源网站 排名