IM消息需要面对的另一个难题:如何保证收到的消息不乱序。下面先展开看看要解决这个难题有哪些障碍。

消息乱序的原因

时间难以保证

既然谈到“顺序”,就必然有一个衡量的标准,然而无论是使用客户端时间还是服务器时间都难以作为这个标准来衡量消息的先后顺序。

msg

如上图中,一个IM系统在多个客户端,在不同的接入网关进行接入,进而又在不同的逻辑处理服务器上进行处理,不论是客户端本身,还是服务器(网络、逻辑服务器),各自机器上的时间都不相同,因此无法以机器本地的时间来作为衡量消息顺序的标准。

网络顺序无法保证

考虑到只有一个客户端连接上一个网关的场景,即使在这样的场景中,消息的先后顺序也因为网络的因素难以得到保证。

msg-network

如上图中,网关试图向客户端依次发送消息1、2这两条消息,可能出现下面的问题:

  • 网关向客户端发送消息1,此时客户端的网络状况不好,导致该消息可能会丢失或者重传。
  • 网关没有等待消息1的发送结果,继续发送了消息2,而此时客户端的网络状况变好,这条消息比消息1更快的被客户端收到。

以上的场景,可能会有人想到一种处理模式:网关只有在客户端应答收到了消息1之后再继续发送消息2,这样就不会出现网络原因导致的消息乱序问题了。然而这样的话,消息相当于串行发送了,效率并不高。

多线程因素导致的乱序

客户端、服务器都可能存在多个发送、接收线程,这也是导致消息乱序的原因之一。

解决策略

前面分析了消息乱序的几个成因,下面就逐个分析都应该怎么解决。

消息序列号

前面提过的第一个问题:消息的时序标准问题,无法以客户端或者服务器本地的时间来作为衡量的标准,此时可以引入一个产生递增ID的组件,由这一组件来统一生成递增、不回退的消息序列号用于衡量消息的先后顺序。

然而这里还有可以细化讨论的部分:这个组件生成的ID,是否需要全局唯一?即不论单聊、群聊都需要保证生成出来的序列号唯一。

这个全局唯一性不是必要的,原因在于不同的聊天,能保证消息在自己的频道唯一、递增即可。有了这个前提,这个组件生成ID的流程大体如下:

  • 处理该聊天的逻辑服务器ID。
  • 每个聊天频道(单聊、群聊)有自己一个独立的频道ID。
  • 每个频道内部,保证能够产生一个递增、不回退的序列号。

这样,消息序列号实际上由三部分部分组成:逻辑服务器ID-频道ID-频道内的消息序列号。

群聊消息的处理

有了前面的消息序列号,已经解决了第一个问题:消息的时序标准问题。然而这样还不足够,考虑到下图中的群聊场景:

group-msg

在上图中:

  • 两个客户端依次发出消息A和消息B。
  • 在两个不同的处理群聊消息的服务器中,由于种种原因,反倒是消息B比消息A先到。

从上面可以看出,群聊消息乱序的原因在于:同一个群聊的消息,最后被分派到了两个不同的逻辑服务器上处理。

还是继续沿用上面生成消息序列号的思路:如果是同一个聊天频道的消息,就放在一起处理。因此可以变成下图中的处理方式:

group-msg-2

上图中,根据群聊消息的群ID来选择逻辑服务器,这样同一个群的消息都能落在同一个服务器中来处理了。

可以看到,这里并不需要使用一个“分布式唯一递增ID”这样的组件来产生ID,因为这里的问题简化成了:只需要该消息序列号在所在的逻辑服务器处理的聊天频道中唯一且递增就可以了。问题的重新分析和定义,让这个处理变得简单了很多。

网络乱序的处理

接着处理由于网络原因导致的乱序,TCP协议中也有类似处理网络乱序的手段,简单来说:

  • TCP协议栈中有缓冲区缓存收到的数据。
  • 发送端使用序列号ACK来确认接收端收到的数据,比如1、2、3三个序列号的数据,如果先接收到1,此时发送端会收到ACK 1的消息,但是在这之后如果消息3先于消息2被接收端收到,此时发送端仍然会ACK消息1,表示消息3这条消息是乱序的。
  • 有了缓冲区和确认序列号,就知道哪些数据可以由协议栈提供给应用层。

tcp-stack

如上图中:

  • 接收方TCP协议栈中依次存有消息1和3,而消息2还未接收到。
  • 消息1被发送方确认,此时消息1可以提供给应用层。
  • 由于消息2没有接收到,因此消息3是乱序消息,不能提供给应用层。

从中得到的启发是:收发队列是可以有发送者来掌控的,发送者知道消息的顺序,虽然不能保证消息收发的前后顺序,但是由于引入了缓冲区,只有被确认的消息才可以被消费,这样可以通过发送者的ACK确认,来保证消息的顺序消费。

以上的思路,可以沿用到网络乱序消息的处理中。

最终方案

综合以上的分析,消息乱序问题可以使用下面的方式来解决。

客户端消息缓存队列

客户端内部,维持一个缓存消息的队列,每个消息都有对应的消息序列号,收到消息之后需要与网关进行确认,以此确认这条消息是否是按序接收的消息,只有这样的消息才能提供给应用层消费。

cs-msg-queue

在上图中:

  • 消息序列号格式为:逻辑服务器ID-频道ID-频道内唯一且递增ID,因此上图中的消息,对应的逻辑服务器ID为1,频道ID为2。后面的描述为了方便仅使用频道内ID。
  • 客户端收到消息序号为101的消息,此时客户端会ACK和网关进行消息确认。由于101之前的消息都已经被确认过了,所以网关应答ACK 101,这意味着在这个序列号可以直接提供给应用层进行消息。
  • 客户端收到消息序号为103的消息,此时客户端会ACK和网关进行消息确认。由于在这个消息之前的102消息并没有得到ACK,因此网关应答ACK 101,这意味着通知客户端此时能提交给应用层消息的消息仍然还是101。

消息序列号与逻辑服务器切换

消息序列号由:逻辑服务器ID、频道ID、频道内唯一且递增ID组成。

  • 逻辑服务器ID:可以组合服务所在的hostname、进程号、监听端口号来生成逻辑服务器ID。
  • 频道ID:由于聊天频道仅仅和参与聊天者相关,因此单聊频道可以组合参与聊天的两端客户端ID来生成频道ID,而群聊ID则使用群ID生成频道ID。
  • 频道内唯一且递增ID:这个值甚至可以不用去落盘保存,只需要每次聊天逻辑服务器针对频道ID对应一个与之对应的递增ID即可。

当处理该消息的逻辑服务器发生变化时,逻辑服务器ID也随之发生变化。此时,无论是网关还是客户端,一旦发现与某个客户端建立通信维持的消息缓存队列其逻辑服务器ID发生了变化,以前缓存的消息都将被丢弃重新进行通信。

如在上面的图中,接收完毕序列号为1-2-103的消息之后,逻辑服务器宕机,此时客户端的聊天服务由另一台服务器进行处理,其逻辑服务器ID变成了2,频道ID变成3。此时网关检测到收发的消息中,逻辑服务器ID与频道ID发生了变化,因此会清空之前的消息缓存,并且通知客户端清空消息缓存,重新以这个逻辑服务器ID与频道ID的消息进行通信。这也就意味着101之后的消息需要重新进行收发确认,才能提供给客户端应用层进行消息了。

完整示例

下面以一个完整的流程作为例子结束这篇文章。

客户端收到1-2-101序列号的消息

  • 客户端向服务器应答收到1-2-101序列号的消息。
  • 网关层判断在这个序列号之前的消息已经全部传输成功了,通知逻辑服务器客户端已经接收到这条消息。
  • 逻辑服务器收到网关的消息之后,修改存储层该消息为已读消息,这样下次就不会再发这条消息给客户端了,然后向网关应答这条消息。
  • 网关应答客户端,ACK 1-2-101。
  • 客户端收到ACK 1-2-101消息之后,知道这条消息被服务器认为是按序的消息,可以提供给上层应用消费。
  • 消费完毕之后,客户端消息缓存队列就可以删除这条消息了。

客户端收到1-2-103序列号的消息

  • 客户端向服务器应答收到1-2-103序列号的消息。
  • 网关层判断这个序列号之前的1-2-102消息客户端还没有收到,因此既不会同步这条消息给逻辑服务器,也不会ACK这条消息的序列号,只会继续ACK上一条消息序列号即1-2-101。
  • 客户端收到这条消息的ACK,说明1-2-103序列号的消息既不能提供给业务层消费,也不能删除。

逻辑服务器发生切换

  • 逻辑服务器发生切换,此时逻辑服务器ID变成了2。查询向这个客户端的这个聊天频道同步的消息中,拿出未读消息中的最早的消息,构建其序列号2-2-200,发送给网关。
  • 网关收到逻辑服务器的消息,发现逻辑服务器ID已经发生了变化,遂将之前还没有删除的消息1-2-102和1-2-103从消息队列中删除,并且转发2-2-200消息给客户端。
  • 客户端收到2-2-200消息之后,同样判断逻辑服务器ID已经发生了变化,将之前缓存但是还未提供给业务层消息的消息1-2-103删除。
  • 接着客户端就是走前面的ACK流程向服务器来确认消息的顺序了。