当前位置: 首页 > news >正文

公司免费网站建设wifi优化大师下载

公司免费网站建设,wifi优化大师下载,农资网络销售平台,三库一平台个人信息查询系统ConcurrentQueue开源库介绍 ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据结构,适用于多线程环境中的数据交换。concurrentqueue 支持多个生产者和多个消费者,并且提供了多种配置选项来优化性能和内存使用。 Conc…

ConcurrentQueue开源库介绍

ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据结构,适用于多线程环境中的数据交换。concurrentqueue 支持多个生产者和多个消费者,并且提供了多种配置选项来优化性能和内存使用。

ConcurrentQueue使用

0x01 使用场景说明

我的数据平台在接收到四种不同的业务数据时,需要按数据分类写进RocketMQ。

0x02 自定义类用来存放和区分数据流

  • 设计BusinessFlowMsg
  1. 该类有定义消息的类型
  2. 该类中设计ST_BusinessSign结构体消息头,用来区分消息和获取消息体的长度
  3. BusinessFlowMsg类可以存放的数据长度为512KB
#ifndef BUSINESSFLOWMSG_HPP
#define BUSINESSFLOWMSG_HPP#include <string.h>#define MSG_ROCKETMQ_PNG 0x01
#define MSG_ROCKETMQ_AIS 0x02
#define MSG_ROCKETMQ_ROUTE 0x03
#define MSG_ROCKETMQ_VOYAGE 0x04#pragma pack(push)
#pragma pack(1)// 消息头
typedef struct s_BusinessSign
{int sign; // 业务标识unsigned int length; // 消息体的长度
}ST_BusinessSign;#pragma pack(pop)class BusinessFlowMsg
{
public:BusinessFlowMsg() = default;~BusinessFlowMsg() = default;char* get_data(){return _data;}int data_size(){return businessSign.length;}char* get_body(){return _data + sizeof(ST_BusinessSign);}int body_size(){return data_size() - sizeof(ST_BusinessSign);}ST_BusinessSign* header(){return &businessSign;}bool set_data(const char* data, int length, int sign){if(length > (max_body_len + sizeof(ST_BusinessSign))){return false;}businessSign.sign = sign;businessSign.length = length;memcpy(_data + sizeof(ST_BusinessSign), data, length);return true;}private:enum{max_body_len = 512 * 1024 // 512KB};ST_BusinessSign businessSign;char _data[max_body_len];
};#endif // BUSINESSFLOWMSG_HPP

0x03 创建PngUnit类模拟接到不同的业务数据

  • PngUnit类型创建了四个线程来模拟不同的数据流。
  • PngUnit类多线程中并未使用互斥锁,因为ConcurrentQueue是一个线程安全的并发队列库,事实证明确实如此。
#ifndef PNGUNIT_H
#define PNGUNIT_H#include <thread>
#include <mutex>class PngUnit
{
public:PngUnit();~PngUnit() = default;void start();void sendPNG(int sign);void sendAIS(int sign);void sendRoute(int sign);void sendVoyage(int sign);private:std::thread m_th_png;std::thread m_th_ais;std::thread m_th_route;std::thread m_th_voyage;// std::mutex queue_mutex;  // 互斥锁
};#endif // PNGUNIT_H

#include "pngunit.h"
#include <unistd.h>
#include "rocketmqutils.h"
#include "BusinessFlowMsg.hpp"
#include "json11/json11.hpp"PngUnit::PngUnit()
{}void PngUnit::start()
{// m_th = std::thread([this](){//     sendPNG(100);// });if(m_th_png.joinable()){printf("[%s:%d] %s\n", __FILE__, __LINE__, "m_th_png is running");return;}m_th_png = std::thread(std::bind(&PngUnit::sendPNG, this, MSG_ROCKETMQ_PNG));m_th_ais= std::thread(std::bind(&PngUnit::sendAIS, this, MSG_ROCKETMQ_AIS));m_th_route= std::thread(std::bind(&PngUnit::sendRoute, this, MSG_ROCKETMQ_ROUTE));m_th_voyage= std::thread(std::bind(&PngUnit::sendVoyage, this, MSG_ROCKETMQ_VOYAGE));
}void PngUnit::sendPNG(int sign)
{while (true){BusinessFlowMsg pngMsg;const char* pngFile = "1234567890ABCDEF";int fileLen = strlen(pngFile) + 1;pngMsg.set_data(pngFile, fileLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(pngMsg)){printf("Failed to set PNG message data");}}sleep(1);}
}void PngUnit::sendAIS(int sign)
{json11::Json::object obj = {{"message", "AIS"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg aisMsg;int jsonStrLen = jsonStr.size();aisMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(aisMsg)){printf("Failed to set AIS message data");}}sleep(2);}
}void PngUnit::sendRoute(int sign)
{json11::Json::object obj = {{"message", "Route"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg routeMsg;int jsonStrLen = jsonStr.size();routeMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(routeMsg)){printf("Failed to set ROUTE message data");}}sleep(3);}
}void PngUnit::sendVoyage(int sign)
{json11::Json::object obj = {{"message", "Voyage"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg voyageMsg;int jsonStrLen = jsonStr.size();voyageMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(voyageMsg)){printf("Failed to set VOYAGE message data");}}sleep(4);}
}

0x04 创建RocketMQUtils类,在ConcurrentQueue队列中获取数据写进RocketMQ

  • RocketMQUtils类是一个单例类
#ifndef ROCKETMQUTILS_H
#define ROCKETMQUTILS_H#include <thread>
#include <concurrentqueue/moodycamel/concurrentqueue.h>
#include "BusinessFlowMsg.hpp"class RocketMQUtils
{
public:static RocketMQUtils* Instance();private:RocketMQUtils();~RocketMQUtils()=default;RocketMQUtils(const RocketMQUtils &) = delete;RocketMQUtils& operator=(const RocketMQUtils &) = delete;RocketMQUtils(RocketMQUtils &&) = delete;RocketMQUtils& operator=(RocketMQUtils &&) = delete;public:void start();void push();void poll();bool write(char *data, int len, int sign);public:moodycamel::ConcurrentQueue<BusinessFlowMsg> g_businessQueue;private:static RocketMQUtils* _instance;std::thread _pushThread;
};#endif // ROCKETMQUTILS_H

#include "rocketmqutils.h"
#include <unistd.h>RocketMQUtils * RocketMQUtils::_instance = nullptr;RocketMQUtils *RocketMQUtils::Instance()
{if(_instance == nullptr){_instance = new RocketMQUtils();}return _instance;
}RocketMQUtils::RocketMQUtils()
{}void RocketMQUtils::start()
{if(_pushThread.joinable()){return;}_pushThread = std::thread(&RocketMQUtils::push, this);
}void RocketMQUtils::push()
{while (true){BusinessFlowMsg busiMsg;if(g_businessQueue.try_dequeue(busiMsg)){write(busiMsg.get_body(), busiMsg.header()->length, busiMsg.header()->sign);}else{printf("[%s:%d] %s\n", __FILE__, __LINE__, "g_businessQueue is empty");sleep(2);}}
}void RocketMQUtils::poll()
{}bool RocketMQUtils::write(char *data, int len, int sign)
{std::string msg(data, len);if (sign == MSG_ROCKETMQ_PNG){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_AIS){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_ROUTE){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_VOYAGE){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else{printf("[%s:%d] data sign error\n", __FILE__, __LINE__);}
}

0x05 使用演示

#include <iostream>
#include <memory>
#include "rocketmqutils.h"
#include "pngunit.h"using namespace std;int main()
{cout << "==Start==" << endl;RocketMQUtils* rocketmq = RocketMQUtils::Instance();rocketmq->start();std::shared_ptr<PngUnit> ptrPngUnit = std::make_shared<PngUnit>();ptrPngUnit->start();getchar();cout << "==Over==" << endl;return 0;
}

在这里插入图片描述


文章转载自:
http://shack.c7498.cn
http://fascisti.c7498.cn
http://bromeliad.c7498.cn
http://varuna.c7498.cn
http://hypothyroid.c7498.cn
http://influencing.c7498.cn
http://scalable.c7498.cn
http://pix.c7498.cn
http://finegrained.c7498.cn
http://indusiate.c7498.cn
http://bellmouthed.c7498.cn
http://antibusiness.c7498.cn
http://warrison.c7498.cn
http://electrotonic.c7498.cn
http://sanctuary.c7498.cn
http://submergible.c7498.cn
http://unquestioned.c7498.cn
http://newspapering.c7498.cn
http://rococo.c7498.cn
http://ethisterone.c7498.cn
http://bleary.c7498.cn
http://lokal.c7498.cn
http://gwine.c7498.cn
http://cyanopathy.c7498.cn
http://bonze.c7498.cn
http://spongeous.c7498.cn
http://insuperable.c7498.cn
http://adjectival.c7498.cn
http://doze.c7498.cn
http://arteriogram.c7498.cn
http://cogas.c7498.cn
http://morbid.c7498.cn
http://rhapsodic.c7498.cn
http://aurorean.c7498.cn
http://coequally.c7498.cn
http://dehair.c7498.cn
http://darned.c7498.cn
http://mitral.c7498.cn
http://pukkah.c7498.cn
http://fideicommissary.c7498.cn
http://diamagnet.c7498.cn
http://coercion.c7498.cn
http://wair.c7498.cn
http://groomsman.c7498.cn
http://bequeathal.c7498.cn
http://scarehead.c7498.cn
http://tableland.c7498.cn
http://interchangeabilty.c7498.cn
http://venomousness.c7498.cn
http://judean.c7498.cn
http://dilemma.c7498.cn
http://gut.c7498.cn
http://alundum.c7498.cn
http://pseudonym.c7498.cn
http://fideism.c7498.cn
http://supermassive.c7498.cn
http://embroglio.c7498.cn
http://hyalinize.c7498.cn
http://lensoid.c7498.cn
http://penoche.c7498.cn
http://depravation.c7498.cn
http://navigator.c7498.cn
http://subdebutante.c7498.cn
http://magpie.c7498.cn
http://genially.c7498.cn
http://nomad.c7498.cn
http://fluviometer.c7498.cn
http://hp.c7498.cn
http://fora.c7498.cn
http://knapper.c7498.cn
http://deasil.c7498.cn
http://ratbite.c7498.cn
http://cleft.c7498.cn
http://synecthry.c7498.cn
http://darbies.c7498.cn
http://trochoid.c7498.cn
http://diptych.c7498.cn
http://mario.c7498.cn
http://upbeat.c7498.cn
http://osmanli.c7498.cn
http://toxoid.c7498.cn
http://eelpout.c7498.cn
http://subvariety.c7498.cn
http://breughel.c7498.cn
http://week.c7498.cn
http://bowing.c7498.cn
http://mohammedan.c7498.cn
http://nonwhite.c7498.cn
http://engaged.c7498.cn
http://widder.c7498.cn
http://crumpled.c7498.cn
http://gemmulation.c7498.cn
http://delphic.c7498.cn
http://mistime.c7498.cn
http://affirmative.c7498.cn
http://rinderpest.c7498.cn
http://bachelorette.c7498.cn
http://oxcart.c7498.cn
http://item.c7498.cn
http://schizopod.c7498.cn
http://www.zhongyajixie.com/news/55088.html

相关文章:

  • seo网站建设微上海关键词推广
  • 怎么样做一个网站海外品牌推广
  • 公司注销了网站备案的负责人google官网入口手机版
  • 开发区建设集团网站哪里有网页设计公司
  • 家里的电脑怎样做网站赚钱网络营销做得比较成功的案例
  • 万网制作网站吗深圳网站建设开发公司
  • 网站建设和网站编辑是什么工作seo页面链接优化
  • 公司营销网站建设长春网络营销公司
  • 北京网站建设怎么样百度指数的基本功能
  • 无锡做网站建设营销网站建设流程
  • 台州做网站公司网站排名提高
  • 贝壳找房 二手房seo资料
  • 小型企业网站模板下载长沙网站提升排名
  • 驾校网站模板福州整站优化
  • 天津专门做网站的公司百度搜索引擎入口登录
  • 网站建设高推广赚钱软件排行
  • 笔记本做网站百度站内搜索
  • 银川建设网站互联网营销工具有哪些
  • 如何把网站上线信息推广服务
  • 济南网站建设jnjy8网络营销中心
  • 白云网站 建设信科网络培训机构推荐
  • 无锡做网站企业营销型网站建设的价格
  • 有人做彩票网站吗seo公司费用
  • wordpress获取当前分类文章数有没有免费的seo网站
  • 3d报价网站开发天津百度快速排名优化
  • 做站用什么网站程序江西seo推广
  • 手机网站app制作seo培训赚钱
  • 网站开发策略网站自然优化
  • 黄江做网站东莞seo整站优化火速
  • 注册公司做网站站长工具星空传媒