A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始


web项目当遇上分布式,需要考虑各个服务应用管理的session如何让session里面的数据共享。这里可以使用redis作为应用服务的共享中间站。
public class StandardSession implements HttpSession, Session, Serializable
通过查看源代码发现web项目中的session是有实现serializable接口,所以我们可以通过将每个session对象序列化再存储到redis里面,因为各个应用服务都使用相同的redis,所以当请求过来的时候可以通过cookies里面的sessionID去redis里面反序列化回来session。达到共享session的效果。使用技术可以百度spring-session提供的技术来实现。这个是比较常用比较常见的一种session共享解决方案。


但是想象总是美好的。当我们使用websocket做简易聊天功能的时候,通过查看WebSocketSession发现该实现类是没有继承serializable,也就是说按照上面的方案是解决不了共享session的问题。


但是当集群情况出现的时候,每个节点存储的session各个都不相同。我们采用这种方案。




每个服务节点的WebSocketSession保存初次连接的session,当第一次访问在第一台服务器上面,并存储了他的session,在第二次访问nginx服务器被随机访问第二台节点服务器上面,在存储的Map里面是找不到对应的session,做相对于的推送消息,这个时候我们通过消息中间件rocketMQ的发布订阅的模式,将相对应的sessionID分发到各个节点上面,每个节点监听接受消息,找到对应session并将对应的消息推送回客户端。完成这种操作。
代码实现如下:

@Component
@RocketMQMessageListener(
        topic = "haoke-im-send-message-topic",
        selectorExpression = "SEND_MSG",
        messageModel = MessageModel.BROADCASTING,
        consumerGroup = "haoke-im-group"
)
public class MessageHandler extends TextWebSocketHandler implements RocketMQListener<String> {

    @Autowired
    private MessageDAO messageDAO;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private static final Map<Long, WebSocketSession> SESSIONS = new HashMap<>();

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Long uid = (Long) session.getAttributes().get("uid");
        // 将当前用户的session放置到map中,后面会使用相应的session通信
        SESSIONS.put(uid, session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage
            textMessage) throws Exception {
        Long uid = (Long) session.getAttributes().get("uid");

        JsonNode jsonNode = MAPPER.readTree(textMessage.getPayload());
        Long toId = jsonNode.get("toId").asLong();
        String msg = jsonNode.get("msg").asText();

        Message message = Message.builder()
                .from(UserData.USER_MAP.get(uid))
                .to(UserData.USER_MAP.get(toId))
                .msg(msg)
                .build();

        // 将消息保存到MongoDB
        message = this.messageDAO.saveMessage(message);

        String msgJson = MAPPER.writeValueAsString(message);

        // 判断to用户是否在线
        WebSocketSession toSession = SESSIONS.get(toId);
        if (toSession != null && toSession.isOpen()) {
            //TODO 具体格式需要和前端对接
            toSession.sendMessage(new TextMessage(msgJson));
            // 更新消息状态为已读
            this.messageDAO.updateMessageState(message.getId(), 2);
        }else{
            // 该用户可能下线,可能在其他的节点中,发送消息到MQ系统
            // 需求:添加tag,便于消费者对消息的筛选
            this.rocketMQTemplate.convertAndSend("haoke-im-send-message-topic:SEND_MSG", msgJson);
        }

    }

    @Override
    public void onMessage(String msg) {
        try {
            JsonNode jsonNode = MAPPER.readTree(msg);
            long toId = jsonNode.get("to").get("id").longValue();

            // 判断to用户是否在线
            WebSocketSession toSession = SESSIONS.get(toId);
            if (toSession != null && toSession.isOpen()) {
                //TODO 具体格式需要和前端对接
                toSession.sendMessage(new TextMessage(msg));
                // 更新消息状态为已读
                this.messageDAO.updateMessageState(new ObjectId(jsonNode.get("id").asText()), 2);
            }else{
                // 不需要做处理
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}


redis取代session.png (18.1 KB, 下载次数: 6)

redis取代session.png

分布式session方案.png (17.42 KB, 下载次数: 10)

分布式session方案.png

微信图片_20190515171223.png (8.47 KB, 下载次数: 5)

微信图片_20190515171223.png

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马