本文由竹子爱熊猫分享,原题“(十一)netty实战篇:基于netty框架打造一款高性能的im即时通讯程序”,本文有修订和改动。
关于netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的im聊天程序。
原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的im聊天程序,既简单,又能加深对netty的理解。
技术交流:
- 移动端im开发入门文章:《》
- 开源im框架源码:()
(本文已同步发布于:)
本文配套源码的开源托管地址是:
关于 netty 是什么,这里简单介绍下:
netty 是一个 java 开源框架。netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,netty 是一个基于 nio 的客户、服务器端编程框架,使用netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。
netty 相当简化和流线化了网络应用的编程开发过程,例如,tcp 和 udp 的 socket 服务开发。
有关netty的入门文章:
如果你连java nio都不知道,下面的文章建议优先读:
netty源码和api 在线查阅地址:
协议,这玩意儿相信大家肯定不陌生了,简单回顾一下协议的概念:网络协议是指一种通信双方都必须遵守的约定,两个不同的端,按照一定的格式对数据进行“编码”,同时按照相同的规则进行“解码”,从而实现两者之间的数据传输与通信。
当自己想要打造一款im通信程序时,对于消息的封装、拆分也同样需要设计一个协议,通信的两端都必须遵守该协议工作,这也是实现通信程序的前提。
但为什么需要通信协议呢?
因为tcp/ip中是基于流的方式传输消息,消息与消息之间没有边界,而协议的目的则在于约定消息的样式、边界等。
不知大家是否还记得之前我聊到的resp客户端协议,这是redis提供的一种客户端通信协议。如果想要操作redis,就必须遵守该协议的格式发送数据。
这个协议特别简单,如下:
- 1)首先要求所有命令,都以*开头,后面跟着具体的子命令数量,接着用换行符分割;
- 2)接着需要先用$符号声明每个子命令的长度,然后再用换行符分割;
- 3)最后再拼接上具体的子命令,同样用换行符分割。
这样描述有些令人难懂,那就直接看个案例,例如一条简单set命令。
如下:
客户端命令:
setname zhuzi
转变为resp指令:
*3
$3
set
$4
name
$5
zhuzi
按照redis的规定,但凡满足resp协议的客户端,都可以直接连接并操作redis服务端,这也就意味着咱们可以直接通过netty来手写一个redis客户端。
代码如下:
// 基于netty、resp协议实现的redis客户端
publicclassredisclient {
// 换行符的ascii码
staticfinalbyte[] line = {13, 10};
publicstaticvoidmain(string[] args) {
eventloopgroup worker = newnioeventloopgroup();
bootstrap client = newbootstrap();
try{
client.group(worker);
client.channel(niosocketchannel.class);
client.handler(newchannelinitializer() {
@override
protectedvoidinitchannel(socketchannel socketchannel)
throwsexception {
channelpipeline pipeline = socketchannel.pipeline();
pipeline.addlast(newchannelinboundhandleradapter(){
// 通道建立成功后调用:向redis发送一条set命令
@override
publicvoidchannelactive(channelhandlercontext ctx)
throwsexception {
string command = "set name zhuzi";
bytebuf buffer = respcommand(command);
ctx.channel().writeandflush(buffer);
}
// redis响应数据时触发:打印redis的响应结果
@override
publicvoidchannelread(channelhandlercontext ctx,
object msg) throwsexception {
// 接受redis服务端执行指令后的结果
bytebuf buffer = (bytebuf) msg;
system.out.println(buffer.tostring(charsetutil.utf_8));
}
});
}
});
// 根据ip、端口连接redis服务端
client.connect("192.168.12.129", 6379).sync();
} catch(exception e){
e.printstacktrace();
}
}
privatestaticbytebuf respcommand(string command){
// 先对传入的命令以空格进行分割
string[] commands = command.split(" ");
bytebuf buffer = bytebufallocator.default.buffer();
// 遵循resp协议:先写入指令的个数
buffer.writebytes(("*" commands.length).getbytes());
buffer.writebytes(line);
// 接着分别写入每个指令的长度以及具体值
for(string s : commands) {
buffer.writebytes(("$" s.length()).getbytes());
buffer.writebytes(line);
buffer.writebytes(s.getbytes());
buffer.writebytes(line);
}
// 把转换成resp格式的命令返回
returnbuffer;
}
}
在上述这个案例中,也仅仅只是通过respcommand()这个方法,对用户输入的指令进行了转换。同时在上面通过netty,与redis的地址、端口建立了连接。在连接建立成功后,就会向redis发送一条转换成resp指令的set命令。接着等待redis的响应结果并输出,如下:
ok
因为这是一条写指令,所以当redis收到执行完成后,最终就会返回一个ok,大家也可直接去redis中查询,也依旧能够查询到刚刚写入的name这个键值。
前面咱们自己针对于redis的resp协议,对用户指令进行了封装,然后发往redis执行。
但对于这些常用的协议,netty早已提供好了现成的处理器,想要使用时无需从头开发,可以直接使用现成的处理器来实现。
比如现在咱们可以基于netty提供的处理器,实现一个简单的http服务器。
代码如下:
// 基于netty提供的处理器实现http服务器
publicclasshttpserver {
publicstaticvoidmain(string[] args) throwsinterruptedexception {
eventloopgroup boss = newnioeventloopgroup();
eventloopgroup worker = newnioeventloopgroup();
serverbootstrap server = newserverbootstrap();
server
.group(boss,worker)
.channel(nioserversocketchannel.class)
.childhandler(newchannelinitializer() {
@override
protectedvoidinitchannel(niosocketchannel ch) {
channelpipeline pipeline = ch.pipeline();
// 添加一个netty提供的http处理器
pipeline.addlast(newhttpservercodec());
pipeline.addlast(newchannelinboundhandleradapter() {
@override
publicvoidchannelread(channelhandlercontext ctx,
object msg) throwsexception {
// 在这里输出一下消息的类型
system.out.println("消息类型:" msg.getclass());
super.channelread(ctx, msg);
}
});
pipeline.addlast(newsimplechannelinboundhandler() {
@override
protectedvoidchannelread0(channelhandlercontext ctx,
httprequest msg) throwsexception {
system.out.println("客户端的请求路径:" msg.uri());
// 创建一个响应对象,版本号与客户端保持一致,状态码为ok/200
defaultfullhttpresponse response =
newdefaultfullhttpresponse(
msg.protocolversion(),
httpresponsestatus.ok);
// 构造响应内容
byte[] content = "
".getbytes();
// 设置响应头:告诉客户端本次响应的数据长度
response.headers().setint(
httpheadernames.content_length,content.length);
// 设置响应主体
response.content().writebytes(content);
// 向客户端写入响应数据
ctx.writeandflush(response);
}
});
}
})
.bind("127.0.0.1",8888)
.sync();
}
}
在该案例中,咱们就未曾手动对http的数据包进行拆包处理了,而是在服务端的pipeline上添加了一个处理器,这个处理器是netty官方提供的。
其类继承关系如下:
publicfinalclasshttpservercodec
extendscombinedchannelduplexhandler
implementssourcecodec {
// ......
}
观察会发现,该类继承自这个组合类,它组合了编码器、解码器。
这也就意味着httpservercodec即可以对客户端的数据做解码,也可以对服务端响应的数据做编码。
同时除开添加了这个处理器外,在第二个处理器中打印了一下客户端的消息类型,最后一个处理器中,对客户端的请求做出了响应,其实也就是返回了一句话而已。
此时在浏览器输入,结果如下:
消息类型:classio.netty.handler.codec.http.defaulthttprequest
消息类型:classio.netty.handler.codec.http.lasthttpcontent$1
客户端的请求路径:/index.html
此时来看结果,客户端的请求会被解析成两个部分:
但按理来说浏览器发出的请求,属于get类型的请求,get请求是没有请求体信息的,但netty依旧会解析成两部分~,只不过get请求的第二部分是空的。
在第三个处理器中,咱们直接向客户端返回了一个h1标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。
7.1概述
netty除开提供了http协议的处理器外,还提供了dns、haproxy、memcache、mqtt、protobuf、redis、sctp、rtsp.....一系列协议的实现,具体定义位于这个包下,当然,咱们也可以自己实现自定义协议,按照自己的逻辑对数据进行编解码处理。
很多基于netty开发的中间件/组件,其内部基本上都开发了专属的通信协议,以此来作为不同节点间通信的基础,所以解下来咱们基于netty也来自己设计一款通信协议,这也会作为后续实现聊天程序时的基础。
所谓的协议设计,其实仅仅只需要按照一定约束,实现编码器与解码器即可,发送方在发出数据之前,会经过编码器对数据进行处理,而接收方在收到数据之前,则会由解码器对数据进行处理。
7.2自定义协议的要素
在自定义传输协议时,咱们必然需要考虑几个因素,如下:
- 1)魔数:用来第一时间判断是否为自己需要的数据包;
- 2)版本号:提高协议的拓展性,方便后续对协议进行升级;
- 3)序列化算法:消息正文具体该使用哪种方式进行序列化传输,例如json、protobuf、jdk...;
- 4)消息类型:第一时间判断出当前消息的类型;
- 5)消息序号:为了实现双工通信,客户端和服务端之间收/发消息不会相互阻塞;
- 6)正文长度:提供给ltc解码器使用,防止解码时出现粘包、半包的现象;
- 7)消息正文:本次消息要传输的具体数据。
在设计协议时,一个完整的协议应该涵盖上述所说的几方面,这样才能提供双方通信时的基础。
基于上述几个字段,能够在第一时间内判断出:
- 1)消息是否可用;
- 2)当前协议版本;
- 3)消息的具体类型;
- 4)消息的长度等各类信息。
从而给后续处理器使用(自定义的协议规则本身就是一个编解码处理器而已)。
7.3自定义协议实战
前面简单聊到过,所谓的自定义协议就是自己规定消息格式,以及自己实现编/解码器对消息实现封装/拆解,所以这里想要自定义一个消息协议,就只需要满足前面两个条件即可。
因此实现如下:
@channelhandler.sharable
publicclasschatmessagecodec extendsmessagetomessagecodec {
// 消息出站时会经过的编码方法(将原生消息对象封装成自定义协议的消息格式)
@override
protectedvoidencode(channelhandlercontext ctx, message msg,
list