跟着源码学im(十二):基于netty打造一款高性能的im即时通讯程序 -凯发k8网页登录

我的最新工程mobileimsdk:http://git.oschina.net/jackjiang/mobileimsdk
posts - 399, comments - 13, trackbacks - 0, articles - 0

本文由竹子爱熊猫分享,原题“(十一)netty实战篇:基于netty框架打造一款高性能的im即时通讯程序”,本文有修订和改动。

关于netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的im聊天程序。

原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的im聊天程序,既简单,又能加深对netty的理解。

技术交流:

- 移动端im开发入门文章:《》

- 开源im框架源码:()

(本文已同步发布于:)

本文配套源码的开源托管地址是:

  • 1)主地址:
  • 2)备地址:

关于 netty 是什么,这里简单介绍下:

netty 是一个 java 开源框架。netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,netty 是一个基于 nio 的客户、服务器端编程框架,使用netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。

netty 相当简化和流线化了网络应用的编程开发过程,例如,tcp 和 udp 的 socket 服务开发。

有关netty的入门文章:

  • 1)
  • 2)
  • 3)

如果你连java nio都不知道,下面的文章建议优先读:

  • 1)
  • 2)
  • 3)

netty源码和api 在线查阅地址:

  • 1)
  • 2)

协议,这玩意儿相信大家肯定不陌生了,简单回顾一下协议的概念:网络协议是指一种通信双方都必须遵守的约定,两个不同的端,按照一定的格式对数据进行“编码”,同时按照相同的规则进行“解码”,从而实现两者之间的数据传输与通信。

当自己想要打造一款im通信程序时,对于消息的封装、拆分也同样需要设计一个协议,通信的两端都必须遵守该协议工作,这也是实现通信程序的前提。

但为什么需要通信协议呢?

因为tcp/ip中是基于流的方式传输消息,消息与消息之间没有边界,而协议的目的则在于约定消息的样式、边界等。

不知大家是否还记得之前我聊到的resp客户端协议,这是redis提供的一种客户端通信协议。如果想要操作redis,就必须遵守该协议的格式发送数据。

这个协议特别简单,如下:

  • 1)首先要求所有命令,都以*开头,后面跟着具体的子命令数量,接着用换行符分割;
  • 2)接着需要先用$符号声明每个子命令的长度,然后再用换行符分割;
  • 3)最后再拼接上具体的子命令,同样用换行符分割。

这样描述有些令人难懂,那就直接看个案例,例如一条简单set命令。

如下:

客户端命令:

    setname zhuzi

转变为resp指令:

    *3

    $3

    set

    $4

    name

    $5

    zhuzi

按照redis的规定,但凡满足resp协议的客户端,都可以直接连接并操作redis服务端,这也就意味着咱们可以直接通过netty来手写一个redis客户端。

代码如下:

// 基于netty、resp协议实现的redis客户端

publicclassredisclient {

    // 换行符的ascii码

    staticfinalbyte[] line = {13, 10};

 

    publicstaticvoidmain(string[] args) {

        eventloopgroup worker = newnioeventloopgroup();

        bootstrap client = newbootstrap();

 

        try{

            client.group(worker);

            client.channel(niosocketchannel.class);

            client.handler(newchannelinitializer() {

                @override

                protectedvoidinitchannel(socketchannel socketchannel)

                                                        throwsexception {

                    channelpipeline pipeline = socketchannel.pipeline();

 

                    pipeline.addlast(newchannelinboundhandleradapter(){

 

                        // 通道建立成功后调用:向redis发送一条set命令

                        @override

                        publicvoidchannelactive(channelhandlercontext ctx)

                                                            throwsexception {

                            string command = "set name zhuzi";

                            bytebuf buffer = respcommand(command);

                            ctx.channel().writeandflush(buffer);

                        }

 

                        // redis响应数据时触发:打印redis的响应结果

                        @override

                        publicvoidchannelread(channelhandlercontext ctx,

                                                object msg) throwsexception {

                            // 接受redis服务端执行指令后的结果

                            bytebuf buffer = (bytebuf) msg;

                            system.out.println(buffer.tostring(charsetutil.utf_8));

                        }

                    });

                }

            });

 

            // 根据ip、端口连接redis服务端

            client.connect("192.168.12.129", 6379).sync();

        } catch(exception e){

            e.printstacktrace();

        }

    }

 

    privatestaticbytebuf respcommand(string command){

        // 先对传入的命令以空格进行分割

        string[] commands = command.split(" ");

        bytebuf buffer = bytebufallocator.default.buffer();

 

        // 遵循resp协议:先写入指令的个数

        buffer.writebytes(("*" commands.length).getbytes());

        buffer.writebytes(line);

 

        // 接着分别写入每个指令的长度以及具体值

        for(string s : commands) {

            buffer.writebytes(("$" s.length()).getbytes());

            buffer.writebytes(line);

            buffer.writebytes(s.getbytes());

            buffer.writebytes(line);

        }

        // 把转换成resp格式的命令返回

        returnbuffer;

    }

}

在上述这个案例中,也仅仅只是通过respcommand()这个方法,对用户输入的指令进行了转换。同时在上面通过netty,与redis的地址、端口建立了连接。在连接建立成功后,就会向redis发送一条转换成resp指令的set命令。接着等待redis的响应结果并输出,如下:

ok

因为这是一条写指令,所以当redis收到执行完成后,最终就会返回一个ok,大家也可直接去redis中查询,也依旧能够查询到刚刚写入的name这个键值。

前面咱们自己针对于redis的resp协议,对用户指令进行了封装,然后发往redis执行。

但对于这些常用的协议,netty早已提供好了现成的处理器,想要使用时无需从头开发,可以直接使用现成的处理器来实现。

比如现在咱们可以基于netty提供的处理器,实现一个简单的http服务器。

代码如下:

// 基于netty提供的处理器实现http服务器

publicclasshttpserver {

    publicstaticvoidmain(string[] args) throwsinterruptedexception {

        eventloopgroup boss = newnioeventloopgroup();

        eventloopgroup worker = newnioeventloopgroup();

        serverbootstrap server = newserverbootstrap();

        server

            .group(boss,worker)

            .channel(nioserversocketchannel.class)

            .childhandler(newchannelinitializer() {

                @override

                protectedvoidinitchannel(niosocketchannel ch) {

                    channelpipeline pipeline = ch.pipeline();

 

                    // 添加一个netty提供的http处理器

                    pipeline.addlast(newhttpservercodec());

                    pipeline.addlast(newchannelinboundhandleradapter() {

                        @override

                        publicvoidchannelread(channelhandlercontext ctx,

                                                object msg) throwsexception {

                            // 在这里输出一下消息的类型

                            system.out.println("消息类型:" msg.getclass());

                            super.channelread(ctx, msg);

                        }

                    });

                    pipeline.addlast(newsimplechannelinboundhandler() {

                        @override

                        protectedvoidchannelread0(channelhandlercontext ctx,

                                                    httprequest msg) throwsexception {

                            system.out.println("客户端的请求路径:" msg.uri());

 

                            // 创建一个响应对象,版本号与客户端保持一致,状态码为ok/200

                            defaultfullhttpresponse response =

                                    newdefaultfullhttpresponse(

                                            msg.protocolversion(),

                                            httpresponsestatus.ok);

 

                            // 构造响应内容

                            byte[] content = "

".getbytes();

 

                            // 设置响应头:告诉客户端本次响应的数据长度

                            response.headers().setint(

                                httpheadernames.content_length,content.length);

                            // 设置响应主体

                            response.content().writebytes(content);

 

                            // 向客户端写入响应数据

                            ctx.writeandflush(response);

                        }

                    });

                }

            })

            .bind("127.0.0.1",8888)

            .sync();

    }

}

在该案例中,咱们就未曾手动对http的数据包进行拆包处理了,而是在服务端的pipeline上添加了一个处理器,这个处理器是netty官方提供的。

其类继承关系如下:

publicfinalclasshttpservercodec

    extendscombinedchannelduplexhandler

    implementssourcecodec {

    // ......

}

观察会发现,该类继承自这个组合类,它组合了编码器、解码器。

这也就意味着httpservercodec即可以对客户端的数据做解码,也可以对服务端响应的数据做编码。

同时除开添加了这个处理器外,在第二个处理器中打印了一下客户端的消息类型,最后一个处理器中,对客户端的请求做出了响应,其实也就是返回了一句话而已。

此时在浏览器输入,结果如下:

消息类型:classio.netty.handler.codec.http.defaulthttprequest

消息类型:classio.netty.handler.codec.http.lasthttpcontent$1

客户端的请求路径:/index.html

此时来看结果,客户端的请求会被解析成两个部分:

  • 1)第一个是请求信息;
  • 2)第二个是主体信息。

但按理来说浏览器发出的请求,属于get类型的请求,get请求是没有请求体信息的,但netty依旧会解析成两部分~,只不过get请求的第二部分是空的。

在第三个处理器中,咱们直接向客户端返回了一个h1标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。

7.1概述

netty除开提供了http协议的处理器外,还提供了dns、haproxy、memcache、mqtt、protobuf、redis、sctp、rtsp.....一系列协议的实现,具体定义位于这个包下,当然,咱们也可以自己实现自定义协议,按照自己的逻辑对数据进行编解码处理。

很多基于netty开发的中间件/组件,其内部基本上都开发了专属的通信协议,以此来作为不同节点间通信的基础,所以解下来咱们基于netty也来自己设计一款通信协议,这也会作为后续实现聊天程序时的基础。

所谓的协议设计,其实仅仅只需要按照一定约束,实现编码器与解码器即可,发送方在发出数据之前,会经过编码器对数据进行处理,而接收方在收到数据之前,则会由解码器对数据进行处理。

7.2自定义协议的要素

在自定义传输协议时,咱们必然需要考虑几个因素,如下:

  • 1)魔数:用来第一时间判断是否为自己需要的数据包;
  • 2)版本号:提高协议的拓展性,方便后续对协议进行升级;
  • 3)序列化算法:消息正文具体该使用哪种方式进行序列化传输,例如json、protobuf、jdk...;
  • 4)消息类型:第一时间判断出当前消息的类型;
  • 5)消息序号:为了实现双工通信,客户端和服务端之间收/发消息不会相互阻塞;
  • 6)正文长度:提供给ltc解码器使用,防止解码时出现粘包、半包的现象;
  • 7)消息正文:本次消息要传输的具体数据。

在设计协议时,一个完整的协议应该涵盖上述所说的几方面,这样才能提供双方通信时的基础。

基于上述几个字段,能够在第一时间内判断出:

  • 1)消息是否可用;
  • 2)当前协议版本;
  • 3)消息的具体类型;
  • 4)消息的长度等各类信息。

从而给后续处理器使用(自定义的协议规则本身就是一个编解码处理器而已)。

7.3自定义协议实战

前面简单聊到过,所谓的自定义协议就是自己规定消息格式,以及自己实现编/解码器对消息实现封装/拆解,所以这里想要自定义一个消息协议,就只需要满足前面两个条件即可。

因此实现如下:

@channelhandler.sharable

publicclasschatmessagecodec extendsmessagetomessagecodec {

 

    // 消息出站时会经过的编码方法(将原生消息对象封装成自定义协议的消息格式)

    @override

    protectedvoidencode(channelhandlercontext ctx, message msg,

                          list

jack jiang的 mail: jb2011@163.com, 联系qq: 413980957, 微信: hellojackjiang
网站地图