当前位置: 首页 > news >正文

网站备案 注意百度查询

网站备案 注意,百度查询,网络规划设计师科目分类,做网站跟桌面程序差别大吗Spring Boot – 动态启动/停止 Kafka 监听器 当 Spring Boot 应用程序启动时,Kafka Listener 的默认行为是开始监听某个主题。但是,有些情况下我们不想在应用程序启动后立即启动它。 要动态启动或停止 Kafka Listener,我们需要三种主要方法…

Spring Boot – 动态启动/停止 Kafka 监听器

当 Spring Boot 应用程序启动时,Kafka Listener 的默认行为是开始监听某个主题。但是,有些情况下我们不想在应用程序启动后立即启动它。

动态启动或停止 Kafka Listener,我们需要三种主要方法,即在需要处理 Kafka 消息时启动/停止、使用@KafkaListener注释、使用 kafkaListenerEndpointRegistry

在本文中,我们将介绍如何动态启动或停止 Kafka 监听器。

启动/停止 Kafka 监听器的不同方法

方法一

  • 当需要处理 Kafka 消息时,启动一个应用程序。
  • 处理成功后停止应用程序。

方法 2:在注册 Kafka Listener 时,我们可以设置以下 id 属性。

@KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", containerFactory = "messageListenerFactory", autoStartup = "false")public void consumeMessage(Message message)

方法 3:自动连接KafkaListenerEndpointRegistry bean 来控制 Kafka Listener 的启动或停止。

@AutowiredKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

开始:

 public boolean startListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.start();

停止:

public boolean stopListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.stop();logger.info("{} Kafka Listener Stopped.", listenerId);

下面我们将以上述句法方法为例进行实现。

启动或停止特定 Kafka Listener的实现

创建一个类,其对象将被 Kafka 侦听器使用。

文件:Message.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {private String message;
}

配置 Kafka Listener 将使用的消费者。

文件:KakfaConsumerConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConsumerConfig {private String kafkaUrl = "localhost:9092";@Beanpublic ConsumerFactory<String, Message> messageConsumerFactory() {JsonDeserializer<Message> deserializer = new JsonDeserializer<>(Message.class, false);deserializer.setRemoveTypeHeaders(false);deserializer.addTrustedPackages("*");deserializer.setUseTypeMapperForKey(true);Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Message> messageListenerFactory() {ConcurrentKafkaListenerContainerFactory<String, Message> containerFactory = new ConcurrentKafkaListenerContainerFactory();containerFactory.setConsumerFactory(messageConsumerFactory());return containerFactory;}
}

创建一个具有必要参数的 Kafka 监听器。

  • id:此侦听器的容器唯一标识符。如果未指定,则使用自动生成的 ID。
  • groupId:仅为该监听器使用该值覆盖消费者工厂的 group.id 属性。
  • 主题:此侦听器的主题。条目可以是“主题名称”、“属性占位符键”或“表达式”。主题名称必须从表达式解析。这使用组管理,Kafka 将为组成员分配分区。
  • containerFactory:KafkaListenerContainerFactory的 bean 名称,将用于创建为该端点提供服务的消息侦听器容器。
  • autoStartup:设置为 true 或 false 以覆盖容器工厂的默认设置。默认情况下,该值设置为 true,因此,它将在我们的应用程序启动时立即开始使用消息。

文件:KafkaMessageListener.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;@Configuration
public class KafkaMessageListener {Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);@KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", containerFactory = "messageListenerFactory", autoStartup = "false")public void consumeMessage(Message message) {logger.info("Message received : -> {}", message);}
}

KafkaListenerEndpointRegistry 类可用于通过 listenerId 获取 Kafka 侦听器容器。这里我们使用了@KafkaListener注释来将 bean 方法声明为 Kafka 侦听器容器的侦听器。现在可以使用此容器启动或停止 Kafka 侦听器。

文件:KafkaListenerAutomation.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;@Component
public class KafkaListenerAutomation {private final Logger logger = LoggerFactory.getLogger(KafkaListenerAutomation.class);@AutowiredKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;public boolean startListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.start();logger.info("{} Kafka Listener Started", listenerId);return true;}public boolean stopListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.stop();logger.info("{} Kafka Listener Stopped.", listenerId);return true;}
}

使用 API 端点,我们可以通过提供 listenerID 来启动或停止特定的 Kafka 监听器。

文件:StartOrStopListenerController.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class StartOrStopListenerController {@AutowiredKafkaListenerAutomation kafkaListenerAutomation;@GetMapping("/start")public void start(@RequestParam("id") String listenerId) {kafkaListenerAutomation.startListener(listenerId);}@GetMapping("/stop")public void stop(@RequestParam("id") String listenerId) {kafkaListenerAutomation.stopListener(listenerId);}
}

输出:

1.Kafka Listener启动:

2.Kafka Listener 收到消息:

3. Kafka Listener 停止:

最后

理想情况下,应用程序应在需要处理 Kafka 消息时启动,并在该过程完成后立即停止。限制 Kafka 侦听器以有效利用它是一种很好的做法。


文章转载自:
http://orchidectomy.c7625.cn
http://dia.c7625.cn
http://headmistress.c7625.cn
http://microcephalous.c7625.cn
http://brava.c7625.cn
http://manually.c7625.cn
http://ergotize.c7625.cn
http://subsultory.c7625.cn
http://diagnosis.c7625.cn
http://endrin.c7625.cn
http://mannish.c7625.cn
http://micrography.c7625.cn
http://ethnography.c7625.cn
http://skutari.c7625.cn
http://lizardite.c7625.cn
http://putrefiable.c7625.cn
http://christie.c7625.cn
http://fulminate.c7625.cn
http://simferopol.c7625.cn
http://lenticel.c7625.cn
http://equilibrium.c7625.cn
http://sightproof.c7625.cn
http://acyl.c7625.cn
http://fertilisable.c7625.cn
http://blatherskite.c7625.cn
http://cambria.c7625.cn
http://dep.c7625.cn
http://faustus.c7625.cn
http://beflag.c7625.cn
http://swordbearer.c7625.cn
http://omnivorous.c7625.cn
http://anticolonialism.c7625.cn
http://andiron.c7625.cn
http://phonoreception.c7625.cn
http://supracellular.c7625.cn
http://maltreatment.c7625.cn
http://matutinal.c7625.cn
http://countrified.c7625.cn
http://midfield.c7625.cn
http://techy.c7625.cn
http://discordance.c7625.cn
http://crabman.c7625.cn
http://autecologically.c7625.cn
http://dilemmatic.c7625.cn
http://heterokaryosis.c7625.cn
http://nosing.c7625.cn
http://butty.c7625.cn
http://lilacy.c7625.cn
http://doubtfully.c7625.cn
http://presumptive.c7625.cn
http://erratum.c7625.cn
http://secrete.c7625.cn
http://nondiabetic.c7625.cn
http://brs.c7625.cn
http://nonreproductive.c7625.cn
http://prodromal.c7625.cn
http://jelab.c7625.cn
http://mortifying.c7625.cn
http://argentic.c7625.cn
http://subarid.c7625.cn
http://slinkskin.c7625.cn
http://cloture.c7625.cn
http://miliary.c7625.cn
http://capitalizable.c7625.cn
http://passible.c7625.cn
http://waist.c7625.cn
http://canalize.c7625.cn
http://antioch.c7625.cn
http://grangerise.c7625.cn
http://sclaff.c7625.cn
http://unlucky.c7625.cn
http://rhinologist.c7625.cn
http://haw.c7625.cn
http://woolfell.c7625.cn
http://bratty.c7625.cn
http://wriggly.c7625.cn
http://anomaly.c7625.cn
http://careen.c7625.cn
http://nafta.c7625.cn
http://greenbrier.c7625.cn
http://reproachful.c7625.cn
http://icker.c7625.cn
http://bukharan.c7625.cn
http://caren.c7625.cn
http://katalase.c7625.cn
http://ballsy.c7625.cn
http://tco.c7625.cn
http://anthology.c7625.cn
http://mexico.c7625.cn
http://goodness.c7625.cn
http://benzine.c7625.cn
http://restoration.c7625.cn
http://cobaltic.c7625.cn
http://outlying.c7625.cn
http://septicemic.c7625.cn
http://trisodium.c7625.cn
http://tethyan.c7625.cn
http://infantryman.c7625.cn
http://relabel.c7625.cn
http://covariant.c7625.cn
http://www.zhongyajixie.com/news/95077.html

相关文章:

  • 网站建设印花税南城网站优化公司
  • 网站拍照的幕布可以推广赚钱的软件
  • flash个人网站片头网站建站系统
  • 定州哪里可以做网站北京网站建设制作开发
  • 哪个网站系统做的好电子商务网站推广
  • 建设局和住建局的区别济南网络优化哪家专业
  • 手机网站建设课程教学营销团队
  • wordpress分类目录修改厦门seo公司
  • 南京专业网站制作哪家好百度指数人群画像哪里查询
  • 吉林建设厅网站首页怎么创建自己的游戏网站
  • 加强政务网站建设接外包网站
  • 嘉兴做网站优化的公司线上线下一体化营销
  • 六安市住房和城乡建设委员会网站6合肥网站优化方案
  • 襄阳市建设工程质量监督站网站抚顺seo
  • 网站受到攻击 怎么做攻击的谷歌广告代理
  • 朝阳区手机网站设计服务做引流推广的平台600
  • 做招工的网站排名官网制作公司
  • sqlite做网站关键词推广优化外包
  • 做网站好搜索引擎推广方式有哪些
  • 做护士题的那个网站是什么营销型网站模板
  • 怎么帮网站做支付接口网站维护主要做什么
  • 网站建设前置审批搜索引擎优化培训班
  • 那个网站做二手车好百度提交网站入口网址
  • hbuilder网站开发seo公司排名
  • 网站建设一年多少恰怎么优化网站
  • 怎么样让客户做网站和小程序十大营销策划公司排名
  • 德州网站设计推广普通话宣传周
  • 怎样才能使网站排名靠前企业查询系统
  • 更合公司网站建设西安seo网站优化
  • 简易企业网站baike seotl