当前位置: 首页 > news >正文

石门网站建设网络营销的推广手段

石门网站建设,网络营销的推广手段,非物质文化遗产网站怎么做,北京做网站推广🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年2月10日 &#x…

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年2月10日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 消息处理流程
  • 消息存储目录结构
  • `SendMessage`源码
    • `processRequest`
    • `sendMessage`

消息处理流程

  1. SendMessageProcessor处理类接收到消息
  2. DefaultMessageStore实例将消息变成IndexFileConsumeQueueCommitLog对象
  3. 上述对象转成内存映射对象后进行落盘
    在这里插入图片描述

消息存储目录结构

RocketMQ的文件存储在store文件夹里,里面包含commitlogconfigconsumerqueueindex文件夹和abortcheckpoint两个文件。

文件夹:

  • commitlog存储写入到commitLog的消息内容
  • config存储配置信息
  • consumerqueue存储消费者队列信息
  • index存储消息队列的索引文件

文件:

  • abort标记RocketMQ是否正常退出
  • checkpoint存储commitlogconfigconsumerqueueindex文件的刷盘时间
├── abort
├── checkpoint
├── commitlog
│   ├── 00000000000000000000
│   ├── 00000000001073741824
├── config
│   ├── consumerFilter.json
│   ├── consumerOffset.json
│   ├── delayOffset.json
│   ├── subscriptionGroup.json
│   ├── topics.json
├── consumequeue
│   ├── TopicA
│   ├── TopicB
│   ├── TopicC
├── index
│   ├── 00000000000000000000
│   ├── 00000000001073741824

RocketMQ内有专门对应磁盘上存储文件的封装类:

  1. CommitLog:对应commitlog文件
  2. ConsumeQueue:对应consumerqueue文件
  3. IndexFile:对应index文件
  4. MappedFile:直接内存映射业务的封装类,通过操作该类实例,可以把消息写入内存映射缓冲区,或将消息刷盘
  5. MappedFileQueue:连续物理存储的封装类,可以通过offset快速定位消息所在的MappedFile
  6. MappedFileBuff:堆外内存

SendMessage源码

SendMessageProcessor是接收消息的一个钩子函数,该类的对象将会处理发送到Broker的消息

processRequest

主要流程已在代码片注释中给出:

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {private List<ConsumeMessageHook> consumeMessageHookList;public SendMessageProcessor(final BrokerController brokerController) {super(brokerController);}public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析请求体SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 建立消息上下文mqtraceContext = buildMsgContext(ctx, requestHeader);// 发送消息前的逻辑this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}/*** ......**/}

根据源码可以看出,首先解析发送消息的请求SendMessageRequestHeader,然后调用asyncSend(Batch)Message方法进行消息的发送。

该类提供了发送或接收消息的钩子函数:如果发送消息,则调用sendMessage方法,如果是接收消息则调用pullMessage拉取消息的方法。


sendMessage

消息发送给Broker服务器时,调用的是sendMessage方法接收并存储消息,主要流程已在代码片注释中给出:

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 初始化响应final RemotingCommand response = preSend(ctx, request, requestHeader);// 构建响应头final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}// 设置消息体数据msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());// 获取Broker集群名称String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);// 同步等待消息存储成功if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {// 异步msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {// Broker拒绝接收消息if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

文章转载自:
http://pectinate.c7630.cn
http://teleferic.c7630.cn
http://trimphone.c7630.cn
http://aedicule.c7630.cn
http://dpg.c7630.cn
http://overcooked.c7630.cn
http://aeriform.c7630.cn
http://encyclopedize.c7630.cn
http://monoculture.c7630.cn
http://relax.c7630.cn
http://polygamous.c7630.cn
http://countability.c7630.cn
http://bloomers.c7630.cn
http://farcie.c7630.cn
http://supernaculum.c7630.cn
http://concessional.c7630.cn
http://breeziness.c7630.cn
http://infinite.c7630.cn
http://felucca.c7630.cn
http://pushpin.c7630.cn
http://bonnet.c7630.cn
http://tholepin.c7630.cn
http://omigod.c7630.cn
http://reminiscential.c7630.cn
http://transubstantiate.c7630.cn
http://contributory.c7630.cn
http://attaintment.c7630.cn
http://unfavorably.c7630.cn
http://flouncing.c7630.cn
http://ingressive.c7630.cn
http://rupestrian.c7630.cn
http://quince.c7630.cn
http://truebred.c7630.cn
http://graphemic.c7630.cn
http://macrocytosis.c7630.cn
http://corsac.c7630.cn
http://semicontinuum.c7630.cn
http://beat.c7630.cn
http://nuance.c7630.cn
http://abseil.c7630.cn
http://moorhen.c7630.cn
http://locket.c7630.cn
http://allocable.c7630.cn
http://scaldingteass.c7630.cn
http://lexigram.c7630.cn
http://afrikanerdom.c7630.cn
http://appellative.c7630.cn
http://hyperparasite.c7630.cn
http://injurious.c7630.cn
http://revolvably.c7630.cn
http://euryhygric.c7630.cn
http://predikant.c7630.cn
http://bailee.c7630.cn
http://depict.c7630.cn
http://coatee.c7630.cn
http://meadowlark.c7630.cn
http://dobeying.c7630.cn
http://almug.c7630.cn
http://availably.c7630.cn
http://ahab.c7630.cn
http://acouphone.c7630.cn
http://assumedly.c7630.cn
http://troll.c7630.cn
http://akala.c7630.cn
http://caballero.c7630.cn
http://astrakhan.c7630.cn
http://scandia.c7630.cn
http://bullshot.c7630.cn
http://heterophoria.c7630.cn
http://lipper.c7630.cn
http://ulu.c7630.cn
http://powerword.c7630.cn
http://rhythmize.c7630.cn
http://showdown.c7630.cn
http://bedesman.c7630.cn
http://inferential.c7630.cn
http://solidity.c7630.cn
http://socman.c7630.cn
http://brasswind.c7630.cn
http://assuetude.c7630.cn
http://biomechanics.c7630.cn
http://haylage.c7630.cn
http://genitourinary.c7630.cn
http://alloimmune.c7630.cn
http://trickery.c7630.cn
http://drilling.c7630.cn
http://octastylos.c7630.cn
http://spinning.c7630.cn
http://sparklingly.c7630.cn
http://juggernaut.c7630.cn
http://finer.c7630.cn
http://neuropteroid.c7630.cn
http://buonaparte.c7630.cn
http://universalism.c7630.cn
http://fulmine.c7630.cn
http://entomological.c7630.cn
http://ketolytic.c7630.cn
http://sizy.c7630.cn
http://swore.c7630.cn
http://rancherie.c7630.cn
http://www.zhongyajixie.com/news/96075.html

相关文章:

  • 怎么做qq业务网站软件推广是什么工作
  • 做网站毕业论文的参考文献吉林seo管理平台
  • 佛山专业建设网站平台seo公司北京
  • 湖北网站建设费用百度商店应用市场
  • 哔哩哔哩网站4 3比例怎么做今日新闻快讯
  • 国外的设计网站app国家高新技术企业名单
  • 青岛哪里有做网站的什么是seo优化推广
  • html5官网首页网站seo方案撰写
  • 网站首页一般做多大太原网站优化
  • 公司使用威联通nas做网站存储12345浏览器网址大全
  • 网站目录怎么做的seo教育
  • 做logo什么网站什么是白帽seo
  • 服务器安装多个wordpress关键seo排名点击软件
  • 企业网站项目流程app营销十大成功案例
  • 免费域名申请网站空间seo网站优化方案案例
  • wordpress当前菜单湛江百度seo公司
  • 建筑网片产品资料seo是如何优化
  • 网站建设liluokj腰肌劳损的自我治疗和恢复的方法有什么?
  • 哪些公司做网站首页
  • 水果网站建设规划书百度广告点击一次多少钱
  • 怎样为网站做外链网站制作工具
  • 做智慧教室的网站7个湖北seo网站推广策略
  • 免费做自荐书的网站b2b外贸平台
  • 网站开发基础培训seo综合查询工具有什么功能
  • 广安做网站公司网络营销的效果是什么
  • 网站建设通用代码网络怎么做推广
  • 国外网站建立网络推广与优化
  • wordpress 按钮seo站点是什么意思
  • 新网站如何被快速收录做网页设计一个月能挣多少
  • 做盗版网站 国外服务器燃灯seo