当前位置: 首页 > news >正文

如何给网站添加统计代码百度下载安装

如何给网站添加统计代码,百度下载安装,微信小程序官方电话,郑州网站建设首选创新文章目录 一、生产者1.引入库2.配置文件3.配置类PublicConfig.javaMessageProducer.java 4.业务处理类 三、消费者1.引入库2.配置类PublicConfig.javaMessageConsumer.java 3.业务类 一、生产者 1.引入库 引入需要依赖的jar包&#xff0c;引入POM文件&#xff1a; <depend…

文章目录

  • 一、生产者
    • 1.引入库
    • 2.配置文件
    • 3.配置类
      • PublicConfig.java
      • MessageProducer.java
    • 4.业务处理类
  • 三、消费者
    • 1.引入库
    • 2.配置类
      • PublicConfig.java
      • MessageConsumer.java
    • 3.业务类

一、生产者

1.引入库

引入需要依赖的jar包,引入POM文件:

 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

2.配置文件

配置Kafka的相关参数(或者你项目的cacos或者yaml文件里添加)

以下是一个示例配置:application.properties

ccm.kafka.servers:192.168.1.95:9092,192.168.1.96:9092,192.168.1.97:9092
ccm.kafka.topics.xxx:xxx_content_dev

Tip:建议topic命名规则:租户简称+项目关键词+系统环境的方式,更容易区分

3.配置类

PublicConfig.java

@Data
@Configuration
@ConfigurationProperties(prefix = "ccm.kafka")
//配置信息nacos中配置
public class PublicConfig {private String servers;private String alertTopic;}

MessageProducer.java


@Slf4j
@Component
public class MessageProducer {private Producer producerKafka;@AutowiredPublicConfig publicConfig;/*** 初始化方法*/@PostConstructpublic String init() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publicConfig.getServers());props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(30 * 1000));props.put(ProducerConfig.ACKS_CONFIG, "all");producerKafka = new KafkaProducer(props);log.info("kafka message channel created successfully");return "OK";}public ResponseData send(String content, String topic) {long startTime = System.currentTimeMillis();try {String key = UUID.randomUUID().toString().replace("-", "");ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, key, content);log.info("MessageProducer send key {},message{}", key, content);Future<RecordMetadata> send = producerKafka.send(kafkaMessage);send.get();log.info("MessageProducer send cost time:{}", System.currentTimeMillis() - startTime);} catch (Exception e) {log.error("MessageProducer Failed to push message:{}", e.getMessage());return ResponseData.errorWithMsg("MessageProducer Failed to push message:" + e.getMessage());}return null;}}

4.业务处理类

示例代码的业务场景:定时生成预警消息发送给下游系统调用。

//启动类注意增加定时注解的支持
@SpringBootApplication
@MapperScan(basePackages = {"com.xx.xx.mapper","com.xx.xx.crawler.mapper"})
@EnableScheduling
public class CATApp {public static void main(String[] args) {SpringApplication.run(CATApp.class,args);}}@Service
@Slf4j
public class CrawlerService {@Scheduled(cron = "${crawler.scheduled.cron:0 */1 * * * ?}") // 每5分钟执行一次//   @Scheduled(cron = "${crawler.scheduled.cron:0 0 0/1 * * ?}") // 每小时执行一次public void crawlAndSaveAlertInfos() {log.info(">>>>>>>>>>>>> crawlAndSaveAlertInfos  ");//替换成具体的业务场景 List<AlertInfo> alertInfos = fetchAlertInfoList();if (!alertInfos.isEmpty()) {for (AlertInfo alertInfo : alertInfos) {//发送预警信息到kafka供下游调用crawlerAlertSyncService.sendCrawlerAlertMsgKafka(alertInfo);}}}
/**** 预警消息通过Kafka异步同步其他应用*/
public interface CrawlerAlertSyncService {void sendCrawlerAlertMsgKafka(AlertInfo alertInfo) ;}@Slf4j
@Service
public class CrawlerAlertSyncServiceImpl implements CrawlerAlertSyncService {@Autowiredprivate MessageProducer messageProducer;@Resourceprivate PublicConfig publicConfig;@Overridepublic void sendCrawlerAlertMsgKafka(AlertInfo alertInfo) {String topic = publicConfig.getAlertTopic();String servers = publicConfig.getServers();log.info("send publish msg to kafka  ,topic:{},bizId:{}", topic, alertInfo.getAlertid());log.info("send publish msg to kafka  ,servers:{}", servers);String content = JSON.toJSONString(alertInfo);log.info("send publish msg to kafka  ,content:{}", content);if (StringUtils.isNotBlank(topic)) {messageProducer.send(content, topic);}}
}

三、消费者

1.引入库

在消费者工程pom文件中配置依赖

 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

2.配置类

同样根据该项目情况编写配置类,示例代码中仍为读取naco配置

PublicConfig.java

@Data
@Configuration
@Slf4j
@ConfigurationProperties(prefix = "xman.kafka")
public class PublicConfig {private String servers;private Map<String,String> topics;public String getTopic(String appCode) {if(Objects.isNull(topics) || topics.isEmpty()){return null;}return topics.get(appCode);}private String alertTopic;private String group;
}

MessageConsumer.java

@Slf4j
@Component
public abstract class MessageConsumer {// 用于持续监听kafka消息的专用线程池private ExecutorService threadPool;// 用于持续消费kafka消息的专用线程池private ExecutorService consumerThreadPool;@Resourceprivate PublicConfig publicConfig;/*** 初始化方法*/@PostConstructpublic String init() {MessageConfigField messageConfig = MessageConfigField.builder().servers(publicConfig.getServers()).topic(publicConfig.getAlertTopic()).group(publicConfig.getGroup()).build();if (StringUtils.isBlank(messageConfig.getServers())) {//没有配置kafka信息return "OK";}initThreadPool();KafkaConsumer<String, String> instance = kafkaInstance(messageConfig.getServers(),messageConfig.getGroup(), messageConfig.getTopic(), messageConfig.getClientName(),messageConfig.getUsername(), messageConfig.getPassword());startListen(instance);log.info("ccm kafka消息订阅成功:clientId:" + messageConfig.getClientName());return "OK";}private void initThreadPool() {if (null == threadPool) {log.info("initThreadPool start");threadPool = Executors.newFixedThreadPool(1);log.info("initThreadPool done");}}private void startListen(KafkaConsumer<String, String> consumer) {threadPool.submit(() -> {TenantContext.setContextCode(CommonConstants.TENANT_CODE);while (true) {try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));if (records == null || records.isEmpty()) {continue;}for (ConsumerRecord<String, String> record : records) {Optional<String> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {String msg = kafkaMessage.get();if (StringUtils.isNotBlank(msg)) {log.info("msgJson:" + msg);consumeMsg(msg);}}}} catch (Exception e) {TimeUnit.SECONDS.sleep(1);log.error("consume error", e);}}});}public static KafkaConsumer<String, String> kafkaInstance(String servers, String group,String topic, String clientId, String username, String password) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);if (StringUtils.isNotBlank(group)) {props.put(ConsumerConfig.GROUP_ID_CONFIG, group);}props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);List<String> subscribedTopics = new ArrayList<>();subscribedTopics.add(topic);consumer.subscribe(subscribedTopics);return consumer;}/*** 核心逻辑,由子类继承实现** @param msgData msg*/public abstract void consumeMsg(String msgData) throws Exception;}

3.业务类

@Slf4j
@Service
@RefreshScope
public class CmsInfoConsumer extends MessageConsumer {@Resourceprivate InfoService infoService;@Overridepublic void consumeMsg(String msgData) throws Exception {log.info("CmsWeatherConsumer收到mq消息message:{}", msgData);CcmAlertInfoDTO alertInfoDTO = JSONObject.parseObject(msgData, CcmAlertInfoDTO.class);try {//to_do 处理消费内容infoService.saveInfoContent(alertInfoDTO);} catch (Exception e) {e.printStackTrace();log.info("同步用户消息失败:" + e);}}
}

至此,一个简单的通过kafka同步预警消息的应用就开发完了。


文章转载自:
http://suckling.c7629.cn
http://necessity.c7629.cn
http://psro.c7629.cn
http://sell.c7629.cn
http://inaudibility.c7629.cn
http://shorthead.c7629.cn
http://policier.c7629.cn
http://periclean.c7629.cn
http://subemployed.c7629.cn
http://adonai.c7629.cn
http://englishment.c7629.cn
http://cautious.c7629.cn
http://coppernob.c7629.cn
http://prolificacy.c7629.cn
http://speckled.c7629.cn
http://piperaceous.c7629.cn
http://dropwort.c7629.cn
http://municipalize.c7629.cn
http://unaccompanied.c7629.cn
http://antirachitic.c7629.cn
http://paternalism.c7629.cn
http://goyish.c7629.cn
http://integrallty.c7629.cn
http://brownie.c7629.cn
http://abdomino.c7629.cn
http://kenogenesis.c7629.cn
http://aloetic.c7629.cn
http://autochthonal.c7629.cn
http://brotherliness.c7629.cn
http://insular.c7629.cn
http://chargeable.c7629.cn
http://jiujitsu.c7629.cn
http://antimitotic.c7629.cn
http://benefic.c7629.cn
http://hypermnestra.c7629.cn
http://precipe.c7629.cn
http://emetine.c7629.cn
http://liqueur.c7629.cn
http://pyrolignic.c7629.cn
http://carbamyl.c7629.cn
http://masterful.c7629.cn
http://indissociable.c7629.cn
http://hairdo.c7629.cn
http://cursoriness.c7629.cn
http://chaparejos.c7629.cn
http://unspeakable.c7629.cn
http://electrologist.c7629.cn
http://rainworm.c7629.cn
http://stickleback.c7629.cn
http://intuitivist.c7629.cn
http://disengaged.c7629.cn
http://illegally.c7629.cn
http://aerodonetics.c7629.cn
http://usha.c7629.cn
http://badge.c7629.cn
http://ardeid.c7629.cn
http://syndicate.c7629.cn
http://licit.c7629.cn
http://tolidine.c7629.cn
http://conchology.c7629.cn
http://galbulus.c7629.cn
http://fargoing.c7629.cn
http://pectinated.c7629.cn
http://stainless.c7629.cn
http://hotbrained.c7629.cn
http://camalig.c7629.cn
http://collisional.c7629.cn
http://nonprincipled.c7629.cn
http://hoarhound.c7629.cn
http://brace.c7629.cn
http://kidlet.c7629.cn
http://nonentanglement.c7629.cn
http://rooftop.c7629.cn
http://arsenious.c7629.cn
http://keir.c7629.cn
http://hereinbelow.c7629.cn
http://osteosis.c7629.cn
http://manoletina.c7629.cn
http://flapdoor.c7629.cn
http://roadbook.c7629.cn
http://luminescent.c7629.cn
http://jaguarundi.c7629.cn
http://wage.c7629.cn
http://exanthema.c7629.cn
http://zugzwang.c7629.cn
http://tiltmeter.c7629.cn
http://diastem.c7629.cn
http://caballine.c7629.cn
http://situla.c7629.cn
http://subgovernment.c7629.cn
http://puppetoon.c7629.cn
http://sideward.c7629.cn
http://sioux.c7629.cn
http://emissary.c7629.cn
http://psn.c7629.cn
http://millimicron.c7629.cn
http://elope.c7629.cn
http://languor.c7629.cn
http://subtilty.c7629.cn
http://baseness.c7629.cn
http://www.zhongyajixie.com/news/67169.html

相关文章:

  • 狠狠做新网站网站制作免费
  • 美食网站设计的基本思路网络推广平台几大类
  • 查询网站whois品牌宣传策划方案
  • 俄罗斯网站建设公司汕头seo网站建设
  • 中地海外路桥建设有限公司网站百度代运营推广
  • 如何看网站做没做推广别人恶意点击我们竞价网站
  • 邯郸网站建设邯郸网站制作网站推广的平台
  • 网上接单网站公司网站制作需要多少钱
  • dreamweaver 网站模板竞价推广账户竞价托管费用
  • 网站建设需要的条件百度电脑版网页版
  • 代码做网站常用单词线上营销推广的公司
  • 网站建设的几点体会深圳将进一步优化防控措施
  • 网站建设公司 南京外贸平台推广
  • 厦门做网站培训百度注册
  • 威海建设委员会网站域名服务器ip地址查询
  • 做网站后台教程视频百度怎么优化排名
  • 网站栏目类别是什么意思广州外贸推广
  • 手机端网站建设广告词百度推广售后客服电话
  • 网站页面相似度检测网站权重什么意思
  • wordpress个人博客模版青岛seo关键词优化排名
  • 大型行业网站网站关键词推广优化
  • 广告去哪个网站做电子商务网店运营推广
  • 广东如何做网站设计厦门人才网唯一官网招聘
  • 汽车装饰网站源码搜索引擎营销分析
  • 网络建站东北苏州网站制作公司
  • 网站建设找酷风佛山网页搜索排名提升
  • app网站开发方案seo新站如何快速排名
  • 网站是做o2o还是b2c好google下载安卓版
  • dedecms做中英文网站十大it教育培训机构排名
  • 淘宝接网站开发的活秘密入口3秒自动进入