当前位置: 首页 > news >正文

网站备案 上一级服务商名称seo资讯

网站备案 上一级服务商名称,seo资讯,squarespace wordpress,广东东莞划定多个高风险区Spring Boot 和 RocketMQ 在Spring Boot项目中实现“本地事务 消息队列事务”的方案,可以按照以下步骤实现: 先执行MySQL本地事务操作(未提交)随后发送消息到消息队列(如RocketMQ事务消息)等待消息队列确…

Spring Boot 和 RocketMQ

在Spring Boot项目中实现“本地事务 + 消息队列事务”的方案,可以按照以下步骤实现:

  1. 先执行MySQL本地事务操作(未提交)
  2. 随后发送消息到消息队列(如RocketMQ事务消息)
  3. 等待消息队列确认消息投递成功
  4. 提交MySQL事务

以下是基于Spring Boot和RocketMQ的完整代码示例:
确保pom.xml中包含RocketMQ的依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version>
</dependency>

业务场景:订单创建和库存更新
需求:创建订单时,使用本地事务处理订单操作,并发送事务消息给库存服务,通知更新库存。

1. 订单服务:OrderService

@Slf4j
@Service
public class OrderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate OrderRepository orderRepository;/*** 创建订单并发送事务消息*/@Transactionalpublic void createOrderAndSendMessage(String orderId, String productId, int quantity) {// Step 1: 先执行本地事务(保存订单)log.info("开始创建订单...");Order order = new Order();order.setOrderId(orderId);order.setProductId(productId);order.setQuantity(quantity);order.setStatus("PENDING");orderRepository.save(order);// Step 2: 构造事务消息Message<OrderMessage> message = MessageBuilder.withPayload(new OrderMessage(orderId, productId, quantity)).build();// Step 3: 发送事务消息rocketMQTemplate.sendMessageInTransaction("order-topic",  // 消息主题message,null            // 额外参数);}
}

2. 事务监听器:OrderTransactionListener
事务监听器中,包含本地事务执行逻辑和事务状态回查逻辑。

@Slf4j
@Component
public class OrderTransactionListener implements TransactionListener {@Autowiredprivate OrderRepository orderRepository;/*** 执行本地事务逻辑*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {OrderMessage orderMessage = (OrderMessage) SerializationUtils.deserialize(msg.getBody());try {log.info("执行本地事务 - 更新订单状态: {}", orderMessage.getOrderId());// 本地事务:更新订单状态为“CONFIRMED”orderRepository.updateStatus(orderMessage.getOrderId(), "CONFIRMED");return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {log.error("本地事务执行失败: {}", e.getMessage());return LocalTransactionState.ROLLBACK_MESSAGE;}}/*** 回查本地事务状态*/@Overridepublic LocalTransactionState checkLocalTransaction(Message msg) {OrderMessage orderMessage = (OrderMessage) SerializationUtils.deserialize(msg.getBody());String orderId = orderMessage.getOrderId();log.info("回查本地事务状态 - 订单ID: {}", orderId);String status = orderRepository.findStatusByOrderId(orderId);if ("CONFIRMED".equals(status)) {return LocalTransactionState.COMMIT_MESSAGE;} else {return LocalTransactionState.ROLLBACK_MESSAGE;}}
}

3. RocketMQ配置
将事务监听器和生产者绑定。

@Configuration
public class RocketMQConfig {@Beanpublic TransactionMQProducer transactionMQProducer(OrderTransactionListener listener) {TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");producer.setNamesrvAddr("127.0.0.1:9876");producer.setTransactionListener(listener);return producer;}
}

4. 消息对象:OrderMessage
用于传递订单信息的消息对象。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage implements Serializable {private String orderId;private String productId;private int quantity;
}

5. 数据库操作:OrderRepository

@Repository
public interface OrderRepository extends JpaRepository<Order, String> {@Modifying@Transactional@Query("UPDATE Order o SET o.status = :status WHERE o.orderId = :orderId")void updateStatus(@Param("orderId") String orderId, @Param("status") String status);@Query("SELECT o.status FROM Order o WHERE o.orderId = :orderId")String findStatusByOrderId(@Param("orderId") String orderId);
}

运行流程

  1. 客户端调用createOrderAndSendMessage方法:
    • 先在MySQL数据库中插入订单数据。
    • 发送“半消息”到RocketMQ。
  2. RocketMQ事务监听器executeLocalTransaction执行:
    • 更新订单状态,表示本地事务已完成。
  3. RocketMQ提交或回滚事务消息:
    • 若本地事务成功,则消息被消费者消费。
    • 若本地事务失败,消息被回滚,不可消费。
  4. RocketMQ自动触发回查逻辑(若消息超时未确认):
    • 查询订单状态,判断事务状态。

优点

  • 保证强一致性:通过事务消息,确保MySQL和消息队列状态一致。
  • 容灾能力:通过回查机制避免网络异常或服务故障导致消息丢失。
  • 解耦性:消息队列将订单服务和库存服务解耦。

注意事项

  • 幂等性处理:消费者侧必须支持幂等逻辑,避免重复消费。
  • 回查性能优化:本地事务状态应快速可查,如可使用缓存或事务日志表。
  • 事务超时:根据业务需求设置合理的事务超时参数,避免长时间占用资源。

Spring Boot 和 RabbitMQ

使用 RabbitMQ 也可以实现“本地事务 + 消息队列事务”的一致性方案,但 RabbitMQ 本身不支持事务消息(不像 RocketMQ)。因此,可以通过以下方式实现类似的机制:

核心思路

  1. 先执行 MySQL 的本地事务(未提交)。
  2. 发送消息到 RabbitMQ,但消息暂存(不被消费者消费)。
  3. 本地事务提交后,确认 RabbitMQ 消息投递(通过 RabbitMQ 的 ConfirmCallback 和手动 ACK)。
  4. 如果 MySQL 事务失败,则丢弃消息或不确认消息投递。

依赖配置

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

使用 RabbitMQ 的 Confirm 模式,确保消息投递到交换机和队列的可靠性

@Configuration
public class RabbitMQConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 开启消息投递到交换机的确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// 消息成功到达交换机log.info("消息成功投递到交换机: {}", correlationData);} else {// 消息投递到交换机失败log.error("消息投递到交换机失败: {}, 原因: {}", correlationData, cause);}});// 开启消息投递到队列的回退回调rabbitTemplate.setReturnsCallback(returned -> {log.error("消息未成功投递到队列: {}", returned.getMessage());});return rabbitTemplate;}@Beanpublic Queue orderQueue() {return new Queue("order-queue", true);}@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order-exchange", true, false);}@Beanpublic Binding binding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.routing.key");}
}

1. 订单服务:OrderService

@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate OrderRepository orderRepository;@Transactionalpublic void createOrderAndSendMessage(String orderId, String productId, int quantity) {// Step 1: 本地事务 - 保存订单到数据库log.info("开始创建订单...");Order order = new Order();order.setOrderId(orderId);order.setProductId(productId);order.setQuantity(quantity);order.setStatus("PENDING");orderRepository.save(order);// Step 2: 发送 RabbitMQ 消息String messageContent = String.format("订单ID: %s, 产品ID: %s, 数量: %d", orderId, productId, quantity);Message message = MessageBuilder.withBody(messageContent.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setCorrelationId(orderId).build();try {rabbitTemplate.convertAndSend("order-exchange", "order.routing.key", message);log.info("事务消息发送成功: {}", messageContent);} catch (Exception e) {log.error("消息发送失败: {}", e.getMessage());throw new RuntimeException("消息发送失败,事务回滚");}}
}

消费者:OrderConsumer

@Slf4j
@Component
public class OrderConsumer {@Autowiredprivate StockService stockService;@RabbitListener(queues = "order-queue")public void handleMessage(String message) {log.info("收到订单消息: {}", message);// Step 1: 解析消息内容String[] parts = message.split(",");String orderId = parts[0].split(":")[1].trim();String productId = parts[1].split(":")[1].trim();int quantity = Integer.parseInt(parts[2].split(":")[1].trim());// Step 2: 执行库存更新逻辑stockService.updateStock(productId, quantity);log.info("库存更新成功,订单ID: {}", orderId);}
}

库存服务:StockService

@Service
public class StockService {@Autowiredprivate RedisTemplate<String, Integer> redisTemplate;public void updateStock(String productId, int quantity) {String redisStockKey = "product_stock_" + productId;Integer stock = redisTemplate.opsForValue().get(redisStockKey);if (stock == null || stock < quantity) {throw new RuntimeException("库存不足");}redisTemplate.opsForValue().set(redisStockKey, stock - quantity);log.info("库存更新成功,产品ID: {}, 剩余库存: {}", productId, stock - quantity);}
}

数据库模型

@Entity
@Data
public class Order {@Idprivate String orderId;private String productId;private int quantity;private String status; // PENDING, CONFIRMED
}

关键点解析

  1. 事务控制
    • Spring 的 @Transactional 确保本地数据库操作是事务性的。
    • 如果 RabbitMQ 消息发送失败,直接抛出异常回滚数据库事务。
  2. 消息可靠性
    • 开启 RabbitMQ 的 Confirm 模式,确保消息成功到达交换机和队列。
    • 消息发送失败时,本地事务回滚,确保 MySQL 和 RabbitMQ 的数据一致性。
  3. 消费者幂等性
    • 需要确保消息消费的幂等性(如使用 Redis 或数据库记录已消费消息的ID)。

虽然 RabbitMQ 不原生支持事务消息,但通过这种“本地事务 + 消息确认机制”的组合,仍可以保证 MySQL 和 RabbitMQ 的一致性。相比 RocketMQ 的事务消息,RabbitMQ 的实现稍复杂,但性能更高,适合对消息投递延迟要求较高的场景。

Spring Boot 和 Kafka

使用 Kafka 同样可以实现“本地事务 + 消息队列事务”的一致性方案,得益于 Kafka 的 事务功能(Kafka Transactions)。Kafka 的事务支持允许将生产消息和消费消息的处理绑定在一个事务中,这样可以保证消息的原子性和一致性。

核心思路

  1. 使用 Kafka 的事务性生产者(TransactionalProducer),确保消息生产的事务性。
  2. 在本地事务中完成 MySQL 数据库操作,同时向 Kafka 提交消息。
  3. 如果事务失败,回滚本地事务,同时 Kafka 消息不会被提交。

Maven 依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

Kafka 配置

@Configuration
@EnableTransactionManagement
public class KafkaConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等性props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-transactional-id"); // 配置事务IDreturn new DefaultKafkaProducerFactory<>(props);}@Beanpublic KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {return new KafkaTemplate<>(producerFactory);}
}

订单服务实现

@Service
public class OrderService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Autowiredprivate OrderRepository orderRepository;@Transactionalpublic void createOrderAndSendMessage(String orderId, String productId, int quantity) {// Step 1: 本地事务 - 保存订单到数据库log.info("开始创建订单...");Order order = new Order();order.setOrderId(orderId);order.setProductId(productId);order.setQuantity(quantity);order.setStatus("PENDING");orderRepository.save(order);// Step 2: 向 Kafka 发送事务性消息try {kafkaTemplate.executeInTransaction(operations -> {String message = String.format("订单ID: %s, 产品ID: %s, 数量: %d", orderId, productId, quantity);operations.send("order-topic", orderId, message);log.info("事务消息已发送: {}", message);return true;});} catch (Exception e) {log.error("消息发送失败,事务回滚: {}", e.getMessage());throw new RuntimeException("消息发送失败,事务回滚");}}
}

消费者服务实现
消费者服务需要保证幂等性,避免重复消费消息对业务数据造成影响。以下是一个简单的消费者实现示例。

@Slf4j
@Component
public class OrderConsumer {@Autowiredprivate StockService stockService;@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void handleOrderMessage(ConsumerRecord<String, String> record) {log.info("收到订单消息: {}", record.value());// Step 1: 解析消息内容String[] parts = record.value().split(",");String orderId = parts[0].split(":")[1].trim();String productId = parts[1].split(":")[1].trim();int quantity = Integer.parseInt(parts[2].split(":")[1].trim());// Step 2: 执行库存更新逻辑stockService.updateStock(productId, quantity);log.info("库存更新成功,订单ID: {}", orderId);}
}

库存服务实现

@Service
public class StockService {@Autowiredprivate RedisTemplate<String, Integer> redisTemplate;public void updateStock(String productId, int quantity) {String redisStockKey = "product_stock_" + productId;Integer stock = redisTemplate.opsForValue().get(redisStockKey);if (stock == null || stock < quantity) {throw new RuntimeException("库存不足");}redisTemplate.opsForValue().set(redisStockKey, stock - quantity);log.info("库存更新成功,产品ID: {}, 剩余库存: {}", productId, stock - quantity);}
}

数据库模型

@Entity
@Data
public class Order {@Idprivate String orderId;private String productId;private int quantity;private String status; // PENDING, CONFIRMED
}

关键点解析

  1. Kafka 的事务支持
    • Kafka 支持事务性生产者,kafkaTemplate.executeInTransaction 确保消息生产和本地事务绑定在一起,保证了最终一致性。
  2. 幂等性消费
    • 消费者需要设计幂等性逻辑,比如通过 Redis 或数据库记录已消费的消息 ID,避免重复消费导致库存多次扣减。
  3. 性能和可靠性
    • Kafka 的事务性能优于 RabbitMQ,但需要注意事务超时和重复消费的情况。

Kafka 的事务支持可以较为优雅地实现本地事务和消息队列事务的统一,与 RabbitMQ 相比,Kafka 的事务机制更适合处理分布式一致性问题,尤其是在高吞吐量场景中表现更加出色。

http://www.zhongyajixie.com/news/21787.html

相关文章:

  • 中企动力公司湖南seo优化价格
  • 做网站 花时间关于seo的行业岗位有哪些
  • 免费咨询大夫优化培训课程
  • 大连市城乡建设厅网站深圳seo优化公司搜索引擎优化方案
  • 南通高端网站建设开发优化的定义
  • 动漫做a视频网站有哪些百度网盘破解版
  • 嘉兴seo公司网站网站注册页面
  • 中国核工业华兴建设有限公司网站建立网站的详细步骤
  • 金融类的网站怎么做山东搜索引擎优化
  • 网站手机端跳转页面模板深圳大鹏新区葵涌街道
  • php+mysql+dreamweaver网站建设全程揭秘百度账户推广登陆
  • 四网合一网站建设海南网站制作
  • 现在网站开发哪个语言好百度网盘人工客服电话多少
  • 富阳网站建设报价小红书seo是什么
  • 单页网站制作南宁网络推广培训机构
  • 上海集团网站建设公司网站怎么优化排名的方法
  • 微信官方网站登陆苏州seo培训
  • 小电影网站怎么做的河南纯手工seo
  • 微信网站开发工具软文推广网
  • 手机网站建设平台seo基础入门教程
  • 企业网站 个人备案自助建站seo
  • 合肥网站建设平台网站seo推广优化
  • 昆山做网站公司哪家好谷歌seo运营
  • 成立学校网站建设小组小熊猫seo博客
  • 茶叶网站建设策划方案u001f今日国际新闻最新消息
  • 做宣传单用什么网站seo是什么职位缩写
  • b2b网站建设需要多少费用没广告的视频播放器app
  • 一级做爰A视频免费网站免费seo在线工具
  • 网站哪些功能是PHP做的seo网络推广外包公司
  • 网站套用重庆百度推广优化排名