A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

在我初学编程的时候,还没写过完整点的项目就看过了一些高阶概念。在没有实践时,这些概念的神奇和强大之处很难被完全体会的。而一旦自己在摸索中应用了,瞬间觉得打开了一扇大门,技能又提升了一个层次。控制反转(Inversion of Control)就是这些强大概念之一。一年前在 MPJ 老师的频道上了解到了,但一直没自己独立创造场景用过。直到最近在项目中遇到个坑才用起来。
其实控制反转或者依赖注入(这两个感觉是同一个东西,看你从什么角度看)在前端框架中已经大量使用了。最早由 Angular 普及,后续的现代框架都有应用。比如 React 开发中目前最火的组件设计模式 Render Props,就是控制反转的一种应用。离开框架,在日常开发中,应用这种技巧可以帮助我们解决很多棘手的问题,今天就讲下我在开发中的一次应用。
项目场景是这样的:技术栈是 Nuxt + Vuex。项目需要连接 Web Socket,然后根据 Socket 传来的数据,对 Vuex 里面相应的数据进行修改。公司为了节约成本,将 Socket 数据压缩了,而且不是全量推送,这要求前端收到数据后对数据进行解压,然后对数据进行遍历查找,更新,重新计算和排序,总之对 Socket 数据的处理非常复杂。为了不影响性能,我把 Socket 连接和数据处理放进了 Web Worker。先来看下项目结构。
下面是我封装的一个 Socket 工厂函数:
// utils/socket.jsexport default function Socket() {  let heartBeat; // 心跳记录  let lost = 0; // 心跳失败次数  function decLost() {    lost -= 1;  }  function connect() {    const socket = new WebSocket("wss://xx.com");    socket.onopen = () => {      heartBeat = setInterval(() => {        socket.send(2);        lost += 1;        if (lost === 3) {          // 心跳失败超过 3 次,断开重连          clearInterval(heartBeat);          socket.close();          connect();        }      }, 5000);    };    socket.onerror = () => {      clearInterval(heartBeat);      socket.close();    };    socket.onclose = () => {      setTimeout(() => {        clearInterval(heart);        connect();      }, 3000);    };    return socket;  }  return Object.freeze({ decLost, connect });}复制代码Socket 连接实现了心跳机制。onopen 之后,每隔 5 秒向服务器发送 2,并把心跳失败次数加 1;服务器收到 2 之后会返回 3,客户端收到 3 之后再把心跳失败次数减 1。工厂函数暴露的 decLost 方法是为了外部在收到 3 之后把心跳次数减 1.
在 Web Worker 文件里面,调用 Socket 工厂函数,并连接 socket:
// workers/socket.jsimport Socket from "~/utils/socket";const socket = Socket();const socketConnection = socket.connect();socketConnection.onmessage = ({ data }) => {  // 处理 socket 接收到的数据,  // 处理完后通过 web worker 接口发出去  postMessage(result);};// web worker 收到外部的数据后,把数据发给 socketonmessage = ({ data }) => {  socketConnection.send(data);};复制代码然后在一个 Nuxt 插件里,引入 socket worker,收到 worker 里传来的数据后,把数据交给 Vuex Store,反之,监听到相关 Vuex Mutation 后,把 payload 传给 worker:
// plugins/socket.js// webpack 下导入 web worker 的方式:import SocketWorker from "worker-loader!~/workers/socket.js";const socketWorker = new SocketWorker();export default ({ store }) => {  store.subscribe((mutation, state) => {    // 监听到相关 Vuex Mutation  });  socketWorker.onmessage = ({ data }) => {    //监听 socket 发来的数据,收到数据后,    // 通过 store.commit() 来把数据存入 vuex store  };};复制代码这是我一开始写的 naive 版本,看起来主要功能实现了,而且封装和 Separation of concerns 做的也不错。写完刚跑起来,问题出现了。
挑战一:等 socket 连接成功后再发起订阅当应用打开后,需要立即订阅推送数据,包括用户登录状态下的私有数据和其它基础数据等。但是当发起订阅时,socket 可能连接成功了,也可能还没连接成功。一开始我想设置个定时器,过两秒后再发起订阅。可是想想这种做法也太挫了。第二个思路是在 socket 连接的 onopen 事件里执行订阅。可是这样子会直接把以前的 onopen 覆盖掉,而且这样做违反了封装原则。剩下就一个办法了,等连接成功后再发请求。来看怎么做的:
// workers/socket.js// ...// const socketConnection ...const waitForConnection = timeout =>  new Promise((resolve, reject) => {    const check = () => {      if (socketConnection.readyState === 1) {        resolve();      } else if ((timeout -= 100) < 0) {        reject("socket connection timed out");      } else {        setTimeout(check, 100);      }    };    setTimeout(check, 100);  });// ... 其它细节onmessage = async ({ data }) => {  try {    await waitForConnection(2000);  } catch (e) {    console.error(e);    return;  }  socketConnection.send(data);};复制代码waitForConnection 函数会每隔 100 ms 检查 socket 连接状态,如果连接状态是 1(成功),则 resolve Promise,否则一直隔 100 ms 检查一次,直到连接成功或者超过指定时间。
在向 socket 发送数据之前,先调用 waitForConnection,并指定最多等 2 秒,确保连接成功后再发送数据。
问题看起来解决了。奇淫技巧都用上了,让我满意了一会儿。直到……
需求来了!在 socket 断开重连后,需要续订之前的订阅。而包括用户 token 等订阅参数全都在 Vuex Store 里面。那这下头疼了,Vuex store 里面是没法知道断开重连的,而 worker 里面则根本没法读取 vuex store。知道这个需求后我内心是崩溃的,这根本没法写下去了啊!就在我都快要打算调整架构重写时,一拍脑袋灵光一闪,试试控制反转!
首先要让 Socket 工厂函数有个判断重连的机制。这个简单。
// utils/socket.jsexport default function Socket() {  let connectCount = 0; // 连接成功次数  // ...细节,见文章前面  socket.onopen = () => {    // 每次连接成功,连接次数加1    connectCount += 1;    if (connectCount > 1) {      // 若连接次数超过一次,则说明此次是重连      // 在这里可以做些重连之后的操作了    }  };  // ...}复制代码重连之后具体做什么事,这可以用依赖注入来实现。先在 worker 文件里定义要做的事情,然后在调用 Socket 工厂函数时注入方法:
// worker/socket// 通过 postMessage 通知外部重连const notifyReconnect = () => {  postMessage({ type: "reconnect" });};const socket = Socket(notifyReconnect);复制代码然后在 Socket 函数里接收一下:
// utils/socket.jsexport default function Socket(notifyReconnect) {  // ...细节,见文章前面  socket.onopen = () => {    connectCount += 1;    if (connectCount > 1) {      notifyReconnect();    }  };  // ...}复制代码我以为写到这里应该就可以了的,然而我还是太天真了。运行后,plugins/socket 文件里能接收到重连消息,但是一直连接失败。这个问题很诡异,最后发现还是因为我对 Web Socket 掌握的不深导致的。每次 socket 连接后,生成的连接实例都是新的。而我在 waitForConnection 方法里监听的 socketConnection 在关闭后,readyState 一直是 3(关闭状态),导致 waitForConnection 方法一直报 timeout 错误。
剩下的最后问题是每次重连,都更新连接实例。方法如下:
// worker/socketlet socketConnection;const notifyReconnect = connection => {  postMessage({ type: "reconnect" });  socketConnection = connection;};const socket = Socket(notifyReconnect);socketConnection = socket.connect();复制代码// utils/socketexport default function Socket(notifyReconnect) {  // ...细节,见文章前面  socket.onopen = () => {    connectCount += 1;    if (connectCount > 1) {      notifyReconnect(socket);    }  };  // ...}复制代码Socket 函数在调用 notifyReconnect 时,传入最新的连接实例 socket。
至此,功能都实现了。完整代码如下:
// utils/socket.jsexport default function Socket(notifyReconnect) {  let heartBeat;  let lost = 0;  let connectCount = 0;  function decLost() {    lost -= 1;  }  function connect() {    const socket = new WebSocket("wss://xx.com");    socket.onopen = () => {      connectCount += 1;      if (connectCount > 1) {        notifyReconnect(socket);      }      heartBeat = setInterval(() => {        socket.send(2);        lost += 1;        if (lost === 3) {          clearInterval(heartBeat);          socket.close();          connect();        }      }, 5000);    };    socket.onerror = () => {      clearInterval(heartBeat);      socket.close();    };    socket.onclose = () => {      setTimeout(() => {        clearInterval(heart);        connect();      }, 3000);    };    return socket;  }  return Object.freeze({ decLost, connect });}复制代码// workers/socket.jsimport Socket from "~/utils/socket";let socketConnection;const notifyReconnect = connection => {  postMessage({ type: "reconnect" });  socketConnection = connection;};const socket = Socket(notifyReconnect);socketConnection = socket.connect();const waitForConnection = timeout =>  new Promise((resolve, reject) => {    const check = () => {      if (socketConnection.readyState === 1) {        resolve();      } else if ((timeout -= 100) < 0) {        reject("socket connection timed out");      } else {        setTimeout(check, 100);      }    };    setTimeout(check, 100);  });socketConnection.onmessage = ({ data }) => {  // 处理完数据后通过 web worker 接口发出去  postMessage(result);};onmessage = async ({ data }) => {  try {    await waitForConnection(2000);  } catch (e) {    console.error(e);    return;  }  socketConnection.send(data);};复制代码// plugins/socket.jsimport SocketWorker from "worker-loader!~/workers/socket.js";const socketWorker = new SocketWorker();export default ({ store }) => {  store.subscribe((mutation, state) => {});  socketWorker.onmessage = ({ data }) => {    if (data.type === "reconnect") {      socketWorker.postMessage(/* 订阅参数 */);    }  };};复制代码

作者:leihuang
链接:https://juejin.im/post/5bac8e906fb9a05d1228133b



1 个回复

倒序浏览
奈斯
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马