workerman动态订阅mqtt主题

worke

rman动态订阅mqtt主题

代码为整个订阅的文件,直接搜索
此处为动态订阅,每10秒检查一次是否有新设备,有则订阅 即可定位
动态订阅 代码的位置

<?php

namespace appworkercontroller;

use appadminvalidateAuthRule;
use thinkCache;
use thinkDb;
use thinkworkerServer;
use WorkermanConnectionAsyncTcpConnection;
use WorkermanLibTimer;
use WorkermanMqttClient;
use WorkermanProtocolsWs;

date_default_timezone_set("PRC");

class Sub extends Server
{

    protected $socket = 'tcp://0.0.0.0:8002';

    protected $processes = 1;

    protected $mqtt;

    protected static $heartbeat_time = 30;


    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($topic, $content)
    {
        Subdata::getmqttdata($topic,$content);
    }

    /**
     * 当连接建立时触发的回调函数
     * @param $connection
     */
    public function onConnect($connection)
    {

        dump_log("连接 建立" . $connection->getRemoteIp() . ":" . $connection->getRemotePort());
        $connection->send(json_encode('connect success'));
    }

    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onClose($connection)
    {
        dump_log("连接 关闭");
        #断线1秒后重新连接
        $connection->reConnect(1);
    }

    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
    {
        dump_log("错误: $code $msg");
    }

    /**
     * 每个进程启动
     * @param $worker
     */
    public function onWorkerStart($worker)
    {
        dump_log("TCP服务启动了");

        //$serial = "test001"; // 设备序列号
        $server_address = "mqtt://*.*.*.*:1883"; // mqtt 服务器地址
        $user_name = "zhanghao"; // 帐号
        $pass_word = "mima"; // 密码
        $options = [
            'username' => $user_name,
            'password' => $pass_word,
            'qos'      => 1
        ];

        #这里订阅多个主题,查库,拿到所有的序列号,然后批量订阅---------------------
        //$serial_list = ['test001', 'JMC1403100077'];
        $serial_list = Db::name('r_device')->column('code');
        Cache::set('smart_home__mqtt__', $serial_list,0);
        $mqtt = new Client($server_address, $options);
        $mqtt->onConnect = function ($mqtt) use ($serial_list) {
            $count = count($serial_list);
            for ($i = 0; $i < $count; $i++) {
                $mqtt->subscribe("subTopic/socket/{$serial_list[$i]}");
                dump('订阅启动成功' . $serial_list[$i]);
            }
        };

        $mqtt->onMessage = function ($topic, $content) {
            $this->onMessage($topic, $content);
        };
        $mqtt->connect();
        $this->mqtt = $mqtt;
        $count = count($serial_list);

        #此处为动态订阅,每10秒检查一次是否有新设备,有则订阅---start
        Timer::add(10, function () {
            $new_serial_list = Db::name('r_device')->column('code');
            $cache_list = Cache::get('smart_home__mqtt__');
            $difference = array_diff($new_serial_list, $cache_list);
            dump($difference);
            if ($difference) {
                if (count($difference) > 0) {
                    $count = count($difference);
                    for ($i = 0; $i < $count; $i++) {
                        $this->mqtt->subscribe("subTopic/socket/" . $difference[$i]);
                        dump('新设备订阅启动成功' . $difference[$i]);
                    }
                }
                Cache::set('smart_home__mqtt__', $new_serial_list,0);
            }else{
                dump_log("没有发现新设备");
            }

        });
        #此处为动态订阅,每10秒检查一次是否有新设备,有则订阅---end
        for ($i = 0; $i < $count; $i++) {
            #定时发送心跳
            Timer::add(5, function () use ($mqtt, $serial_list,$i) {
                $mqtt->publish("pubTopic/socket/{$serial_list[$i]}", 'ping', ['qos'=>1]);
            });
        }
    }

    public function onWorkerReload($worker)
    {
        foreach ($worker->connections as $connection) {
            $connection->send('worker reloading');
        }
    }

}