实战解决服务器重启,导致ZLMediaKit中的拉流推流被清空问题

目录

  • 解决服务器重启,导致ZLMediaKit中的拉流推流被清空问题
    • 项目背景
    • 解决方案
    • 代码
      • pom.xml
      • application.yml
      • 拉流地址枚举
      • 推流地址枚举
      • 核心代码ScheduleTask
      • 启动内种调用方案
    • 项目部署
      • 1.部署方式:docker部署
      • 2.java项目打包
      • 3.拉取java8的jre镜像
      • 4.将jar打成docker镜像
      • 5.启动zlm-timer容器,并设置 --restart=always 容器自启动
      • 6.查看容器的日志
      • 7.后续维护

解决服务器重启,导致ZLMediaKit中的拉流推流被清空问题

项目背景

接上一篇文章【实战】ZLMediaKit部署及使用

解决方案

  1. 确保docker服务和ZLMediaKit服务的开机自启动, 上篇文章中写了的
  2. 使用java项目写定时任务,取获取ZLMediaKit中的设备列表,并判断是否和预计的一致,如果不一致,则将缺失的进行重新拉流和推流
  3. 并且需要确保java项目是能够开启重启的,该方案使用docker的方式部署java项目从而实现此效果

代码

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.4</version>
        <relativePath/>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>zlm-timer-task</artifactId>
    <version>1.0.0</version>
    <name>zlm-timer-task</name>
    <description>用于定时检查zlm中的拉/推流状态,并重新拉/推流</description>


    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
        <hutool.version>5.7.18</hutool.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>${hutool.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <!-- 打包插件 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <executable>true</executable>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.yml

# 服务端口
server:
  port: 8881
# zlmediakit 配置
zlm:
  zlmUrl: http://192.168.1.180:8080 # zlm服务地址
  secret: qts123 # zlm鉴权秘钥
  app: live
  vhost: __defaultVhost__
# 定时任务配置
task:
  initialDelay: 5 #初始延迟时间,单位秒
  delayBetweenTasks: 10 #任务执行间隔时间,单位秒
# 需要拉流/推设备
devices: hk20,hk33

拉流地址枚举

package com.example.demo.timer;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * rtsp拉流枚举
 * 
 */
@Getter
@AllArgsConstructor
public enum RtspEnum {
    // 摄像头,根据实际情况进行定义
    hk20("hk20","实际的rtsp地址","环境实验室2(3-1)"),
    hk33("hk33","实际的rtsp地址","环境实验室2(3-3)"),
    ;
    private final String stream;
    private final String url;
    private final String name;

    public static RtspEnum getByStream(String stream){
        for (RtspEnum rtspEnum : RtspEnum.values()) {
            if(rtspEnum.getStream().equals(stream)){
                return rtspEnum;
            }
        }
        return null;
    }
}

推流地址枚举

package com.example.demo.timer;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 *  rtmp推流枚举
 */
@Getter
@AllArgsConstructor
public enum RtmpEnum {
   
    // 海康摄像头,stream字段与rtsp对应,根据实际情况进行定义
    hk20("hk20","实际的rtmp地址","环境实验室2(3-1)"),
    hk33("hk33","实际的rtmp地址","环境实验室2(3-3)"),
    
    ;
    private final String stream;
    private final String url;
    private final String name;

    /**
     * 通过 stream获取枚举
     */
    public static RtmpEnum getByStream(String stream){
        for (RtmpEnum rtmpEnum : RtmpEnum.values()) {
            if(rtmpEnum.getStream().equals(stream)){
                return rtmpEnum;
            }
        }
        return null;
    }

}

核心代码ScheduleTask

package com.example.demo.timer;

import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * 定时任务
 *
 * @author qingtongsheng
 * @date 2023年03月21日 16:54
 */
@Slf4j
@Component
public class ScheduleTask {
    // 需要拉/推流的设备列表
    public static List<String> deviceList = new ArrayList<>();
    // 设定任务执行的初始延迟时间(以秒为单位)
    public static long initialDelay = 5;
    // 设定任务执行的间隔时间(以秒为单位)
    public static long delayBetweenTasks = 10;
    // 时间单位
    public static final TimeUnit TIMEUNIT = TimeUnit.SECONDS;
    // @Value获取yml配置
    public static String zlmUrl = "http://127.0.0.1:8080";
    public static String secret = "qts123";
    public static String app = "live";
    public static String vhost = "__defaultVhost__";

    @Value("#{'${devices}'.split(',')}")
    public void setDeviceList(List<String> devices) {
        ScheduleTask.deviceList = devices;
    }
    @Value("${task.initialDelay}")
    public void setInitialDelay(long initialDelay) {
        ScheduleTask.initialDelay = initialDelay;
    }
    @Value("${task.delayBetweenTasks}")
    public void setDelayBetweenTasks(long delayBetweenTasks) {
        ScheduleTask.delayBetweenTasks = delayBetweenTasks;
    }
    @Value("${zlm.zlmUrl}")
    public void setZlmUrl(String zlmUrl) {
        ScheduleTask.zlmUrl = zlmUrl;
    }
    @Value("${zlm.secret}")
    public void setSecret(String secret) {
        ScheduleTask.secret = secret;
    }
    @Value("${zlm.app}")
    public void setApp(String app) {
        ScheduleTask.app = app;
    }
    @Value("${zlm.vhost}")
    public void setVhost(String vhost) {
        ScheduleTask.vhost = vhost;
    }


    public static void task() {
        // 创建一个 ScheduledExecutorService 对象
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // 创建一个 Runnable 任务
        Runnable task = () -> {
            // 计算程序执行耗时
            long startTime = System.currentTimeMillis();
            // 执行任务
            handleCommand();
            long endTime = System.currentTimeMillis();
            log.info("任务执行耗时:{} ms" , (endTime - startTime));

        };

        // 使用 ScheduledExecutorService 对象的 scheduleWithFixedDelay 方法执行任务
        scheduledExecutorService.scheduleWithFixedDelay(task, initialDelay, delayBetweenTasks, TIMEUNIT);

        shutdownThread(scheduledExecutorService);
    }

    /**
     * 调用 zlm api获取设备列表
     *
     * @return
     */
    private static String getMediaList() {
        System.out.println("zlmUrl => "+zlmUrl);
        String url = zlmUrl + "/index/api/getMediaList";
        HashMap<String, Object> params = new HashMap<>();
        params.put("secret", secret);
        params.put("schema", "rtmp");
        params.put("app", app);
        params.put("vhost", vhost);
        return HttpUtil.get(url, params,5000);
    }

    /**
     * 任务执行代码
     */
    private static void handleCommand() {
        // 获取线上设备列表
        String body;
        try {
            body = getMediaList();
        } catch (Exception e) {
            // 访问zlm失败,打印错误信息,可能是服务器未启动完成
            log.warn("访问zlm失败,可能是服务器未启动完成:{}", e.getMessage());
            return;
        }
        JSONObject jsonObject = JSONUtil.parseObj(body);
        Integer code = jsonObject.getInt("code");
        if (code != 0) {
            // 调用失败,打印错误信息
            log.info("获取设备列表失败,错误信息: code = {},msg = {}", jsonObject.getInt("code"),jsonObject.getStr("msg"));
            return;
        }

        // 调用成功
        JSONArray data = jsonObject.getJSONArray("data");
        // zlm中的所有设备ID列表
        List<String> streamIdList = new ArrayList<>();
        // 没有推流的流ID列表
        Set<String> notOutputStreamIdList = new HashSet<>();
        if (data != null) {
            // 得到所有设备ID
            streamIdList = data.stream().map(item -> ((JSONObject) item).getStr("stream")).collect(Collectors.toList());

            // 得到没有推流的流ID,即过滤data得到readerCount为0的流ID
            notOutputStreamIdList = data.stream()
                    .filter(item -> ((JSONObject) item).getInt("readerCount") == 0)
                    .map(item -> ((JSONObject) item).getStr("stream"))
                    .collect(Collectors.toSet());
        }
        log.info("当前设备列表: {}",streamIdList);

        if (!notOutputStreamIdList.isEmpty()) {
            log.info("需要重新推流的流ID列表: {}", notOutputStreamIdList);
            // 将未推流的流ID进行推流处理
            for (String streamId : notOutputStreamIdList) {
                // 推流处理
                RtmpEnum rtmpEnum = RtmpEnum.getByStream(streamId);
                String pushData = pushStream(rtmpEnum);
                JSONObject pushObject = JSONUtil.parseObj(pushData);
                Integer pushCode = pushObject.getInt("code");
                if (pushCode != 0) {
                    log.warn("推流失败: 设备名称 = 【{}】, 错误码 = 【{}】,msg = 【{}】", rtmpEnum.getName(), pushCode,pushObject.getStr("msg"));
                } else {
                    log.info("推流成功: 设备名称 = 【{}】", rtmpEnum.getName());
                }
            }
            log.info("重新推流完成");
        }

        if (streamIdList.size() >= deviceList.size()) {
            log.info("设备列表个数正常, 不需要重新拉流");
            return;
        }

        log.info("设备列表缺失,开始重新拉流/推流");
        // 需要对缺失的进行重新拉,并重新推流
        for (String stream : deviceList) {
            // 将不在设备列表中的设备,重新拉流,并推流
            if (!streamIdList.contains(stream)) {
                RtspEnum rtspEnum = RtspEnum.getByStream(stream);
                if (rtspEnum == null) {
                    continue;
                }
                try {
                    // 拉流
                    String pullData = pullStream(rtspEnum);
                    JSONObject pullObject = JSONUtil.parseObj(pullData);
                    Integer pushCode = pullObject.getInt("code");
                    if (pushCode != 0) {
                        log.warn("重新拉流失败: 设备名称 = 【{}】, 错误码 = 【{}】,msg = 【{}】", rtspEnum.getName(), pushCode,pullObject.getStr("msg"));
                    } else {
                        log.info("重新拉流完成: 设备名称 = 【{}】", rtspEnum.getName());
                        // 等待拉流完成
                        Thread.sleep(2000);
                    }
                } catch (Exception e) {
                    // 重新拉流失败
                    log.error("重新拉流失败:{} , 错误信息 {}", rtspEnum.getName(),e.getMessage());
                }
                try {
                    // 推流
                    String pushData = pushStream(RtmpEnum.getByStream(rtspEnum.getStream()));
                    JSONObject pushObject = JSONUtil.parseObj(pushData);
                    Integer pushCode = pushObject.getInt("code");
                    if (pushCode != 0) {
                        log.warn("重新推流失败: 设备名称 = 【{}】, 错误码 = 【{}】,msg = 【{}】", rtspEnum.getName(), pushCode,pushObject.getStr("msg"));
                    } else {
                        log.info("重新推流完成: 设备名称 = 【{}】", rtspEnum.getName());
                        // 等待推流完成
                        Thread.sleep(2000);
                    }
                } catch (Exception e) {
                    // 重新推流失败
                    log.error("重新推流失败:{}, 错误信息 {}", rtspEnum.getName(),e.getMessage());
                }
            }
        }

        log.info("结束重新拉流/推流");
    }

    /**
     * 调用 zlm api 拉流
     *
     * @param rtspEnum 拉流地址枚举
     */
    private static String pullStream(RtspEnum rtspEnum) {
        String url = zlmUrl + "/index/api/addStreamProxy";
        HashMap<String, Object> params = new HashMap<>();
        params.put("secret", secret);
        params.put("vhost", vhost);
        params.put("app", app);
        params.put("stream", rtspEnum.getStream());
        params.put("url", rtspEnum.getUrl());
        return HttpUtil.get(url, params);
    }

    /**
     * 调用 zlm api 推流
     */
    private static String pushStream(RtmpEnum rtmpEnum) {
        if (rtmpEnum == null) {
            throw new RuntimeException("推流地址不能为空");
        }
        String url = zlmUrl + "/index/api/addStreamPusherProxy";
        HashMap<String, Object> params = new HashMap<>();
        params.put("secret", secret);
        params.put("schema", "rtmp");
        params.put("vhost", vhost);
        params.put("app", app);
        params.put("stream", rtmpEnum.getStream());
        params.put("dst_url", rtmpEnum.getUrl());
        return HttpUtil.get(url, params);
    }

    /**
     * 程序结束时关闭线程池
     *
     * @param scheduledExecutorService
     */
    private static void shutdownThread(ScheduledExecutorService scheduledExecutorService) {
        // 关闭 ScheduledExecutorService 对象
        Thread shutdownThread = new Thread(() -> {
            System.out.println("正在关闭定时任务线程...");
            scheduledExecutorService.shutdown();
            System.out.println("定时任务线程已经关闭...");
            System.exit(0);
        });
        // 进程结束的钩子
        Runtime.getRuntime().addShutdownHook(shutdownThread);
    }
}

启动内种调用方案

package com.example.demo;

import com.example.demo.timer.ScheduleTask;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
        // 调用定时任务
        ScheduleTask.task();
        System.out.println("定时任务已启动");
    }
}

项目部署

1.部署方式:docker部署

Dockerfile

#设置镜像基础: jdk8-jre , 比jdk内存小
FROM java:openjdk-8u111-jre-alpine
#维护人员信息
MAINTAINER qingtongsheng
#设置对外暴露端口 多个端口空格分割
EXPOSE 8881
# VOLUME /tmp
# 复制jar到容器中,并重命名
ADD zlm-timer-task-1.0.0.jar app.jar
# 复制配置文件到容器中
ADD config/application.yml config/application.yml

#执行启动命令
ENTRYPOINT ["java","-Dserver.port=8881","-Dspring.config.location=/config/application.yml","-jar","/app.jar"]

2.java项目打包

2.1 将jar包 和 Dockerfile 上传到linux的 /home/docker/timer 路径下
2.2 将application.yml配置文件放到 /home/docker/timer/config 路径下

3.拉取java8的jre镜像

命令执行:  docker pull java:openjdk-8u111-jre-alpine

4.将jar打成docker镜像

4.1 进入/home/docker/timer目录  cd /home/docker/timer
4.2 打镜像  docker build -t zlm-timer .    (注意后面的点.需要带上)

5.启动zlm-timer容器,并设置 --restart=always 容器自启动

5.0 docker自启动  systemctl enable docker.service
5.1 docker run -it -d --name timer --restart=always -p 8881:8881 -v /home/docker/timer/config:/config zlm-timer /bin/bash

6.查看容器的日志

6.1 查看容器控制台日志,看最后20行,一直输出   docker logs -f --tail=20 timer

7.后续维护

7.1 修改配置: /home/docker/timer/config中的application.yml
7.2 再重启容器  docker restart timer
修改容器自动重启 docker update --restart=always timer