需求分析
我们假设有一个需求,我在后端点击按钮 1,首页弹出 “后端触发了按钮 1”。后端点了按钮 2,列表页弹出 “后端触发了按钮 2”。做到根据不同场景推送到不同页面。
代码思路
Swoole fd
客户端浏览器打开或者刷新界面,在 swoole 服务会生成一个进程句柄 fd ,每次浏览器页面有打开链接 websocket 的 js 代码,便会生成,每次刷新的时候,会关闭之前打开的 fd,重新生成一个新的,关闭界面的时候会生成一个新的。swoole 的 fd 生成规则是从 1 开始递增。
Redis Hash 存储 fd
我们建立一个 key 为 swoole:fds redis 哈希类型数据,fd 为 hash 的字段,每个字段的值我们存储前端 websocket 请求的 url 参数信息 (根据业务复杂度自己灵活变通,我在项目中会在 url 带上 sessionId)。每次链接打开 swoole 服务的时候我们存储其信息,每次关闭页面时候我们清除其字段。在 redis 存储如下
触发分场景推送
在界面上当进行了触发操作的时候,通过后台 curl 请求 swoole http 服务,swoole http 服务根据你向我传递的参数分发给对应的逻辑处理。如 curl 请求 127.0.0.1:9502page=back&func=pushHomeLogic&token=123456 我们可以根据传入的 func 参数,在后台分发给对应逻辑处理。如分发给 pushHomeLogic 方法。在其里面实现自己的逻辑。为防止过多的 ifelse 以及 foreach 操作,我们采用的是闭包,call_user_func 等方法实现如下
复制代码
1 public function onRequest($request,$response)
2 {
3 if ($this->checkAccess("", $request)) {
4 $param = $request->get;
5 // 分发处理请求逻辑
6 if (isset($param['func'])) {
7 if (method_exists($this,$param['func'])) {
8 call_user_func([$this,$param['func']],$request);
9 }
10 }
11 }
12 }// 往首页推送逻辑处理
13 public function pushHomeLogic($request)
14 {
15 $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {
16 if ($aContent && $aContent['page'] == "home") {
17 $aRes['message'] = "后端按了按钮1";
18 $aRes['code'] = "200";
19 $oSwoole::$server->push($fd,xss_json($aRes));
20 }
21 };
22 $this->eachFdLogic($callback);
23 }
复制代码
完整代码
swool 脚本代码逻辑
复制代码
1 <?php
2
3 namespace App\Console\Commands;
4
5 use Closure;
6 use Illuminate\Console\Command;
7 use Illuminate\Support\Facades\Redis;
8
9 class SwooleDemo extends Command
10 {
11 // 命令名称
12 protected $signature = 'swoole:demo';
13 // 命令说明
14 protected $description = '这是关于swoole websocket的一个测试demo';
15 // swoole websocket服务
16 private static $server = null;
17
18 public function __construct()
19 {
20 parent::__construct();
21 }
22
23 // 入口
24 public function handle()
25 {
26 $this->redis = Redis::connection('websocket');
27 $server = self::getWebSocketServer();
28 $server->on('open',[$this,'onOpen']);
29 $server->on('message', [$this, 'onMessage']);
30 $server->on('close', [$this, 'onClose']);
31 $server->on('request', [$this, 'onRequest']);
32 $this->line("swoole服务启动成功 ...");
33 $server->start();
34 }
35
36 // 获取服务
37 public static function getWebSocketServer()
38 {
39 if (!(self::$server instanceof \swoole_websocket_server)) {
40 self::setWebSocketServer();
41 }
42 return self::$server;
43 }
44 // 服务处始设置
45 protected static function setWebSocketServer():void
46 {
47 self::$server = new \swoole_websocket_server("0.0.0.0", 9502);
48 self::$server->set([
49 'worker_num' => 1,
50 'heartbeat_check_interval' => 60, // 60秒检测一次
51 'heartbeat_idle_time' => 121, // 121秒没活动的
52 ]);
53 }
54
55 // 打开swoole websocket服务回调代码
56 public function onOpen($server, $request)
57 {
58 if ($this->checkAccess($server, $request)) {
59 self::$server->push($request->fd,xss_json(["code"=>200,"message"=>"打开swoole服务成功"]));
60 }
61 }
62 // 给swoole websocket 发送消息回调代码
63 public function onMessage($server, $frame)
64 {
65
66 }
67 // http请求swoole websocket 回调代码
68 public function onRequest($request,$response)
69 {
70 if ($this->checkAccess("", $request)) {
71 $param = $request->get;
72 // 分发处理请求逻辑
73 if (isset($param['func'])) {
74 if (method_exists($this,$param['func'])) {
75 call_user_func([$this,$param['func']],$request);
76 }
77 }
78 }
79 }
80
81 // websocket 关闭回调代码
82 public function onClose($serv,$fd)
83 {
84 $this->redis->hdel('swoole:fds', $fd);
85 $this->line("客户端 {$fd} 关闭");
86 }
87
88 // 校验客户端连接的合法性,无效的连接不允许连接
89 public function checkAccess($server, $request):bool
90 {
91 $bRes = true;
92 if (!isset($request->get) || !isset($request->get['token'])) {
93 self::$server->close($request->fd);
94 $this->line("接口验证字段不全");
95 $bRes = false;
96 } else if ($request->get['token'] != 123456) {
97 $this->line("接口验证错误");
98 $bRes = false;
99 }
100 $this->storeUrlParamToRedis($request);
101 return $bRes;
102 }
103
104 // 将每个界面打开websocket的url 存储起来
105 public function storeUrlParamToRedis($request):void
106 {
107 // 存储请求url带的信息
108 $sContent = json_encode(
109 [
110 'page' => $request->get['page'],
111 'fd' => $request->fd,
112 ], true);
113 $this->redis->hset("swoole:fds", $request->fd, $sContent);
114 }
115
116 /**
117 * @param $request
118 * @see 循环逻辑处理
119 */
120 public function eachFdLogic(Closure $callback = null)
121 {
122 foreach (self::$server->connections as $fd) {
123 if (self::$server->isEstablished($fd)) {
124 $aContent = json_decode($this->redis->hget("swoole:fds",$fd),true);
125 $callback($aContent,$fd,$this);
126 } else {
127 $this->redis->hdel("swoole:fds",$fd);
128 }
129 }
130 }
131 // 往首页推送逻辑处理
132 public function pushHomeLogic($request)
133 {
134 $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {
135 if ($aContent && $aContent['page'] == "home") {
136 $aRes['message'] = "后端按了按钮1";
137 $aRes['code'] = "200";
138 $oSwoole::$server->push($fd,xss_json($aRes));
139 }
140 };
141 $this->eachFdLogic($callback);
142 }
143 // 往列表页推送逻辑处理
144 public function pushListLogic($request)
145 {
146 $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {
147 if ($aContent && $aContent['page'] == "list") {
148 $aRes['message'] = "后端按了按钮2";
149 $aRes['code'] = "200";
150 $oSwoole::$server->push($fd,xss_json($aRes));
151 }
152 };
153 $this->eachFdLogic($callback);
154 }
155
156 // 启动websocket服务
157 public function start()
158 {
159 self::$server->start();
160 }
161 }
162 控制器代码
163
164 <?php
165
166 namespace App\Http\Controllers;
167
168 use Illuminate\Http\Request;
169 use Illuminate\Support\Facades\Redis;
170 class TestController extends Controller
171 {
172 // 首页
173 public function home()
174 {
175 return view("home");
176 }
177 // 列表
178 public function list()
179 {
180 return view("list");
181 }
182 // 后端控制
183 public function back()
184 {
185 if (request()->method() == 'POST') {
186 $this->curl_get($this->getUrl());
187 return json_encode(['code'=>200,"message"=>"成功"]);
188 } else {
189 return view("back");
190 }
191
192 }
193 // 获取要请求swoole websocet服务地址
194 public function getUrl():string
195 {
196 // 域名 端口 请求swoole服务的方法
197 $sBase = request()->server('HTTP_HOST');
198 $iPort = 9502;
199 $sFunc = request()->post('func');
200 $sPage = "back";
201 return $sBase.":".$iPort."?func=".$sFunc."&token=123456&page=".$sPage;
202 }
203 // curl 推送
204 public function curl_get(string $url):string
205 {
206 $ch_curl = curl_init();
207 curl_setopt ($ch_curl, CURLOPT_TIMEOUT_MS, 3000);
208 curl_setopt($ch_curl, CURLOPT_SSL_VERIFYPEER, 0);
209 curl_setopt ($ch_curl, CURLOPT_HEADER,false);
210 curl_setopt($ch_curl, CURLOPT_HTTPGET, 1);
211 curl_setopt($ch_curl, CURLOPT_RETURNTRANSFER,true);
212 curl_setopt ($ch_curl, CURLOPT_URL,$url);
213 $str = curl_exec($ch_curl);
214 curl_close($ch_curl);
215 return $str;
216 }
217 }
复制代码
页面 js 代码
后端控制页
复制代码
1 <!DOCTYPE html>
2 <html lang="en">
3 <head>
4 <meta charset="UTF-8">
5 <title>后端界面</title>
6 <meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no">
7 </head>
8 <body>
9 <button class="push" data-func="pushHomeLogic">按钮1</button>
10 <button class="push" data-func="pushListLogic">按钮2</button>
11 </body>
12 <script src="{{ asset("/vendor/tw/global/jQuery/jquery-2.2.3.min.js")}} "></script>
13 <script>
14 $(function () {
15 $(".push").on('click',function(){
16 var func = $(this).attr('data-func').trim();
17 ajaxGet(func)
18 })
19 function ajaxGet(func) {
20 url = "{{route('back')}}";
21 token = "{{csrf_token()}}";
22 $.ajax({
23 url: url,
24 type: 'post',
25 dataType: "json",
26 data:{func:func,_token:token},
27 error: function (data) {
28 alert("服务器繁忙, 请联系管理员!");
29 return;
30 },
31 success: function (result) {
32
33 },
34 })
35 }
36
37 })
38 </script>
39 </html>
复制代码
首页
复制代码
1 <!DOCTYPE html>
2 <html lang="en">
3 <head>
4 <meta charset="UTF-8">
5 <title>swoole首页</title>
6 <meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no">
7 </head>
8 <body>
9 <h1>这是首页</h1>
10 </body>
11 <script>
12 var ws;//websocket实例
13 var lockReconnect = false;//避免重复连接
14 var wsUrl = 'ws://{{$_SERVER["HTTP_HOST"]}}:9502?page=home&token=123456';
15
16 function initEventHandle() {
17 ws.onclose = function () {
18 reconnect(wsUrl);
19 };
20 ws.onerror = function () {
21 reconnect(wsUrl);
22 };
23 ws.onopen = function () {
24 //心跳检测重置
25 heartCheck.reset().start();
26 };
27 ws.onmessage = function (event) {
28 //如果获取到消息,心跳检测重置
29 //拿到任何消息都说明当前连接是正常的
30 var data = JSON.parse(event.data);
31 if (data.code == 200) {
32 console.log(data.message)
33 }
34 heartCheck.reset().start();
35 }
36 }
37 createWebSocket(wsUrl);
38 /**
39 * 创建链接
40 * @param url
41 */
42 function createWebSocket(url) {
43 try {
44 ws = new WebSocket(url);
45 initEventHandle();
46 } catch (e) {
47 reconnect(url);
48 }
49 }
50 function reconnect(url) {
51 if(lockReconnect) return;
52 lockReconnect = true;
53 //没连接上会一直重连,设置延迟避免请求过多
54 setTimeout(function () {
55 createWebSocket(url);
56 lockReconnect = false;
57 }, 2000);
58 }
59 //心跳检测
60 var heartCheck = {
61 timeout: 60000,//60秒
62 timeoutObj: null,
63 serverTimeoutObj: null,
64 reset: function(){
65 clearTimeout(this.timeoutObj);
66 clearTimeout(this.serverTimeoutObj);
67 return this;
68 },
69 start: function(){
70 var self = this;
71 this.timeoutObj = setTimeout(function(){
72 //这里发送一个心跳,后端收到后,返回一个心跳消息,
73 //onmessage拿到返回的心跳就说明连接正常
74 ws.send("heartbeat");
75 self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了
76 ws.close();//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
77 }, self.timeout);
78 }, this.timeout);
79 },
80 header:function(url) {
81 window.location.href=url
82 }
83
84 }
85 </script>
86 </html>
复制代码
列表页面
复制代码
1 <!DOCTYPE html>
2 <html lang="en">
3 <head>
4 <meta charset="UTF-8">
5 <title>swoole列表页</title>
6 <meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no">
7 </head>
8 <body>
9 <h1>swoole列表页</h1>
10 </body>
11 <script>
12 var ws;//websocket实例
13 var lockReconnect = false;//避免重复连接
14 var wsUrl = 'ws://{{$_SERVER["HTTP_HOST"]}}:9502?page=list&token=123456';
15
16 function initEventHandle() {
17 ws.onclose = function () {
18 reconnect(wsUrl);
19 };
20 ws.onerror = function () {
21 reconnect(wsUrl);
22 };
23 ws.onopen = function () {
24 //心跳检测重置
25 heartCheck.reset().start();
26 };
27 ws.onmessage = function (event) {
28 //如果获取到消息,心跳检测重置
29 //拿到任何消息都说明当前连接是正常的
30 var data = JSON.parse(event.data);
31 if (data.code == 200) {
32 console.log(data.message)
33 }
34 heartCheck.reset().start();
35 }
36 }
37 createWebSocket(wsUrl);
38 /**
39 * 创建链接
40 * @param url
41 */
42 function createWebSocket(url) {
43 try {
44 ws = new WebSocket(url);
45 initEventHandle();
46 } catch (e) {
47 reconnect(url);
48 }
49 }
50 function reconnect(url) {
51 if(lockReconnect) return;
52 lockReconnect = true;
53 //没连接上会一直重连,设置延迟避免请求过多
54 setTimeout(function () {
55 createWebSocket(url);
56 lockReconnect = false;
57 }, 2000);
58 }
59 //心跳检测
60 var heartCheck = {
61 timeout: 60000,//60秒
62 timeoutObj: null,
63 serverTimeoutObj: null,
64 reset: function(){
65 clearTimeout(this.timeoutObj);
66 clearTimeout(this.serverTimeoutObj);
67 return this;
68 },
69 start: function(){
70 var self = this;
71 this.timeoutObj = setTimeout(function(){
72 //这里发送一个心跳,后端收到后,返回一个心跳消息,
73 //onmessage拿到返回的心跳就说明连接正常
74 ws.send("heartbeat");
75 self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了
76 ws.close();//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
77 }, self.timeout);
78 }, this.timeout);
79 },
80 header:function(url) {
81 window.location.href=url
82 }
83
84 }
85 </script>
86 </html>
复制代码
界面效果
后台控制点击按钮 1
后端界面点击按钮 2
|
|