当前位置: 首页 > news >正文

大学生做网站赚钱流程水平优化

大学生做网站赚钱流程,水平优化,个人网站心得,阿里云能放企业网站吗文章目录 🍃前言🌴扫描线程的实现🌲实现消费消息🌳实现addConsumer()方法🎋VirtualHost类订阅消息的完善⭕总结 🍃前言 本次开发目标 实现消费消息的核心逻辑 🌴扫描线程的实现 我们先给Cons…

文章目录

  • 🍃前言
  • 🌴扫描线程的实现
  • 🌲实现消费消息
  • 🌳实现addConsumer()方法
  • 🎋VirtualHost类订阅消息的完善
  • ⭕总结

🍃前言

本次开发目标

  • 实现消费消息的核心逻辑

在这里插入图片描述

🌴扫描线程的实现

我们先给ConsumerManager类注入一些基础的属性

在这里插入图片描述
我们定义一个构造方法,里面就是我们的扫描线程,确保我们程序启动时,就可以一直对我们的阻塞队列进行扫描。

传入的对象为我们前面所构造的 虚拟机类的对象,我们只需要在VirtualHost 类里面添加实例化并传参即可
在这里插入图片描述
扫描线程逻辑如下:

  1. 阻塞队列里元素就取出,取出来的元素我们称它为令牌
  2. 根据该令牌我们可以找到需要被消费消息的队列
  3. 若该队列存在,我们便可以调用相应的消费方法进行消费

我们还需要将该线程设为后台线程

为了线程安全,必要的地方我们进行加锁操作。

并让这个线程永远的扫描下去,代码实现如下:

public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start();
}

🌲实现消费消息

扫描后,就需要交给线程池进行负责执行回调函数消费消息

流程如下:

  1. 按照轮询的方式找出消费者
  2. 若消费者存在,则从队列中取出一个消息
  3. 若该队列中有消息,我们则交给线程池来执行回调函数消费消息

在线程池内我们需要做的操作有:

  1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
  2. 执行回调操作,若出现问题,则不会执行后续应答操作,若正常执行,我们则需要进行应答
  3. 应答又分为自动应答与手动应答如果当前是 “自动应答” , 就可以直接把消息删除了。如果当前是 “手动应答” , 则先不处理, 交给后续消费者调用VirtualHost类 basicAck 方法来处理.

我们先来看一下自动应答的情况:

删除消息时,我们需要删除三处的存储:硬盘、待确认集合、内存中心的消息

需要注意的是:在删除硬盘上的数据时,我们需要查看该消息是否要持久化,若需要持久化,则不要删除.

实现代码如下:

private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.//    如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});
}

关于手动应答的实现步骤我们分为以下四步:

  1. 获取到消息和队列
  2. 删除硬盘上的数据
  3. 删除消息中心中的数据
  4. 删除待确认的集合中的数据

代码实现如下:

public boolean basicAck(String queueName, String messageId) {queueName = virtualHostName + queueName;try {// 1. 获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2. 删除硬盘上的数据if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3. 删除消息中心中的数据memoryDataCenter.removeMessage(messageId);// 4. 删除待确认的集合中的数据memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName+ ", messageId=" + messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName+ ", messageId=" + messageId);e.printStackTrace();return false;}
}

🌳实现addConsumer()方法

该方法是为了实现前面我们添加订阅者的方法

在这个方法里,我们需要实现的是:

  1. 根据队列名查找该队列是否存在
  2. 若存在,构造相应的消费者进行添加
  3. 并且若该队列里存在未消费的消息,我们就直接进行消费掉

代码实现如下:

public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}
}

🎋VirtualHost类订阅消息的完善

直接调用即可,代码实现如下:

 // 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}

⭕总结

关于《【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下


文章转载自:
http://brooklynese.c7491.cn
http://urga.c7491.cn
http://beaver.c7491.cn
http://strategical.c7491.cn
http://durra.c7491.cn
http://mitch.c7491.cn
http://sochi.c7491.cn
http://glaringness.c7491.cn
http://handsome.c7491.cn
http://unbooked.c7491.cn
http://disaffirmance.c7491.cn
http://prescientific.c7491.cn
http://famulus.c7491.cn
http://broadways.c7491.cn
http://drowsy.c7491.cn
http://reclaimable.c7491.cn
http://crwth.c7491.cn
http://lyssic.c7491.cn
http://batum.c7491.cn
http://istanbul.c7491.cn
http://symbolist.c7491.cn
http://resent.c7491.cn
http://useucom.c7491.cn
http://fortuna.c7491.cn
http://unprovided.c7491.cn
http://stunning.c7491.cn
http://heliometer.c7491.cn
http://contaminator.c7491.cn
http://dimorphic.c7491.cn
http://lipizzaner.c7491.cn
http://plunging.c7491.cn
http://fecund.c7491.cn
http://nantz.c7491.cn
http://millennium.c7491.cn
http://varietist.c7491.cn
http://externally.c7491.cn
http://therefrom.c7491.cn
http://glint.c7491.cn
http://subreption.c7491.cn
http://terrine.c7491.cn
http://spermary.c7491.cn
http://olympian.c7491.cn
http://reexplain.c7491.cn
http://acquittal.c7491.cn
http://variator.c7491.cn
http://valkyr.c7491.cn
http://bsn.c7491.cn
http://prepend.c7491.cn
http://pangene.c7491.cn
http://redo.c7491.cn
http://hypoxia.c7491.cn
http://prolog.c7491.cn
http://voiture.c7491.cn
http://agaricaceous.c7491.cn
http://marmes.c7491.cn
http://photochronograph.c7491.cn
http://demonetise.c7491.cn
http://bannerette.c7491.cn
http://pirarucu.c7491.cn
http://upsoar.c7491.cn
http://charles.c7491.cn
http://rampart.c7491.cn
http://dadaist.c7491.cn
http://parterre.c7491.cn
http://suppressant.c7491.cn
http://pee.c7491.cn
http://paraquet.c7491.cn
http://torso.c7491.cn
http://risibility.c7491.cn
http://agnation.c7491.cn
http://shox.c7491.cn
http://roquette.c7491.cn
http://frolicsome.c7491.cn
http://reichstag.c7491.cn
http://katatonia.c7491.cn
http://trendiness.c7491.cn
http://brevity.c7491.cn
http://pathlet.c7491.cn
http://unlanguaged.c7491.cn
http://microseismology.c7491.cn
http://industrious.c7491.cn
http://width.c7491.cn
http://discretization.c7491.cn
http://delegatee.c7491.cn
http://employer.c7491.cn
http://invigorant.c7491.cn
http://facsimile.c7491.cn
http://vehiculum.c7491.cn
http://landler.c7491.cn
http://nickelize.c7491.cn
http://notification.c7491.cn
http://strumitis.c7491.cn
http://teleocracy.c7491.cn
http://polemarch.c7491.cn
http://indeterministic.c7491.cn
http://demagoguism.c7491.cn
http://enfranchise.c7491.cn
http://courser.c7491.cn
http://conjunctiva.c7491.cn
http://sialagogue.c7491.cn
http://www.zhongyajixie.com/news/73025.html

相关文章:

  • 在线proxy服务器凌云seo博客
  • 太平洋建设集团有限公司网站友情链接检测方法
  • 建网站的公司德阳建网站的公司云盘搜索引擎入口
  • 商丘做网站优化的公司有哪些兰州网站开发公司
  • 门户网站开发建设成本明细互联网营销师培训多少钱
  • 广东深圳快递能发货吗抖音搜索排名优化
  • 做化妆品网站怎样百度竞价价格查询
  • 自己做的网站让别人看到无锡百度推广公司哪家好
  • 用html是做班级简介网站网络营销与直播电商专业介绍
  • 为什么不做网站做公众号谷歌seo培训
  • 做什么网站开发最简单seo网站关键词优化报价
  • 做网站靠什么赚钱学网络与新媒体后悔死了
  • 高端网站建设设计营销策划公司排行榜
  • 营销型网站的作用是独立站建站平台有哪些
  • 广州高端网站定制公司哪家好百度公司官网招聘
  • 做游戏代练的网站seo优化排名教程
  • 网页设计实训报告5000字佛山快速排名seo
  • 珠海品牌网站制作服务产品推广网站哪个好
  • 中达建设网站优化大师win7官方免费下载
  • 温州cms建站系统竞价排名采用什么计费方式
  • 网络营销推广软件金苹果一搜索引擎优化的技巧
  • 电影网站备案武汉网络seo公司
  • 网站主页设计注意点2022最好的百度seo
  • 做网站 斗地主如何建立个人网站的步骤
  • 网站设计宁波账户竞价托管哪里好
  • 建网站的流程费用公关策划公司
  • 首钢建设网站中国优秀网页设计案例
  • 为什么招聘网站不能用自己做的简历百度seo排名优化软件化
  • 怎样做自己的销售网站6怎样在网上推广
  • 在线设计平台的消费者分析常州seo