worke
rman动态订阅mqtt主题
代码为整个订阅的文件,直接搜索
此处为动态订阅,每10秒检查一次是否有新设备,有则订阅 即可定位
动态订阅 代码的位置
<?php namespace appworkercontroller; use appadminvalidateAuthRule; use thinkCache; use thinkDb; use thinkworkerServer; use WorkermanConnectionAsyncTcpConnection; use WorkermanLibTimer; use WorkermanMqttClient; use WorkermanProtocolsWs; date_default_timezone_set("PRC"); class Sub extends Server { protected $socket = 'tcp://0.0.0.0:8002'; protected $processes = 1; protected $mqtt; protected static $heartbeat_time = 30; /** * 收到信息 * @param $connection * @param $data */ public function onMessage($topic, $content) { Subdata::getmqttdata($topic,$content); } /** * 当连接建立时触发的回调函数 * @param $connection */ public function onConnect($connection) { dump_log("连接 建立" . $connection->getRemoteIp() . ":" . $connection->getRemotePort()); $connection->send(json_encode('connect success')); } /** * 当连接断开时触发的回调函数 * @param $connection */ public function onClose($connection) { dump_log("连接 关闭"); #断线1秒后重新连接 $connection->reConnect(1); } /** * 当客户端的连接上发生错误时触发 * @param $connection * @param $code * @param $msg */ public function onError($connection, $code, $msg) { dump_log("错误: $code $msg"); } /** * 每个进程启动 * @param $worker */ public function onWorkerStart($worker) { dump_log("TCP服务启动了"); //$serial = "test001"; // 设备序列号 $server_address = "mqtt://*.*.*.*:1883"; // mqtt 服务器地址 $user_name = "zhanghao"; // 帐号 $pass_word = "mima"; // 密码 $options = [ 'username' => $user_name, 'password' => $pass_word, 'qos' => 1 ]; #这里订阅多个主题,查库,拿到所有的序列号,然后批量订阅--------------------- //$serial_list = ['test001', 'JMC1403100077']; $serial_list = Db::name('r_device')->column('code'); Cache::set('smart_home__mqtt__', $serial_list,0); $mqtt = new Client($server_address, $options); $mqtt->onConnect = function ($mqtt) use ($serial_list) { $count = count($serial_list); for ($i = 0; $i < $count; $i++) { $mqtt->subscribe("subTopic/socket/{$serial_list[$i]}"); dump('订阅启动成功' . $serial_list[$i]); } }; $mqtt->onMessage = function ($topic, $content) { $this->onMessage($topic, $content); }; $mqtt->connect(); $this->mqtt = $mqtt; $count = count($serial_list); #此处为动态订阅,每10秒检查一次是否有新设备,有则订阅---start Timer::add(10, function () { $new_serial_list = Db::name('r_device')->column('code'); $cache_list = Cache::get('smart_home__mqtt__'); $difference = array_diff($new_serial_list, $cache_list); dump($difference); if ($difference) { if (count($difference) > 0) { $count = count($difference); for ($i = 0; $i < $count; $i++) { $this->mqtt->subscribe("subTopic/socket/" . $difference[$i]); dump('新设备订阅启动成功' . $difference[$i]); } } Cache::set('smart_home__mqtt__', $new_serial_list,0); }else{ dump_log("没有发现新设备"); } }); #此处为动态订阅,每10秒检查一次是否有新设备,有则订阅---end for ($i = 0; $i < $count; $i++) { #定时发送心跳 Timer::add(5, function () use ($mqtt, $serial_list,$i) { $mqtt->publish("pubTopic/socket/{$serial_list[$i]}", 'ping', ['qos'=>1]); }); } } public function onWorkerReload($worker) { foreach ($worker->connections as $connection) { $connection->send('worker reloading'); } } }