当前位置: 首页 > news >正文

最好的微网站建设公司电商平台网站

最好的微网站建设公司,电商平台网站,湖南省人民政府热线电话,上海建设学院网站首先我们需要了解到分布式事件总线是什么; 分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布…

首先我们需要了解到分布式事件总线是什么;

分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。

例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。

然后了解一下RabbitMQ

RabbitMQ是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。

  1. RabbitMQ的主要功能包括:

    1. 消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。

    2. 消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。

    3. 消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。

    4. 可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。

    5. 可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。

本文将讲解使用RabbitMQ实现分布式事件

实现我们创建一个EventsBus.Contract的类库项目,用于提供基本接口,以支持其他实现

在项目中添加以下依赖引用,并且记得添加EventsBus.Contract项目引用

<ItemGroup><PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" /><PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" /><PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" /><PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
</ItemGroup>

创建项目完成以后分别创建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我们的分布式事件基本接口定义

EventsBusOptions.cs

namespace EventsBus.Contract;public class EventsBusOptions
{/// <summary>/// 接收时异常事件/// </summary>public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
}

IEventsBusHandle.cs

namespace EventsBus.Contract;public interface IEventsBusHandle<in TEto> where TEto : class
{Task HandleAsync(TEto eventData);
}

ILoadEventBus.cs

namespace EventsBus.Contract;public interface ILoadEventBus
{/// <summary>/// 发布事件/// </summary>/// <param name="eto"></param>/// <typeparam name="TEto"></typeparam>/// <returns></returns>Task PushAsync<TEto>(TEto eto) where TEto : class;
}

EventsBusAttribute.cs:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ的通道

namespace EventsBus.RabbitMQ;[AttributeUsage(AttributeTargets.Class)]
public class EventsBusAttribute : Attribute
{public readonly string Name;public EventsBusAttribute(string name){Name = name;}
}

然后可以创建我们的RabbitMQ实现了,创建EventsBus.RabbitMQ类库项目,用于编写EventsBus.ContractRabbitMQ实现

创建项目完成以后分别创建Extensions\EventsBusRabbitMQExtensions.cs,Options\RabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs

Extensions\EventsBusRabbitMQExtensions.cs:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection,这样就在注入的时候减少过度使用命名空间了

using EventsBus.Contract;
using EventsBus.RabbitMQ;
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Configuration;namespace Microsoft.Extensions.DependencyInjection;public static class EventsBusRabbitMQExtensions
{public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,IConfiguration configuration){services.AddSingleton<RabbitMQFactory>();services.AddSingleton(typeof(RabbitMQEventsManage<>));services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();return services;}
}

Options\RabbitMQOptions.cs:提供基本的Options 读取配置文件中并且注入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));的方法是读取IConfiguration的名称为RabbitMQOptions的配置东西,映射到Options中,具体使用往下看。

using RabbitMQ.Client;namespace EventsBus.RabbitMQ.Options;public class RabbitMQOptions
{/// <summary>/// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>/// 指示应使用的协议的缺省值。/// </summary>public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;/// <summary>/// 地址/// </summary>public string HostName { get; set; }/// <summary>/// 账号/// </summary>public string UserName { get; set; }/// <summary>/// 密码/// </summary>public string Password { get; set; }
}

RabbitMQEventsManage.cs:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;namespace EventsBus.RabbitMQ;public class RabbitMQEventsManage<TEto> where TEto : class
{private readonly IServiceProvider _serviceProvider;private readonly RabbitMQFactory _rabbitMqFactory;public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory){_serviceProvider = serviceProvider;_rabbitMqFactory = rabbitMqFactory;_ = Task.Run(Start);}private void Start(){var channel = _rabbitMqFactory.CreateRabbitMQ();var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();var name = eventBus?.Name ?? typeof(TEto).Name;channel.QueueDeclare(name, false, false, false, null);var consumer = new EventingBasicConsumer(channel); //消费者channel.BasicConsume(name, true, consumer); //消费消息consumer.Received += async (model, ea) =>{var bytes = ea.Body.ToArray();try{// 这样就可以实现多个订阅var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();foreach (var handle in events){await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));}}catch (Exception e){EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);}};}
}

RabbitMQFactory.cs:提供RabbitMQ链接工厂,在这里你可以自己去定义和管理RabbitMQ工厂

using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;namespace EventsBus.RabbitMQ;public class RabbitMQFactory : IDisposable
{private readonly RabbitMQOptions _options;private readonly ConnectionFactory _factory;private IConnection? _connection;public RabbitMQFactory(IOptions<RabbitMQOptions> options){_options = options?.Value;// 将Options中的参数添加到ConnectionFactory_factory = new ConnectionFactory{HostName = _options.HostName,UserName = _options.UserName,Password = _options.Password,Port = _options.Port};}public IModel CreateRabbitMQ(){// 当第一次创建RabbitMQ的时候进行链接_connection ??= _factory.CreateConnection();return _connection.CreateModel();}public void Dispose(){_connection?.Dispose();}
}

RabbitMQLoadEventBus.cs:用于实现ILoadEventBus.cs通过ILoadEventBus发布事件RabbitMQLoadEventBus.cs是RabbitMQ的实现

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;namespace EventsBus.RabbitMQ;public class RabbitMQLoadEventBus : ILoadEventBus
{private readonly IServiceProvider _serviceProvider;private readonly RabbitMQFactory _rabbitMqFactory;public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory){_serviceProvider = serviceProvider;_rabbitMqFactory = rabbitMqFactory;}public async Task PushAsync<TEto>(TEto eto) where TEto : class{//创建一个通道//这里Rabbit的玩法就是一个通道channel下包含多个队列Queueusing var channel = _rabbitMqFactory.CreateRabbitMQ();// 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();var name = eventBus?.Name ?? typeof(TEto).Name;// 使用获取的名称创建一个通道channel.QueueDeclare(name, false, false, false, null);var properties = channel.CreateBasicProperties();properties.DeliveryMode = 1;// 将数据序列号,然后发布channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息// 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();await Task.CompletedTask;}
}

在这里我们的RabbitMQ分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;

然后我们需要使用RabbitMQ分布式事件总线工具包

使用RabbitMQ分布式事件总线的示例

首先我们需要准备一个RabbitMQ,可以在官网自行下载,我就先使用简单的,通过docker compose启动一个RabbitMQ,下面提供一个compose文件

version: '3.1'
services:rabbitmq:restart: always # 开机自启image: rabbitmq:3.11-management # RabbitMQ使用的镜像container_name: rabbitmq # docker名称hostname: rabbitports:- 5672:5672 # 只是RabbitMQ SDK使用的端口- 15672:15672 # 这是RabbitMQ管理界面使用的端口environment:TZ: Asia/Shanghai # 设置RabbitMQ时区RABBITMQ_DEFAULT_USER: token # rabbitMQ账号RABBITMQ_DEFAULT_PASS: dd666666 # rabbitMQ密码volumes:- ./data:/var/lib/rabbitmq

启动以后我们创建一个WebApi项目,项目名称Demo,创建完成打开项目文件添加引用

<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net7.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.0" /><PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" /></ItemGroup><ItemGroup><!-- 引用RabbitMQ事件总线项目--><ProjectReference Include="..\EventsBus.RabbitMQ\EventsBus.RabbitMQ.csproj" /></ItemGroup></Project>

修改appsettings.json配置文件:将RabbitMQ的配置写上,RabbitMQOptions名称对应在EventsBus.RabbitMQ中的RabbitMQOptions文件

在这里注入的时候将配置注入好了

{"Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}},"AllowedHosts": "*","RabbitMQOptions": {"HostName": "127.0.0.1","UserName": "token","Password": "dd666666"}
}

创建DemoEto.cs文件:

using EventsBus.RabbitMQ;namespace Demo;[EventsBus("Demo")]
public class DemoEto
{public int Size { get; set; }public string Value { get; set; }
}

创建DemoEventsBusHandle.cs文件:这里是订阅DemoEto事件,相当于是DemoEto的处理程序

using System.Text.Json;
using EventsBus.Contract;namespace Demo;/// <summary>
/// 事件处理服务,相当于订阅事件
/// </summary>
public class DemoEventsBusHandle : IEventsBusHandle<DemoEto>
{public async Task HandleAsync(DemoEto eventData){Console.WriteLine($"DemoEventsBusHandle: {JsonSerializer.Serialize(eventData)}");await Task.CompletedTask;}
}

打开Program.cs 修改代码: 在这里注入了事件总线服务,和我们的事件处理服务

using Demo;
using EventsBus.Contract;var builder = WebApplication.CreateBuilder(args);builder.Services.AddControllers();builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();// 注入事件处理服务
builder.Services.AddSingleton(typeof(IEventsBusHandle<DemoEto>),typeof(DemoEventsBusHandle));// 注入RabbitMQ服务
builder.Services.AddEventsBusRabbitMQ(builder.Configuration);var app = builder.Build();// 只有在Development显示Swagger
if (app.Environment.IsDevelopment())
{app.UseSwagger();app.UseSwaggerUI();
}// 强制Https
app.UseHttpsRedirection();app.UseAuthorization();app.MapControllers();app.Run();

创建Controllers\EventBusController.cs控制器:我们在控制器中注入了ILoadEventBus ,通过调用接口实现发布事件;

using EventsBus.Contract;
using Microsoft.AspNetCore.Mvc;namespace Demo.Controllers;[ApiController]
[Route("[controller]")]
public class EventBusController : ControllerBase
{private readonly ILoadEventBus _loadEventBus;public EventBusController(ILoadEventBus loadEventBus){_loadEventBus = loadEventBus;}/// <summary>/// 发送信息/// </summary>/// <param name="eto"></param>[HttpPost]public async Task Send(DemoEto eto){await _loadEventBus.PushAsync(eto);}
}

然后我们启动程序会打开Swagger调试界面:

然后我们发送一下事件:

我们可以看到,在数据发送的时候也同时订阅到了我们的信息,也可以通过分布式事件总线限流等实现, 


文章转载自:
http://interbang.c7512.cn
http://son.c7512.cn
http://baleen.c7512.cn
http://debugger.c7512.cn
http://decapacitation.c7512.cn
http://alidade.c7512.cn
http://lumphead.c7512.cn
http://attu.c7512.cn
http://meniscoid.c7512.cn
http://fras.c7512.cn
http://intraocular.c7512.cn
http://colonization.c7512.cn
http://secobarbital.c7512.cn
http://pipless.c7512.cn
http://pipa.c7512.cn
http://supplicant.c7512.cn
http://fluter.c7512.cn
http://quinquagenarian.c7512.cn
http://inexcitable.c7512.cn
http://porphyrize.c7512.cn
http://spiffing.c7512.cn
http://tiemannite.c7512.cn
http://perennially.c7512.cn
http://concerning.c7512.cn
http://xat.c7512.cn
http://eclat.c7512.cn
http://sworn.c7512.cn
http://reckoning.c7512.cn
http://chanticleer.c7512.cn
http://litoral.c7512.cn
http://curare.c7512.cn
http://pedlery.c7512.cn
http://heliogram.c7512.cn
http://catsup.c7512.cn
http://cornloft.c7512.cn
http://seethe.c7512.cn
http://tippler.c7512.cn
http://advertizer.c7512.cn
http://maizuru.c7512.cn
http://geodynamical.c7512.cn
http://botanic.c7512.cn
http://fidibus.c7512.cn
http://roble.c7512.cn
http://thrasonical.c7512.cn
http://oboe.c7512.cn
http://electrotonicity.c7512.cn
http://displacement.c7512.cn
http://clearinghouse.c7512.cn
http://netful.c7512.cn
http://delinquent.c7512.cn
http://pepsi.c7512.cn
http://clitoris.c7512.cn
http://rape.c7512.cn
http://typhlology.c7512.cn
http://ursprache.c7512.cn
http://landgrave.c7512.cn
http://indulge.c7512.cn
http://police.c7512.cn
http://muscology.c7512.cn
http://iliyria.c7512.cn
http://gorgon.c7512.cn
http://moralization.c7512.cn
http://gastronom.c7512.cn
http://lusi.c7512.cn
http://shouting.c7512.cn
http://ourself.c7512.cn
http://faugh.c7512.cn
http://forge.c7512.cn
http://cephalothorax.c7512.cn
http://idiograph.c7512.cn
http://busily.c7512.cn
http://hybridist.c7512.cn
http://coquette.c7512.cn
http://corbie.c7512.cn
http://dangerousness.c7512.cn
http://freewheel.c7512.cn
http://easement.c7512.cn
http://adultness.c7512.cn
http://hyperkeratotic.c7512.cn
http://yeld.c7512.cn
http://vopo.c7512.cn
http://bibber.c7512.cn
http://thalli.c7512.cn
http://eloquently.c7512.cn
http://concept.c7512.cn
http://rigoroso.c7512.cn
http://asymptomatically.c7512.cn
http://despotically.c7512.cn
http://freshwater.c7512.cn
http://extortioner.c7512.cn
http://ventriculostomy.c7512.cn
http://schlep.c7512.cn
http://remonstrate.c7512.cn
http://microseismology.c7512.cn
http://videlicet.c7512.cn
http://poohed.c7512.cn
http://windbell.c7512.cn
http://equiprobable.c7512.cn
http://townsman.c7512.cn
http://cursillo.c7512.cn
http://www.zhongyajixie.com/news/92539.html

相关文章:

  • 厦门网站建设_微信营销软件
  • 自己做的相册网站大数据营销案例
  • uniapp做网站上海专业优化排名工具
  • 网站建设前台与后台最新技术推广方案设计
  • 网站tdk优化站长网站提交
  • wordpress需要ftp登录网络优化排名培训
  • 聊城做网站推广费用aso应用商店优化原因
  • 查找网站注册时间现在有哪些培训学校
  • 衡水做网站推广什么是外链
  • 公司网站内容更新该怎么做怎么网络推广自己业务
  • 如何注册一个空壳公司深圳seo优化方案
  • 新闻网站建设合同站长网站查询工具
  • 有没有一个网站做黄油视频超级外链推广
  • 怎么攻击网站吗正规的教育机构有哪些
  • 做外贸什么网站网站源码建站
  • 建个可以注册会员网站多少钱seo公司系统
  • 自己的网站怎么做团购seo网站运营
  • 宿州网站制作公司视频号直播推广二维码
  • 什么软件 做短视频网站好杭州小周seo
  • 网站建设 域名主机郑州做网站推广
  • 禹城网站建设公司竞价托管外包
  • 订货网站怎么做软文代发平台
  • 福田专业网站建设公司哪家好安卓优化大师官网下载
  • 建设网站用图片需要版权中视频自媒体平台注册
  • 个人网站网站建设方案书全网霸屏推广系统
  • 网站建设可以先备案嘛网络营销的方式包括
  • 网站设计文档模板自己做网站难吗
  • 苏州园区网站建设seo快速优化方法
  • 少儿美术网站建设方案网页点击量统计
  • 做网站的工资游戏广告投放平台