目录
- 解决服务器重启,导致ZLMediaKit中的拉流推流被清空问题
-
- 项目背景
- 解决方案
- 代码
-
- pom.xml
- application.yml
- 拉流地址枚举
- 推流地址枚举
- 核心代码ScheduleTask
- 启动内种调用方案
- 项目部署
-
- 1.部署方式:docker部署
- 2.java项目打包
- 3.拉取java8的jre镜像
- 4.将jar打成docker镜像
- 5.启动zlm-timer容器,并设置 --restart=always 容器自启动
- 6.查看容器的日志
- 7.后续维护
解决服务器重启,导致ZLMediaKit中的拉流推流被清空问题
项目背景
接上一篇文章【实战】ZLMediaKit部署及使用
解决方案
- 确保docker服务和ZLMediaKit服务的开机自启动, 上篇文章中写了的
- 使用java项目写定时任务,取获取ZLMediaKit中的设备列表,并判断是否和预计的一致,如果不一致,则将缺失的进行重新拉流和推流
- 并且需要确保java项目是能够开启重启的,该方案使用docker的方式部署java项目从而实现此效果
代码
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.4</version> <relativePath/> </parent> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>zlm-timer-task</artifactId> <version>1.0.0</version> <name>zlm-timer-task</name> <description>用于定时检查zlm中的拉/推流状态,并重新拉/推流</description> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-boot.version>2.6.13</spring-boot.version> <hutool.version>5.7.18</hutool.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>compile</scope> <optional>true</optional> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>${hutool.version}</version> </dependency> </dependencies> </dependencyManagement> <!-- 打包插件 --> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <executable>true</executable> </configuration> </plugin> </plugins> </build> </project>
application.yml
# 服务端口 server: port: 8881 # zlmediakit 配置 zlm: zlmUrl: http://192.168.1.180:8080 # zlm服务地址 secret: qts123 # zlm鉴权秘钥 app: live vhost: __defaultVhost__ # 定时任务配置 task: initialDelay: 5 #初始延迟时间,单位秒 delayBetweenTasks: 10 #任务执行间隔时间,单位秒 # 需要拉流/推设备 devices: hk20,hk33
拉流地址枚举
package com.example.demo.timer; import lombok.AllArgsConstructor; import lombok.Getter; /** * rtsp拉流枚举 * */ @Getter @AllArgsConstructor public enum RtspEnum { // 摄像头,根据实际情况进行定义 hk20("hk20","实际的rtsp地址","环境实验室2(3-1)"), hk33("hk33","实际的rtsp地址","环境实验室2(3-3)"), ; private final String stream; private final String url; private final String name; public static RtspEnum getByStream(String stream){ for (RtspEnum rtspEnum : RtspEnum.values()) { if(rtspEnum.getStream().equals(stream)){ return rtspEnum; } } return null; } }
推流地址枚举
package com.example.demo.timer; import lombok.AllArgsConstructor; import lombok.Getter; /** * rtmp推流枚举 */ @Getter @AllArgsConstructor public enum RtmpEnum { // 海康摄像头,stream字段与rtsp对应,根据实际情况进行定义 hk20("hk20","实际的rtmp地址","环境实验室2(3-1)"), hk33("hk33","实际的rtmp地址","环境实验室2(3-3)"), ; private final String stream; private final String url; private final String name; /** * 通过 stream获取枚举 */ public static RtmpEnum getByStream(String stream){ for (RtmpEnum rtmpEnum : RtmpEnum.values()) { if(rtmpEnum.getStream().equals(stream)){ return rtmpEnum; } } return null; } }
核心代码ScheduleTask
package com.example.demo.timer; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * 定时任务 * * @author qingtongsheng * @date 2023年03月21日 16:54 */ @Slf4j @Component public class ScheduleTask { // 需要拉/推流的设备列表 public static List<String> deviceList = new ArrayList<>(); // 设定任务执行的初始延迟时间(以秒为单位) public static long initialDelay = 5; // 设定任务执行的间隔时间(以秒为单位) public static long delayBetweenTasks = 10; // 时间单位 public static final TimeUnit TIMEUNIT = TimeUnit.SECONDS; // @Value获取yml配置 public static String zlmUrl = "http://127.0.0.1:8080"; public static String secret = "qts123"; public static String app = "live"; public static String vhost = "__defaultVhost__"; @Value("#{'${devices}'.split(',')}") public void setDeviceList(List<String> devices) { ScheduleTask.deviceList = devices; } @Value("${task.initialDelay}") public void setInitialDelay(long initialDelay) { ScheduleTask.initialDelay = initialDelay; } @Value("${task.delayBetweenTasks}") public void setDelayBetweenTasks(long delayBetweenTasks) { ScheduleTask.delayBetweenTasks = delayBetweenTasks; } @Value("${zlm.zlmUrl}") public void setZlmUrl(String zlmUrl) { ScheduleTask.zlmUrl = zlmUrl; } @Value("${zlm.secret}") public void setSecret(String secret) { ScheduleTask.secret = secret; } @Value("${zlm.app}") public void setApp(String app) { ScheduleTask.app = app; } @Value("${zlm.vhost}") public void setVhost(String vhost) { ScheduleTask.vhost = vhost; } public static void task() { // 创建一个 ScheduledExecutorService 对象 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); // 创建一个 Runnable 任务 Runnable task = () -> { // 计算程序执行耗时 long startTime = System.currentTimeMillis(); // 执行任务 handleCommand(); long endTime = System.currentTimeMillis(); log.info("任务执行耗时:{} ms" , (endTime - startTime)); }; // 使用 ScheduledExecutorService 对象的 scheduleWithFixedDelay 方法执行任务 scheduledExecutorService.scheduleWithFixedDelay(task, initialDelay, delayBetweenTasks, TIMEUNIT); shutdownThread(scheduledExecutorService); } /** * 调用 zlm api获取设备列表 * * @return */ private static String getMediaList() { System.out.println("zlmUrl => "+zlmUrl); String url = zlmUrl + "/index/api/getMediaList"; HashMap<String, Object> params = new HashMap<>(); params.put("secret", secret); params.put("schema", "rtmp"); params.put("app", app); params.put("vhost", vhost); return HttpUtil.get(url, params,5000); } /** * 任务执行代码 */ private static void handleCommand() { // 获取线上设备列表 String body; try { body = getMediaList(); } catch (Exception e) { // 访问zlm失败,打印错误信息,可能是服务器未启动完成 log.warn("访问zlm失败,可能是服务器未启动完成:{}", e.getMessage()); return; } JSONObject jsonObject = JSONUtil.parseObj(body); Integer code = jsonObject.getInt("code"); if (code != 0) { // 调用失败,打印错误信息 log.info("获取设备列表失败,错误信息: code = {},msg = {}", jsonObject.getInt("code"),jsonObject.getStr("msg")); return; } // 调用成功 JSONArray data = jsonObject.getJSONArray("data"); // zlm中的所有设备ID列表 List<String> streamIdList = new ArrayList<>(); // 没有推流的流ID列表 Set<String> notOutputStreamIdList = new HashSet<>(); if (data != null) { // 得到所有设备ID streamIdList = data.stream().map(item -> ((JSONObject) item).getStr("stream")).collect(Collectors.toList()); // 得到没有推流的流ID,即过滤data得到readerCount为0的流ID notOutputStreamIdList = data.stream() .filter(item -> ((JSONObject) item).getInt("readerCount") == 0) .map(item -> ((JSONObject) item).getStr("stream")) .collect(Collectors.toSet()); } log.info("当前设备列表: {}",streamIdList); if (!notOutputStreamIdList.isEmpty()) { log.info("需要重新推流的流ID列表: {}", notOutputStreamIdList); // 将未推流的流ID进行推流处理 for (String streamId : notOutputStreamIdList) { // 推流处理 RtmpEnum rtmpEnum = RtmpEnum.getByStream(streamId); String pushData = pushStream(rtmpEnum); JSONObject pushObject = JSONUtil.parseObj(pushData); Integer pushCode = pushObject.getInt("code"); if (pushCode != 0) { log.warn("推流失败: 设备名称 = 【{}】, 错误码 = 【{}】,msg = 【{}】", rtmpEnum.getName(), pushCode,pushObject.getStr("msg")); } else { log.info("推流成功: 设备名称 = 【{}】", rtmpEnum.getName()); } } log.info("重新推流完成"); } if (streamIdList.size() >= deviceList.size()) { log.info("设备列表个数正常, 不需要重新拉流"); return; } log.info("设备列表缺失,开始重新拉流/推流"); // 需要对缺失的进行重新拉,并重新推流 for (String stream : deviceList) { // 将不在设备列表中的设备,重新拉流,并推流 if (!streamIdList.contains(stream)) { RtspEnum rtspEnum = RtspEnum.getByStream(stream); if (rtspEnum == null) { continue; } try { // 拉流 String pullData = pullStream(rtspEnum); JSONObject pullObject = JSONUtil.parseObj(pullData); Integer pushCode = pullObject.getInt("code"); if (pushCode != 0) { log.warn("重新拉流失败: 设备名称 = 【{}】, 错误码 = 【{}】,msg = 【{}】", rtspEnum.getName(), pushCode,pullObject.getStr("msg")); } else { log.info("重新拉流完成: 设备名称 = 【{}】", rtspEnum.getName()); // 等待拉流完成 Thread.sleep(2000); } } catch (Exception e) { // 重新拉流失败 log.error("重新拉流失败:{} , 错误信息 {}", rtspEnum.getName(),e.getMessage()); } try { // 推流 String pushData = pushStream(RtmpEnum.getByStream(rtspEnum.getStream())); JSONObject pushObject = JSONUtil.parseObj(pushData); Integer pushCode = pushObject.getInt("code"); if (pushCode != 0) { log.warn("重新推流失败: 设备名称 = 【{}】, 错误码 = 【{}】,msg = 【{}】", rtspEnum.getName(), pushCode,pushObject.getStr("msg")); } else { log.info("重新推流完成: 设备名称 = 【{}】", rtspEnum.getName()); // 等待推流完成 Thread.sleep(2000); } } catch (Exception e) { // 重新推流失败 log.error("重新推流失败:{}, 错误信息 {}", rtspEnum.getName(),e.getMessage()); } } } log.info("结束重新拉流/推流"); } /** * 调用 zlm api 拉流 * * @param rtspEnum 拉流地址枚举 */ private static String pullStream(RtspEnum rtspEnum) { String url = zlmUrl + "/index/api/addStreamProxy"; HashMap<String, Object> params = new HashMap<>(); params.put("secret", secret); params.put("vhost", vhost); params.put("app", app); params.put("stream", rtspEnum.getStream()); params.put("url", rtspEnum.getUrl()); return HttpUtil.get(url, params); } /** * 调用 zlm api 推流 */ private static String pushStream(RtmpEnum rtmpEnum) { if (rtmpEnum == null) { throw new RuntimeException("推流地址不能为空"); } String url = zlmUrl + "/index/api/addStreamPusherProxy"; HashMap<String, Object> params = new HashMap<>(); params.put("secret", secret); params.put("schema", "rtmp"); params.put("vhost", vhost); params.put("app", app); params.put("stream", rtmpEnum.getStream()); params.put("dst_url", rtmpEnum.getUrl()); return HttpUtil.get(url, params); } /** * 程序结束时关闭线程池 * * @param scheduledExecutorService */ private static void shutdownThread(ScheduledExecutorService scheduledExecutorService) { // 关闭 ScheduledExecutorService 对象 Thread shutdownThread = new Thread(() -> { System.out.println("正在关闭定时任务线程..."); scheduledExecutorService.shutdown(); System.out.println("定时任务线程已经关闭..."); System.exit(0); }); // 进程结束的钩子 Runtime.getRuntime().addShutdownHook(shutdownThread); } }
启动内种调用方案
package com.example.demo; import com.example.demo.timer.ScheduleTask; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); // 调用定时任务 ScheduleTask.task(); System.out.println("定时任务已启动"); } }
项目部署
1.部署方式:docker部署
Dockerfile
#设置镜像基础: jdk8-jre , 比jdk内存小 FROM java:openjdk-8u111-jre-alpine #维护人员信息 MAINTAINER qingtongsheng #设置对外暴露端口 多个端口空格分割 EXPOSE 8881 # VOLUME /tmp # 复制jar到容器中,并重命名 ADD zlm-timer-task-1.0.0.jar app.jar # 复制配置文件到容器中 ADD config/application.yml config/application.yml #执行启动命令 ENTRYPOINT ["java","-Dserver.port=8881","-Dspring.config.location=/config/application.yml","-jar","/app.jar"]
2.java项目打包
2.1 将jar包 和 Dockerfile 上传到linux的 /home/docker/timer 路径下 2.2 将application.yml配置文件放到 /home/docker/timer/config 路径下
3.拉取java8的jre镜像
命令执行: docker pull java:openjdk-8u111-jre-alpine
4.将jar打成docker镜像
4.1 进入/home/docker/timer目录 cd /home/docker/timer 4.2 打镜像 docker build -t zlm-timer . (注意后面的点.需要带上)
5.启动zlm-timer容器,并设置 --restart=always 容器自启动
5.0 docker自启动 systemctl enable docker.service 5.1 docker run -it -d --name timer --restart=always -p 8881:8881 -v /home/docker/timer/config:/config zlm-timer /bin/bash
6.查看容器的日志
6.1 查看容器控制台日志,看最后20行,一直输出 docker logs -f --tail=20 timer
7.后续维护
7.1 修改配置: /home/docker/timer/config中的application.yml 7.2 再重启容器 docker restart timer 修改容器自动重启 docker update --restart=always timer