您现在的位置是:首页 > 技术教程 正文

【JiChat】基于Netty打造百万级用户IM平台:探索可扩展和高性能通信的威力

admin 阅读: 2024-03-16
后台-插件-广告管理-内容页头部广告(手机)

前言

不久前,笔者学习了《System Design Interview》中关于即时通讯(IM)系统设计的内容。为了更深入地理解和应用这些概念,我决定亲手实现一个名为 ‘JiChat’ 的IM系统。该项目不仅支持多客户端登录,还实现了历史消息同步、消息顺序一致性和零消息丢失的特性。为确保系统的可扩展性,整个架构按照百万级用户流量的标准进行设计,并支持无缝的横向扩展。在这篇博客中,我将分享这个IM项目的设计和实现过程,希望能够为大家提供有价值的经验和启发。

业务流程

图片来源

消息流程

  1. 用户A向聊天服务器1发送聊天消息。
  2. 聊天服务器1从ID生成器获取消息ID。
  3. 聊天服务器1将消息发送到消息同步队列。
  4. 消息存储在键值存储中。
  5. a.如果用户 B 在线,则消息将转发到用户 B 所在的聊天服务器 2连接的。
  6. b. 如果用户 B 离线,则从推送通知 (PN) 服务器发送推送通知。
  7. 聊天服务器2将消息转发给用户B。有一个持久的TCP用户 B 和聊天服务器 2 之间的连接。

消息大略流程: 客户端A→服务端a→服务端b→客户端B

问题分析

1.如何保证消息顺序

因为网络波动和一些系统异常,用户发送的消息和用户收到消息的顺序并不一致。比如小明发给小红的三条消息是[A,B,C],而小红收到的消息是[A,C,B]。也就是说系统无法保证发送和接收的消息顺序是一致,消息从客户端发送到服务端,再经过服务端转发到客户端。

方案1: 时间戳

每条消息都带上时间戳,消息顺序就按照时间顺序倒序。这个比较简单,但是有个问题就是如何保证几百万个客户端的时间都是一致。我们知道对于服务端来说保证多个服务器时间一致,都是比较难。

方案2:消息id递增

对于每条消息,到达服务端就分配一个递增的消息id,消息的顺序根据id顺序。用递增messageId,有个好处是消息丢失了,客户端能够立马知道。比如客户端A,收到消息messageId=16,而当前会话里最后一条messageId=12,说明有前面消息还没收到。那么可以请求服务端同步消息,先不将这条消息展示。即保证了消息顺序,又保证消息不丢失。这边保证了,服务端收到消息转发给接收客户端一致性。同时为了保证客户端发给服务端消息顺序,客户端发给服务端消息需要等待服务端返回messageId,后再发送下一条消息。这个可以将消息放入队列中,保证一条一条发送。

messageId递增,因为只要是会话里消息顺序一致即可。私聊消息,那么就是两个用户id就保证一致性;群聊,每个群会话一致即可。在分布式环境中用户id使用雪花id生成,直接拼接两个long太长了,将他们转成Base62编码(形如channelKey=cOvWLMNlsma_cOkLiKpPVqd)。将他们作为Redis Key,放入分布式缓存中。

public static String getChannelKey(long userId, long userId2) { if (userId > userId2) { // 保证较小的id排前面 long temp = userId; userId = userId2; userId2 = temp; } return Base62Util.encodeBase62(userId) + "_" + Base62Util.encodeBase62(userId2); } public static String getChannelKey(long groupId) { return Base62Util.encodeBase62(groupId); }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

这个方案缺点是,很多messageId都没有使用放在Redis内存中。比如我们微信有200个好友,但是实际有聊天可能也就50个不到。那么将近150个messageId放在Redis是无用。假设有100万用户,那么有将近1000万个key是没用,且永久存在Redis里。(不能设置过期,不知道哪天他们之间有消息了)。同时Redis变成单点故障了,一旦Redis出问题。那么整个系统都不能用。

以上是消息顺序的方案,大家有没更好的方案

单点故障(Single Point of Failure,SPOF)是指系统中的一个组件、节点或子系统,如果发生故障,就会导致整个系统无法正常运行。 换句话说,系统的可用性完全依赖于这个单一的组件。如果这个组件失效,整个系统可能会宕机或无法提供正常的服务.(单节点的Redis和MySQL) 笔者之前公司Redis是单节点,有一次Redis部署的服务器宕机了。结果导致生产所有的服务都不能使用,整个现网瘫痪了。一次非常严重的生产事故。所以一般生产的关键节点类似Redis,Nacos,MySql都需要部署集群。
  • 1
  • 2
  • 3

2.服务调度

客户端要和服务端建立TCP连接,因为我们的架构是100万用户。那么单节点服务端不能满足100万用户的连接,tcp不像http短连接直接使用nginx或者网关转发即可。tcp是长连接,那么就需要实现将现有的服务器,分配给客户端。

如何知道当前哪些服务可用

如果我们自己维护可用的服务列表,这边比较麻烦。我们可以通过注册中心监听指定服务,服务上线和下线那么我们都能知道。网上看到用zookeeper,这边我使用nacos。因为目前在国内nacos用的人会比较多些。

public void init() throws NacosException { Properties properties = new Properties(); properties.put("serverAddr", serverAddr); properties.put("namespace", namespace); // 创建Nacos NamingService NamingService namingService = NamingFactory.createNamingService(properties); // 订阅服务变化 namingService.subscribe(SERVICE_NAME, event -> { if (event instanceof NamingEvent) { final NamingEvent namingEvent = (NamingEvent) event; List<Instance> instances = namingEvent.getInstances(); log.info("服务个数{},服务列表:{}", instances.size(), instances); final List<String> curInstances = instances.stream().map(t -> t.getIp() + ":" + t.getPort()).collect(Collectors.toList()); serverLoadBalancer.syncServer(curInstances); } }); }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

调度服务算法

(1)一致性Hash算法,一致性hash算法主要是对于数据分片。对于节点上线和下线,使用一致性Hash算法可以减少数据迁移。一致性hash算法,让每个节点均匀分布比较难。如果大家感兴趣可以自己实现一个,发现有的节点关联客户端特别多,有的节点又特别少。想要分布均匀比较难实现。

一致哈希是一种特殊的哈希技术,当调整哈希表的大小时,平均只需要 n/m 重新映射键,其中 n 是键数和 m 槽数。相比之下,在大多数传统哈希表中,数组槽数量的变化会导致几乎所有键被重新映射,因为键和槽之间的映射是由模块化操作定义的。(reSize时候) Consistent Hashing主要目标是在节点的动态增减时尽量减小数据迁移的开销。 核心原理在于通过哈希函数将数据和节点映射到一个环形空间,然后通过在环上的顺时针方向找到最近的节点,从而实现负载均衡和节点的动态增减。为了映射更加均匀节点会用多个虚拟节点,hash更加均匀分布。
  • 1
  • 2
  • 3

(2)随机算法,随机算法是取[0,n]之间随机数。n(可用服务数量)这个比较简单。随机算法,对于服务数量从来不变化没问题。但是如果服务A宕机下线了,那么他的连接数都到其他数量了。而服务A重新上线,那么总的连接客户端都会比其他服务少。一致性Hash算法,也存在这个问题。

以上两种算法,一致性Hahs适合数据分片;而随机算法适合http短链接。

(3)最少连接(Least Connections),就是统计每个服务端当前的连接客户端数量,每次客户端请求获取连接服务端ip,返回当前连接数最少。这个能保证服务分配比较均匀,但是这也有个问题就是需要额外维护服务端的连接数。
以下是将服务端连接客户端数放在redis ZSet数据结构中

public void syncServer(List<String> curServers) { log.info("当前服务列表{}", curServers); Set<String> cacheServers = new HashSet<>(Objects.requireNonNull(redisTemplate.opsForZSet().range(CacheConstant.CHAT_SERVER_CLIENT_COUNT, 0, -1))); log.info("缓存服务列表{}", cacheServers); // 删除在缓存中但不在当前服务器中的元素 for (String cs : cacheServers) { if (!curServers.contains(cs)) { redisTemplate.opsForZSet().remove(CacheConstant.CHAT_SERVER_CLIENT_COUNT, cs); } } // 添加在当前服务器中但不在缓存中的元素 for (String s : curServers) { // 需要添加 if (!cacheServers.contains(s)) { redisTemplate.opsForZSet().add(CacheConstant.CHAT_SERVER_CLIENT_COUNT, s, 0); } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

4.多客户端登录

多客户端登录,这边每个用户都有对应设备表。不同类型设备,可以同时在线。例如手机和电脑,单不能有多部手机同时在线。当同一个设备类型多个在线,会将之前登录的设备顶下线。逻辑和微信一样。这样导致对于tcp服务端来说用户的唯一性是userId +deviceType,这边使用userKey=userId + “_” + deviceType。来表示登录用户的维度。

CREATE TABLE `t_device` ( `id` bigint NOT NULL COMMENT 'id主键', `device_identifier` varchar(64) COLLATE utf8mb4_general_ci NOT NULL COMMENT '设备标识', `device_name` varchar(100) COLLATE utf8mb4_general_ci NOT NULL COMMENT '设备名称', `device_type` int NOT NULL DEFAULT '1' COMMENT '设备类型(1手机 2电脑 3平板)', `os_type` int NOT NULL DEFAULT '1' COMMENT '操作系统类型', `online_status` int NOT NULL DEFAULT '1' COMMENT '在线状态(0离线 1在线)', `login_ip` varchar(32) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '最后登录IP', `login_date` datetime DEFAULT NULL COMMENT '最后登录时间', `user_id` bigint NOT NULL COMMENT '用户id', `create_time` datetime NOT NULL COMMENT '创建时间', `create_user` varchar(32) COLLATE utf8mb4_general_ci NOT NULL COMMENT '创建人', `update_time` datetime NOT NULL COMMENT '更新时间', `update_user` varchar(32) COLLATE utf8mb4_general_ci NOT NULL COMMENT '更新人', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='设备表';
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3.消息转发

因为客户端A和客户端B连接的服务端可能不是同一个,那么需要服务端将消息转发到客户端B连接的服务端消息。刚开始想用http实现服务端之间的消息转发,但是设计场景是百万级用户。那么需要使用MQ实现消息的转发,这样提高系统的吞吐量。为了能够将消息转发到客户端B连接的服务端,将消息队列,用配置的方式加入IP和端口号。例如:

@Value("${inner-ip}") private String innerIp; @Value("${server.port}") private String serverPort; @Bean Queue queueChatMsgNetty() { //当当前服务的ip和端口创建,这样避免mq发消息转发到错误的chat服务器上 String queueName = RabbitMQConstants.QUEUE_CHAT_MSG_NETTY + innerIp + ":" + serverPort; return new Queue(queueName, true); }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
@RabbitListener(queues = RabbitMQConstants.QUEUE_CHAT_MSG_NETTY + "#{environment.getProperty('inner-ip')}:#{environment.getProperty('server.port')}") public void receiveMessage(String message) { System.out.println("Message received: " + message); final DownMessage downMessage = JSON.parseObject(message, DownMessage.class); final Channel channel = ChannelRepository.get(downMessage.getUserKey()); if (Objects.isNull(channel)) { log.warn("连接客户端连接关闭了,下发消息到客户端失败:{}", message); return; } final ChannelFuture channelFuture = channel.writeAndFlush(downMessage); channelFuture.addListener(future -> { if (future.isSuccess()) { log.info("转发接收消息到客户端成功:{}", downMessage.getNonce()); } else { log.warn("转发接收消息到客户端失败:{}", downMessage.getNonce()); } }); }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

因为是多客户端登录,不仅要将消息发送到客户端B的多个设备。那么客户端A的其他设备也需要同步,例如你在手机发消息,那么电脑端也是实时同步你的消息。

为了能够支持离线消息同步,需要将消息存入数据库中。一开始想着将消息放在MQ中,等客户端什么时候上线了再消防。但是这会导致消息积压,降低系统的吞吐量。而且消息没用持久化就是不安全。这边使用的数据库是MySQL,实际对于聊天消息一般用的是Hbase(Facebook用的是Hbase)。这边笔者学会Hbase将消息改成Hbase,同时增加一个data_server服务。

4.消息同步

每当我们用电脑登录微信,微信都会全量同步我们的聊天信息。这边实现也是,在用户登录会从服务端拉取聊天频道,每个的最新messageId.聊天频道就是用户和群的会话。这边先全量读取,是为了防止有聊天chanel本地客户端没有。客户端对比每个聊天channel的messageId和本地是否维护已经读取到的curMaxMessageId值。这样只需要同步本地没有的消息。

系统设计

系统采用了分布式架构,基于 Spring Cloud 实现微服务化,服务注册和发现方面使用 Nacos,消息中间件选择 RabbitMQ,持久性数据存储采用 MySQL 数据库,缓存层使用 Redis。在数据访问层面,使用 MyBatis-Plus 简化数据库操作。服务之间通过 OpenFeign 实现远程调用,以提高服务之间的通信效率。为了满足即时通讯需求,引入了 Netty 框架,以实现高性能、实时的消息传递。

项目目录

在这里插入图片描述

doc

项目启动脚本和数据库脚本

jichat-dependencies

定义 JiChat项目的所有依赖的版本

jichat-framework

所有封装的技术框架都放在这里,基本都是使用spring-boot自动注入功能封装框架代码。
(1)jichat-common,公共包
(2)jichat-mybatis-gen:使用mybatis代码生成,封装了DTO,VO,Convert自定义类生成。网上很多都代码自动生成一个压缩包,还要手动复制比较麻烦。这边将代码生成在对应的目录里。因为要使用到mybatis里基类,引用了jichat-spring-boot-starter-mybatis。
(3)jichat-spring-boot-starter-mybatis:使用了mybatis-puls自动写入新增时间/更新时间/新增用户/更新用字段。之前项目分页用的都是PageHelper,而且这个比较简洁。这边也使用了,但是返回的PageInfo字段又太多了,这边封装了一个方法只返回total和list。

public class JiPageHelper { /** * 封装PageHelper的分页(原来返回的PageInfo字段太多了,这边自定义返回PageVO) * * @param pageDTO 分页内容 * @param select 接口中调用的查询方法 * @return com.ji.jichat.common.pojo.PageVO * @author jisl on 2024/1/28 18:11 **/ public static <E> PageVO<E> doSelectPageInfo(PageDTO pageDTO, ISelect select) { PageInfo<E> pageInfo = PageHelper.startPage(pageDTO.getPageNum(), pageDTO.getPageSize()).doSelectPageInfo(select); return new PageVO<>(pageInfo.getList(), pageInfo.getTotal()); } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

(4)jichat-spring-boot-starter-web:web服务功能封装,请求日志打印AccessLogAspect,TraceId和SpanId拦截器(这边没有使用第三方Zipkin,个人感觉太重了。这边服务还不是很多),统一异常处理,CommonResult返回给前端TraceId这样方便定位异常日志。

public class GlobalResponseBodyHandler implements ResponseBodyAdvice<CommonResult> { @Override @SuppressWarnings("NullableProblems") // 避免 IDEA 警告 public boolean supports(MethodParameter returnType, Class converterType) { if (returnType.getMethod() == null) { return false; } // 只拦截返回结果为 CommonResult 类型 return returnType.getMethod().getReturnType() == CommonResult.class; } @Override public CommonResult beforeBodyWrite(CommonResult commonResult, MethodParameter methodParameter, MediaType mediaType, Class<? extends HttpMessageConverter<?>> aClass, ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse) { //每个返回值,都加入traceId,这样方便定位日志 commonResult.setTraceId(CommonWebUtil.getTraceId(Objects.requireNonNull(HttpContextUtil.getHttpServletRequest()))); return commonResult; } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

(5)jichat-spring-boot-starter-security:用户鉴权,将来考虑使用jichat-gateway来实现。

jichat-gateway

网关,还未实现

jichat-modules

所有的业务相关项目放在这个目录里,因为是分布式项目这边统一每个业务项目都有一个api和app。api项目放RPC接口,VO和DTO,枚举等;app是业务具体实现。
目前就user-service和chat-service两个服务,这边没有再封装一个web应用比较麻烦。将来使用Hbase会增加一个data-service,来处理消息数据。chat-client是客户端,实际环境中也不会用Java写客户端这边是为了实现调试。

log

日志一开始想将配置文件logback-spring.xml放在jichat-spring-boot-starter-web项目里,但是启动项目无法识别项目名称。看网上是要使用bootstrap.yaml,比较麻烦没有放在那里。debug模式下,框架日志输出太多了,屏幕一些内容。同时不同环境使用的日志模式不一样,一般现在生产都是用ELK。有时间可以将日志转入ELK中。

<!-- io开头的(io.netty,lettuce) logger --> <logger name="io" level="info"/> <!--org开头的( Spring Boot,redisson,apache) logger --> <logger name="org" level="info"/> <!-- netflix-ribbon logger --> <logger name="com.netflix" level="INFO"/> <!--开发环境:打印控制台--> <springProfile name="dev"> <root level="DEBUG"> <appender-ref ref="CONSOLE" /> <appender-ref ref="INFO_FILE" /> <appender-ref ref="ERROR_FILE" /> </root> </springProfile> <!--生产环境:输出到文件--> <springProfile name="!dev"> <root level="INFO"> <appender-ref ref="INFO_FILE" /> <appender-ref ref="ERROR_FILE" /> </root> </springProfile>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

项目部署

(1)安装docker和docker-compose,安装组件

执行doc目录里docker-compose.yml文件。即可安装rabbitmq,nacos,mysql,reids等组件。如果nacos安装失败,是因为需要在MySQL初始化nacos-db.sql文件数据。

(2)启动user-service-app和chat-service-app服务

修改user-service-app和chat-service-app服务配置文件连接rabbitmq,nacos,mysql,reids地址和账号,即可启动user-service-app和chat-service-app服务

(3)启动chat-client客户端

使用swagger注册账号,修改chat-client配置即可启动客户端。为了方便实现通信,这边dev和test配置文件,使用不同端口和用户id

(4)访问地址

chat-server swagger: http://localhost:18080/chat-api/doc.html#/home user-server swagger: http://localhost:18081/user-api/doc.html#/home chat-client swagger: http://localhost:9192/doc.html#/home
  • 1
  • 2
  • 3

GitHub源码

JiChat GitHub https://github.com/jsl1992/JiChat

总结

通过以上实现了JiChat,这边也有几个IM功能没有实现
(1)群消息,消息实现就是将消息都发一遍给群里其他人。剩下的逻辑和私聊消息一致。
(2)多媒体消息,多媒体消息就是图片和语音,视频,文件。这边想到的思路是小文件使用和普通消息一样通过netty传递。比较大的消息先将文件上传到文件服务器,然后将url传递给用户。
(3)端到端加密通信,如果是当客户端比较好实现。客户端A将RSA公钥发给客户端B,客户端B用RSA公钥加密AES密钥。客户端A用私钥解密收到的AES密钥。这样就完成AES密钥传递。我们每天上网使用的HTTPS都用到TLS(Transport Layer Security)协议,里面的密钥传递流程就是基于此。对密码学感兴趣的读者可以参考这边文章密码学总结,实现开放接口验签和加密
但是这边如果是多端登录,就不太好实现。那么只能将AES加密的密钥存在服务端,这样服务端是可以看到密码。还有一个问题是需要解决,密钥更新。该如何协商更新。

端到端加密(End-to-End Encryption, E2EE)是一种安全通信的方式,其中只有通信的两个端点(通常是两个终端设备,例如用户的手机或计算机)能够理解或解密消息,而在通信路径的中间节点,包括服务提供者,都无法直接访问或理解消息内容。(Whatsapp支持)
  • 1

(4)数据分片。对于百万级用户那么单裤单表,是没法满足需求。一般是通过ShardingSphere-JDBC实现分库分表,这边业务比较清晰根据用户维度进行分库分表即可。分库分表也会带来一些问题,查询都需要带上用户id,不然会查询所有库表。关联查询等比较复杂的查询没法支持了。还有一个问题是如何查询用户名或者手机号是否存在,单表直接查询就好。而现在需要查询所有库表,放在Redis那也太浪费资源了。使用布隆过滤器,用户注销比较麻烦,很多API不支持布隆过滤器删除,碰上假阳性还是需要重新查一遍。

Bloom filter 是由 Howard Bloom 在 1970 年提出的二进制向量数据结构,它具有很好的空间和时间效率,被用来检测一个元素是不是集合中的一个成员。如果检测结果为是,该元素不一定在集合中;但如果检测结果为否,该元素一定不在集合中。

这边项目代码实现,一些技术框架参考了yudao-cloud代码。

标签:
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

在线投稿:投稿 站长QQ:1888636

后台-插件-广告管理-内容页尾部广告(手机)
关注我们

扫一扫关注我们,了解最新精彩内容

搜索