workerman
是一款开源高性能PHP应用容器,它大大突破了传统PHP应用范围,被广泛的用于互联网、即时通讯、APP开发、硬件通讯、智能家居、物联网等领域的开发。
WebSocket
是HTML5下一种新的协议(websocket协议本质上是一个基于tcp的协议)
它实现了浏览器与服务器全双工通信,能更好的节省服务器资源和带宽并达到实时通讯的目的
Websocket是一个持久化的协议
1.安装 workerman/workerman
composer require workerman/workerman
2.根目录新建 socket.php
作为入口文件, 绑定的是 application/worker/controller/Websocket.php
<?php
define('APP_PATH', __DIR__ . '/application/');
define('BIND_MODULE','worker/Websocket'); //绑定模块
// 加载框架引导文件
require __DIR__ . '/thinkphp/start.php';
3.根目录新建 Websocket.php
文件路径 application/worker/controller/Websocket.php
<?php
use Workerman\Worker;
use app\common\library\ShopToken;
use app\utils\CommonUtil;
use app\dao\ShopUserDao;
//require_once '../vendor/Workerman/Autoloader.php';
// 初始化一个worker容器,监听28761端口
global $worker;
$worker = new Worker('websocket://0.0.0.0:28761');
// 这里进程数必须设置为1
$worker->count = 1;
// worker进程启动后建立一个内部通讯端口
$worker->onWorkerStart = function($worker)
{
// 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
$inner_text_worker = new Worker('Text://0.0.0.0:28762');
$inner_text_worker->onMessage = function($connection, $buffer)
{
global $worker;
// $data数组格式,里面有uid,表示向那个uid的页面推送数据
$data = json_decode($buffer, true);
$uid = $data['uid'];
// 通过workerman,向uid的页面推送数据
$ret = sendMessageByUid($uid, $data);
// 返回推送结果
$connection->send($ret ? 'ok' : "uid $uid not online");
};
$inner_text_worker->listen();
};
$worker->onConnect = function ($connection) {
return wsReturnSuccess($connection, '连接成功');
};
// 新增加一个属性,用来保存uid到connection的映射
$worker->uidConnections = array();
// 当有客户端发来消息时执行的回调函数
$worker->onMessage = function($connection, $data)use($worker)
{
if (empty($data)) return wsReturnError($connection, "数据不能为空");
$data = json_decode($data, true);
if(!is_array($data)) return wsReturnError($connection, "数据解析失败");
if(!isset($data['token'])) return wsReturnError($connection, "缺少token参数");
if(empty($data['token'])) return wsReturnError($connection, "token不能为空");
$token = $data['token'];
$data = ShopToken::get($data['token']);
if (!$data) {
return wsReturnError($connection, 'token无效');
}
$user_id = intval($data['user_id']);
if($user_id == 0)return wsReturnError($connection, '登录失效');
//查找店铺信息
$shop_user = ShopUserDao::getOneById($user_id);
if(!$shop_user) return wsReturnError($connection, '店铺用户不存在');
// 判断当前客户端是否已经验证,既是否设置了uid
if(!isset($connection->uid))
{
$connection->uid = 'shop_user_' . $token;
/* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
* 实现针对特定uid推送数据
*/
$worker->uidConnections[$connection->uid] = $connection;
}
return wsReturnSuccess($connection, '账号 ' . $shop_user['mobile'] . ' 已连接');
};
// 当有客户端连接断开时
$worker->onClose = function($connection)use($worker)
{
global $worker;
if(isset($connection->uid))
{
// 连接断开时删除映射
unset($worker->uidConnections[$connection->uid]);
}
};
// 向所有验证的用户推送数据
function broadcast($message)
{
global $worker;
foreach($worker->uidConnections as $connection)
{
$connection->send($message);
}
}
// 针对uid推送数据
function sendMessageByUid($uid, $data)
{
global $worker;
if(isset($worker->uidConnections[$uid]))
{
$connection = $worker->uidConnections[$uid];
$connection->send(json_encode($data, JSON_UNESCAPED_UNICODE));
return true;
}
return false;
}
function wsReturnError($connection, $msg, $data = []){
$connection->send(json_encode([
'uid' => $connection->uid ?? '', 'msg' => $msg, 'code' => 0, 'data' => ['errcode' => 1, 'msg' => $msg, 'action' =>''] //和api返回保持一致
], JSON_UNESCAPED_UNICODE));
}
function wsReturnSuccess($connection, $msg, $data = []){
$connection->send(json_encode([
'uid' => $connection->uid ?? '', 'msg' => $msg, 'code' => 1, 'data' => $data, 'action' =>'' //和api返回保持一致
], JSON_UNESCAPED_UNICODE));
}
// 运行所有的worker(其实当前只定义了一个)
Worker::runAll();
4.启动
php socket.php start //debug模式
php socket.php start -d //守护模式,永驻
5.发起推送
$client = stream_socket_client('tcp://127.0.0.1:28762', $errno, $errmsg, 1);
// 推送的数据,包含uid字段,表示是给这个uid推送
$data = ['uid' => 'shop_user_xxxxxx', 'msg' => '支付成功', 'code' => 1, 'data' => []];
// 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
fwrite($client, json_encode($data)."\n");
// 读取推送结果
//echo fread($client, 8192);
echo fread($client, 8192);
6.VUE3前端代码
6.1 定义数据结构
const returnData = ref({});
const wsUrl = ref("ws://1.2.3.666:28761"); //链接地址
const websock = ref(null);
6.2 打开页面,初始化连接
onMounted(async () => {
initWebSocket();
});
const initWebSocket = () => {
//初始化
websocket.value = new WebSocket(wsUrl.value);
websocket.value.onopen = () => {
console.log("连接成功");
sendMessage("开始链接");
};
//收到消息时候, 将消息内容赋值给 returnData
websocket.value.onmessage = (e) => {
console.log(JSON.parse(e.data), "--广播返回的消息");
returnData.value = JSON.parse(e.data);
};
websocket.value.onerror = () => {
console.log("连接错误");
};
};
6.3 发消息验证身份
将 token 发送, 将当前token 绑定 当前客户端
let obj = { token: localStorage.getItem("token") };
sendMessage(JSON.stringify(obj));
//发送消息
const sendMessage = (msg) => {
websocket.value.send(msg);
};
//关闭链接(在页面销毁时可销毁链接)
const closeWebSocket = () => {
websocket.value.close();
};
6.4 收到消息处理
监听 returnData
变化
watch(
() => returnData.value,
async (newValue, oldValue) => {
console.log("监听到returnData.value的改变");
console.log(returnData.value);
if (returnData.value) {
if (returnData.value.action) {
if ((returnData.value.action = "cashier_order_pay_success_notify")) {
ElMessage({
showClose: true,
message: "用户已支付成功",
type: "success",
});
}
}
}
},
{ deep: true }
);