网站建设所需资料及费用电商运营推广怎么做
Netty高级应用之:编解码器与群聊天室开发
文章目录
- Netty高级应用之:编解码器与群聊天室开发
- Netty编解码器
- Java的编解码
- Netty编解码器
- 概念
- 解码器(Decoder)
- 编码器(Encoder)
- 编码解码器Codec
- Netty案例-群聊天室
- 聊天室服务端编写
- 聊天室客户端编写
Netty编解码器
Java的编解码
- 编码(Encode)称为序列化, 它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。
- 解码(Decode)称为反序列化,它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。
java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。
Java序列化目的:1.网络传输。2.对象持久化。
Java序列化缺点:1.无法跨语言。 2.序列化后码流太大。3.序列化性能太低
Java序列化仅仅是Java编解码技术的一种,由于它的种种缺陷,衍生出了多种编解码技术和框架,这些编解码框架实现消息的高效序列化。
Netty编解码器
概念
在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行互相转换。网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码。
对于Netty而言,编解码器由两部分组成:编码器、解码器。
- 解码器:负责将消息从字节或其他序列形式转成指定的消息对象。
- 编码器:将消息对象转成字节或其他序列形式在网络上传输。
Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是一种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。
Netty里面的编解码:
- 解码器:负责处理“入站 InboundHandler”数据。
- 编码器:负责“出站OutboundHandler” 数据。
解码器(Decoder)
解码器负责解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中。对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder
抽象解码器
- ByteToMessageDecoder: 用于将字节转为消息,需要检查缓冲区是否有足够的字节
- ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoder
- MessageToMessageDecoder: 用于从一种消息解码为另外一种消息(例如POJO到POJO)
核心方法
decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
代码实现
解码器:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;import java.util.List;/*** 消息解码器*/
public class MessageDecoder extends MessageToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {System.out.println("正在进行消息解码....");ByteBuf byteBuf = (ByteBuf) msg;out.add(byteBuf.toString(CharsetUtil.UTF_8));//传递到下一个handler}
}
通道读取方法
/*** 通道读取事件** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("客户端发送过来的消息:" + msg);}
启动类
protected void initChannel(SocketChannel ch) throws Exception {//8. 向pipeline中添加自定义业务处理handlerch.pipeline().addLast(new MessageDecoder());//添加解码器ch.pipeline().addLast(new NettyServerHandler());}
编码器(Encoder)
与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现ChannelOutboundHandler接口。
抽象编码器
- MessageToByteEncoder: 将消息转化成字节
- MessageToMessageEncoder: 用于从一种消息编码为另外一种消息(例如POJO到POJO)
核心方法
encode(ChannelHandlerContext ctx, String msg, List<Object> out)
代码实现
编码器:
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;import java.util.List;/*** 消息的编码器*/
public class MessageEncoder extends MessageToMessageEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {System.out.println("消息正在进行编码....");String str = (String) msg;out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));}
}
消息发送:
/*** 通道就绪事件** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ChannelFuture future = ctx.writeAndFlush("你好呀.我是Netty客户端");future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println("数据发送成功!");} else {System.out.println("数据发送失败!");}}});}
启动类:
@Override
protected void initChannel(SocketChannel ch) throws Exception {//6. 向pipeline中添加自定义业务处理handlerch.pipeline().addLast(new MessageDecoder());//添加解码器ch.pipeline().addLast(new MessageEncoder());//添加编码器ch.pipeline().addLast(new NettyClientHandler());
}
编码解码器Codec
编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理。
Netty提供提供了一个ChannelDuplexHandler适配器类,编码解码器的抽象基类ByteToMessageCodec ,MessageToMessageCodec都继承与此类.
代码实现
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.util.CharsetUtil;import java.util.List;/*** 消息编解码器*/
public class MessageCodec extends MessageToMessageCodec {/*** 编码** @param ctx* @param msg* @param out* @throws Exception*/@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {System.out.println("消息正在进行编码....");String str = (String) msg;out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));}/*** 解码** @param ctx* @param msg* @param out* @throws Exception*/@Overrideprotected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {System.out.println("正在进行消息解码....");ByteBuf byteBuf = (ByteBuf) msg;out.add(byteBuf.toString(CharsetUtil.UTF_8));//传递到下一个handler}
}
启动类
protected void initChannel(SocketChannel ch) throws Exception {//8. 向pipeline中添加自定义业务处理handlerch.pipeline().addLast(new MessageCoder());//添加编解码器ch.pipeline().addLast(new NettyServerHandler());
}
Netty案例-群聊天室
案例要求:
- 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯
- 实现多人群聊
- 服务器端:可以监测用户上线,离线,并实现消息转发功能
- 客户端:可以发送消息给其它所有用户,同时可以接受其它用户发送的消息
聊天室服务端编写
NettyChatServer
import com.lagou.demo.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;/*** 聊天室服务端*/
public class NettyChatServer {//端口号private int port;public NettyChatServer(int port) {this.port = port;}public void run() throws InterruptedException {//1. 创建bossGroup线程组: 处理网络事件--连接事件EventLoopGroup bossGroup = null;//2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数EventLoopGroup workerGroup = null;try {bossGroup = new NioEventLoopGroup(1);workerGroup = new NioEventLoopGroup();//3. 创建服务端启动助手ServerBootstrap serverBootstrap = new ServerBootstrap();//4. 设置bossGroup线程组和workerGroup线程组serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO.option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6. 参数设置.childHandler(new ChannelInitializer<SocketChannel>() { //7. 创建一个通道初始化对象@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//8. 向pipeline中添加自定义业务处理handler//添加编解码器ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());// todoch.pipeline().addLast(new NettyChatServerHandler());}});//9. 启动服务端并绑定端口,同时将异步改为同步ChannelFuture future = serverBootstrap.bind(port);future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println("端口绑定成功!");} else {System.out.println("端口绑定失败!");}}});System.out.println("聊天室服务端启动成功.");//10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池future.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {new NettyChatServer(9998).run();}
}
NettyChatServerHandle
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.ArrayList;
import java.util.List;/*** 聊天室业务处理类*/
public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {public static List<Channel> channelList = new ArrayList<>();/*** 通道就绪事件** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();//当有新的客户端连接的时候, 将通道放入集合channelList.add(channel);System.out.println("[Server]:" +channel.remoteAddress().toString().substring(1) + "在线.");}/*** 通道未就绪--channel下线** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();//当有客户端断开连接的时候,就移除对应的通道channelList.remove(channel);System.out.println("[Server]:" +channel.remoteAddress().toString().substring(1) + "下线.");}/*** 通道读取事件** @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {//当前发送消息的通道, 当前发送的客户端连接Channel channel = ctx.channel();for (Channel channel1 : channelList) {//排除自身通道if (channel != channel1) {channel1.writeAndFlush("[" + channel.remoteAddress().toString().substring(1)+ "]说:" + msg);}}}/*** 异常处理事件** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();Channel channel = ctx.channel();//移除集合channelList.remove(channel);System.out.println("[Server]:" +channel.remoteAddress().toString().substring(1) + "异常.");}
}
聊天室客户端编写
NettyChatClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;import java.util.Scanner;/*** 聊天室的客户端*/
public class NettyChatClient {private String ip;//服务端IPprivate int port;//服务端端口号public NettyChatClient(String ip, int port) {this.ip = ip;this.port = port;}public void run() throws InterruptedException {//1. 创建线程组EventLoopGroup group = null;try {group = new NioEventLoopGroup();//2. 创建客户端启动助手Bootstrap bootstrap = new Bootstrap();//3. 设置线程组bootstrap.group(group).channel(NioSocketChannel.class)//4. 设置客户端通道实现为NIO.handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通道初始化对象@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//6. 向pipeline中添加自定义业务处理handler//添加编解码器ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());//添加客户端的处理类ch.pipeline().addLast(new NettyChatClientHandler());}});//7. 启动客户端,等待连接服务端,同时将异步改为同步ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();Channel channel = channelFuture.channel();System.out.println("-------" + channel.localAddress().toString().substring(1) + "--------");Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String msg = scanner.nextLine();//向服务端发送消息channel.writeAndFlush(msg);}//8. 关闭通道和关闭连接池channelFuture.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {new NettyChatClient("127.0.0.1", 9998).run();}
}
NettyChatClientHandle
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** 聊天室处理类*/
public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {/*** 通道读取就绪事件** @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg);}
}