APP下载

基于websocket实现电网下的消息推送

2016-05-14冶莉娟付大伟

中国科技纵横 2016年6期

冶莉娟 付大伟

【摘 要】随着青海电网的信息化发展需要的系统增多,在消息处理中每个系统各自为政,而且消息格式不统一、不及时、甚至误发的情况,借此统一消息处理,是非常有必要的,本文介绍为解决这个问题,我们提出了通过redis实现消息存储,消息队列管理,使用websocket实现服务端与客户端的通讯。从而实现消息统一管理,统一发送渠道讨论说明。

【关键词】redis websocket 消息服务

1引言

随着移动互联网的发展,手机、平板电脑等智能移动终端成为了信息的重要载体,在移动化的趋势下,传统业务开始逐步向移动业务扩展,企业应用系统移动化的需求也随之日益增长。开始出现消息不统一,无法跨业务系统,已无法满足现有业务增长需求。

消息统一服务,为各个业务系统与移动端、web端提供统一的消息通道、统一的消息格式,加快业务系统的信息建设,剔除各业务系统繁杂不统一的消息处理,业务系统的开发人员只需要引入服务端、客户端SDK,消息接口调用就可实现消息发送与接收。

这里我们通过使用websocket提供消息通道、redis完成消息存储通过send服务器与状态服务器完成消息统一处理。

2. 关键技术

2.1 Query实现技术

方便消息数据的快速读写操作,使用redis技术 ,redis支持主从的模式、读写分离模型、数据分片模型,以及他高速操作:SET操作每秒钟 110000 次,GET操作每秒钟 81000 次。结合redis支持分布式的特性,使用多服务器更高效的为消息服务提供存储查询服务。

2.2线程池实现技术

线程池作用就是限制系统中执行线程的数量。

为实现系统运行效果的最优化,可以根据系统的环境情况,对线程数量进行自动或手动设定;少了浪费了系统资源,多了造成系统拥挤效率不高。通过线程池进行线程数量的控制,根据命令的先后依次执行。一个任务完成以后,再执行任务列表中时间最考前的任务。当任务列表中没有可执行进程时,线程池则进入等待状态。当一个新任务需要完成时,如果线程池中没有正在运行的工作线程,就可以开始运行了;否则进入等待队列。

为什么要用线程池:

(1)减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

(2)可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。

Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。

比较重要的几个类:

ExecutorService 真正的线程池接口。

ScheduledExecutorService 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。

ThreadPoolExecutor ExecutorService的默认实现。

ScheduledThreadPoolExecutor 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,线程池的配置很可能不是最完美的,因此在Executors类中设置了一些配方,专门由于形成经常用到的线程池。

2.2.1 newSingleThreadExecutor

形成用于单线程工作的线程池。运行过程中只有单一线程在线程池中运行,即所有的任务通过单线程的工作方式执行。一旦所执行的线程因某种原因而停止,则会有队列中时间最靠前的线程进入运行。该线程池的一切任务按照时间的先后顺序依次进行。

2.2.2 newFixedThreadPool

形成固定处理能力的线程池。每当有任务提交时,则对应形成一个线程,直到达到线程池的最大处理能力。线程池满负荷工作后则不再创建新的线程,当正在运行的线程完成或者因为其他原因结束时,则会在队列中选择最靠前的线程执行。

2.2.3 newCachedThreadPool

形成具有缓存空间的线程池。当需处理的线程较少,不能充满线程池时,则会将线程池中较长时间不运行的空闲线程收回,一旦增加任务量,线程池又会自动投入线程执行新的任务。该线程池的大小不进行设定,其大小完全根据操作系统线程的最大处理能力而定。

2.2.4 newScheduledThreadPool

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。综上述技术表现这里使用newFixedThreadPool线程管理。解决资源的合理利用,同时又不影响多操作的性能。

2.3 websocket实现技术

WebSocket protocol 是HTML5一种新的协议(protocol)。它是实现了浏览器与服务器全双工通信(full-duplex)。

现很多网站为了实现即时通讯(real-time),轮询(polling)是最常用的方式。轮询是通过设定固定的时间间隔(time interval)(如每1秒),由浏览器向服务器发出传输请求,收到请求后,服务器将最新的数据信返回服务端浏览器。这种常规的传输请求模式的缺点显而易见,浏览器的请求(request)必须不停的向服务器发出,可是即使信息中的数据很小,通常请求的头部信息也会很长,从而造成带宽的浪费。

而最新的轮询技术效果是Comet – 用了AJAX。但这种技术虽然采用全双工通信,但请求(reuqest)仍旧不可避免需要发出。

在 WebSocket API,仅仅需要在浏览器和服务器进行一个握手动作,浏览器和服务器之间的传输通道就形成了。数据通讯就可以在彼此之间直接进行。通过WebSocket 协议进行实时服务的好处有两点:

(1)Header。通讯所用的头部信息是很小的-通常仅有2 Bytes。

(2)Server Push。服务器到客户端的数据可以进行主动传输,编辑本段握手协议。

在实现websocket连线过程中,需要通过浏览器发出websocket连线请求,然后服务器发出回应,这个过程通常称为“握手” (handshaking)。

(1)握手协议在后期的版本中,会标明版本编号,下面的例子属于早期的协定之一,对于新版的 chrome 和 Firefox 皆不适用。

(2)后期的版本大多属于功能上的扩充,例如使用第7版的握手协议同样也适用于第8版的握手协议。

3 整体实现(图1)

(1)客户端client调用Login中的登录接口实现登录,login服务端通过人员Map查询得到登录人员分配的send消息服务器。并返回给登录人员。

(2)登录人员得到分配的send服务器后,通过websocket技术与send服务器第一次握手,打开消息通道。

(3)业务系统服务端调用线程池中的write接口,将消息写入redis中,同时将消息调用各个send服务器的send接口将消息发送到client端。

(4)当send服务的send接口被调用时,触发send服务,将消息发送予已与send握手的客户端client。

websocket实现基础代码:

java

protected StreamInbound createWebSocketInbound(String arg0,

HttpServletRequest arg1) {

// TODO Auto-generated method stub

return new ChatWebSocket(users);

}

创建一个chatwebsocket用于处理这个请求,触发该chatwebsocket对象的onOpen事件

@Override

protected void onOpen(WsOutbound outbound) {

// this.connection=connection;

this.username = "#" + String.valueOf(USERNUMBER);

USERNUMBER++;

try {

String message = "NAME" + "\t" + this.username;

CharBuffer buffer = CharBuffer.wrap(message);

this.getWsOutbound().writeTextMessage(buffer);

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

users.add(this);

}

websocket的数据传输是frame形式传输的,比如会将一条消息分为几个frame,按照先后顺序传输出去。这样做会有几个好处:

(1)大数据的传输可以分片传输,不用考虑到数据大小导致的长度标志位不足够的情况。

(2)和http的chunk一样,可以边生成数据边传递消息,即提高传输效率。

传输协议:

FIN:1位,用来表明这是一个消息的最后的消息片断,当然第一个消息片断也可能成为最终的消息片断。

RSV1, RSV2, RSV3, 分别都是1位,若互相之间的自定义协议没有进行约定,则这几位的状态被赋0,否则WebSocket连接必须被切除。

Opcode操作码为4位,对有效负载数据进行定义,当收到的操作码未知时,也必须将连接关断,以下是定义的操作码:

* %x0 代表消息片断是连续的;

* %x1 代表消息片断为文本格式;

* %x2 代表消息片断为二进制格式;

* %x3-7 为非控制消息片段预留操作码,未进行定义;

* %x8 代表已经关断连接;

* %x9 代表心跳检查的ping;

* %xA 代表心跳检查的pong;

* %xB-F 为控制消息片段预留操作码,未进行定义。

Mask:1位,代表掩码是否存在于传输的数据中,若该位为1,则masking-key中必须有掩码,该位在客户端传输到服务端的消息中状态都为1。

Payload length: 发送数据的长度,其表示形式为字节:7位、7+16位、或者7+64位。若该数值在0-125的范围内,则该值代表所发送数据的长度;若该数值为126,那么其后面的两个字节代表一个16位无符号整数,代表所发送数据的长度;若该数值为127,那么它后面的8各字节为一个64位无符号整数,该数值代表所发送数据的长度。网络字节的顺序代表多字节长度的量。扩展数据和应用数据相加即为负载数据的长度,当扩展数据的长度是0时,负载数据的长度与应用数据的长度相等。

Masking-key:0或4个字节,客户端传输到服务端的数据,都是通过内嵌的一个32位值作为掩码的;掩码键只有在掩码位设置为1的时候存在。

Payload data: (x+y)位,负载数据长度等于扩展数据和应用数据长度相加的结果。

Extension data:x位,扩展数据的长度在当客户端和服务端没有特殊约定时保持为0,扩展数据的长度必须在扩展时直接或者通过计算的方式被指定,同时其握手方式也必须提前确定。若扩展数据存在,那么其必定包含在负载数据之中。

Application data:y位,自由定义的应用数据,其往往放置于扩展数据的后面,应用数据的长度为负载数据和扩展数据的长度之差。

读取数据需要按照这个格式读取,发送数据也需要按照这个格式发送返回。

通过websocket实现客户端与服务端的握手连接,当业务系统发送消息时,query服务会将服务发送到send服务器并处发send服务,将消息发送到与此send服务器握手的客户端上。

4 结语

通过Redis实现消息存储,消息队列,websocket实现客户端与服务端的通讯。从而实现客户端与业务端的通讯连接,提供消息推送,消息订阅服务。

参考文献:

[1] 罗永刚.大型网络棋牌游戏服务器端设计与实现[D].山东大学,2011年.

[2] 李蔚.脑动力游戏服务器端子系统的分析与设计[D].北京邮电大学,2011年.

[3] 杜松波.企业即时通讯系统服务器的设计与实现[D].电子科技大学,2005年.

[4] 刘晓宇.基于SIP的即时通讯系统的实现与应用[D].中国科学院研究生院(计算技术研究所),2006年.

[5] 蔡文健.航海距离系统的服务器端设计与实现[D].北京邮电大学,2010年.

[6] 孔鹏.即时通讯系统的研究与实现[D].山东大学,2006年.

[7] 陈春源.基于服务器端的HTTP信息过滤系统设计与实现[D].华南理工大学,2012年.

作者简介:冶莉娟(1982一),女,山西浑源人,本科,工程师,主要电网调度自动化工作;付大伟(1976一),男,吉林四平人,研究生,高级工程师,主要负责电力自动化技术工作。