485 lines
21 KiB
C#
485 lines
21 KiB
C#
using System;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
using System.Collections.Generic;
|
||
using FATrace.Com;
|
||
using FATrace.Model;
|
||
using FATrace.HKNetLib.Wrapper;
|
||
using NLog;
|
||
using FATrace.OEMApp;
|
||
using System.IO;
|
||
using TaskStatus = FATrace.Model.TaskStatus;
|
||
|
||
namespace FATrace.OEMApp.Services
|
||
{
|
||
/// <summary>
|
||
/// 下载任务队列(单例)。
|
||
/// 负责:
|
||
/// 1. 将按钮入队的下载请求持久化为 DownloadTask(Pending)
|
||
/// 2. 后台循环按顺序逐条下载(同一时刻仅一个下载 Running)
|
||
/// 3. 根据海康 SDK 进度事件回写 DownloadTask.Progress,便于 UI/调试观察
|
||
/// 4. 下载完成后创建 OEMRawUse 与 JellyfinMonitorTask,为后续 Jellyfin 监听做准备
|
||
/// 5. 异常失败时写入错误并标记 Failed,循环不中断
|
||
/// </summary>
|
||
public sealed class DownloadTaskWorker
|
||
{
|
||
private static readonly Lazy<DownloadTaskWorker> _lazy = new(() => new DownloadTaskWorker());
|
||
public static DownloadTaskWorker Instance => _lazy.Value;
|
||
|
||
private readonly Logger _logger = LogManager.GetCurrentClassLogger();
|
||
private readonly SemaphoreSlim _singleRunner = new(1, 1); // 保证同一时间只有一个下载
|
||
private CancellationTokenSource? _cts; // 后台主循环的取消令牌(停止服务时发出)
|
||
private Task? _loopTask; // 后台主循环 Task
|
||
|
||
private HkCamera? _hk; // 供下载与事件回调使用的海康客户端(由 UI 注入)
|
||
|
||
// 当前下载文件名变更事件(用于 UI 显示)
|
||
public event Action<string>? DownloadFileNameChanged;
|
||
private void RaiseDownloadFileName(string name)
|
||
{
|
||
try { DownloadFileNameChanged?.Invoke(name); } catch { }
|
||
}
|
||
|
||
// 任务状态事件(用于按需刷新 gridRULog)
|
||
public event Action<DownloadTask>? TaskStarted;
|
||
public event Action<DownloadTask>? TaskCompleted;
|
||
public event Action<DownloadTask, string?>? TaskFailed;
|
||
private void RaiseTaskStarted(DownloadTask t) { try { TaskStarted?.Invoke(t); } catch { } }
|
||
private void RaiseTaskCompleted(DownloadTask t) { try { TaskCompleted?.Invoke(t); } catch { } }
|
||
private void RaiseTaskFailed(DownloadTask t, string? err) { try { TaskFailed?.Invoke(t, err); } catch { } }
|
||
|
||
/// <summary>
|
||
/// 下载的视频时间
|
||
/// </summary>
|
||
public static int VideoTime { get; set; }
|
||
|
||
private DownloadTaskWorker()
|
||
{
|
||
VideoTime = ConfigHelper.GetIntOrDefault("VideoTime", 30);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 启动下载队列后台循环。
|
||
/// </summary>
|
||
/// <param name="hk">已初始化的 HkCamera 客户端,用于发起下载与接收事件。</param>
|
||
/// <remarks>
|
||
/// - 该方法幂等,重复调用不会重复启动。
|
||
/// - 启动时会将上次异常退出时处于 Running 的任务重置为 Pending,避免任务丢失。
|
||
/// </remarks>
|
||
public void Start(HkCamera hk)
|
||
{
|
||
_hk = hk ?? throw new ArgumentNullException(nameof(hk));
|
||
if (_cts != null) return; // 已启动
|
||
// 恢复上次未完成的运行中任务为待处理,然后再循环执行
|
||
try
|
||
{
|
||
var db = FSqlContext.FDb;
|
||
// 将异常退出时处于 Running 的任务回滚为 Pending,避免卡住队列
|
||
db.Update<DownloadTask>()
|
||
.Set(a => a.Status, TaskStatus.Pending)
|
||
.Set(a => a.Progress, 0)
|
||
.Set(a => a.UpdateTime, DateTime.Now)
|
||
.Where(a => a.Status == TaskStatus.Running)
|
||
.ExecuteAffrows();
|
||
|
||
// 将失败的任务按最大重试次数重置为 Pending,程序重启后可自动再次尝试
|
||
// 最大重试次数从配置读取(appSettings: DownloadTaskMaxRetry,默认 3)
|
||
var maxRetry = ConfigHelper.GetIntOrDefault("DownloadTaskMaxRetry", 3);
|
||
db.Update<DownloadTask>()
|
||
.Set(a => a.Status, TaskStatus.Pending)
|
||
.Set(a => a.Error, "")
|
||
.Set(a => a.Progress, 0)
|
||
.Set(a => a.UpdateTime, DateTime.Now)
|
||
.Where(a => a.Status == TaskStatus.Failed && a.TryCount < maxRetry)
|
||
.ExecuteAffrows();
|
||
}
|
||
catch { }
|
||
_cts = new CancellationTokenSource();
|
||
//开始后台主循环
|
||
_loopTask = Task.Run(() => RunAsync(_cts.Token));
|
||
_logger.Info("[DownloadTaskWorker] 已启动");
|
||
}
|
||
|
||
/// <summary>
|
||
/// 停止下载队列后台循环(尽力而为)。
|
||
/// </summary>
|
||
public void Stop()
|
||
{
|
||
try
|
||
{
|
||
// 发出取消信号,RunAsync 将尽快退出;
|
||
// 正在处理的任务会在 token 取消时尝试停止 SDK 下载
|
||
_cts?.Cancel();
|
||
}
|
||
catch { }
|
||
finally
|
||
{
|
||
_cts = null;
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 入队一个下载任务(同时持久化到数据库)。
|
||
/// </summary>
|
||
/// <param name="code">业务条码/编号,关联 OEMRawUse.Code 与 VideoAction.Code。</param>
|
||
/// <param name="rawName">原料名称。</param>
|
||
/// <param name="user">操作用户。</param>
|
||
/// <param name="start">NVR 下载开始时间(默认当前时间-30 秒)。</param>
|
||
/// <param name="end">NVR 下载结束时间(默认当前时间)。</param>
|
||
/// <param name="rawCode">原料条码(可选)。未提供时默认等于 code。</param>
|
||
/// <returns>新增 DownloadTask 的自增 Id。</returns>
|
||
public long Enqueue(ParsedCodeInfo parsedCodeInfo, string user, DateTime? start = null, DateTime? end = null)
|
||
{
|
||
if (parsedCodeInfo == null) throw new ArgumentException("code 不能为空");
|
||
if (string.IsNullOrWhiteSpace(parsedCodeInfo.Code)) throw new ArgumentException("code 不能为空");
|
||
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("user 不能为空");
|
||
|
||
var now = DateTime.Now;
|
||
// 构造持久化任务:
|
||
// - 默认回溯 30 秒(符合项目规则)
|
||
// - rawCode 未提供时回落到 code,避免非空列插入失败
|
||
var task = new DownloadTask
|
||
{
|
||
Code = parsedCodeInfo.Code,
|
||
RawName = parsedCodeInfo.RawName,
|
||
RawCode = parsedCodeInfo.RawCode,
|
||
User = user,
|
||
Status = TaskStatus.Pending,
|
||
Progress = 0,
|
||
NvrStartTime = now,
|
||
NvrEndTime = now.AddSeconds(VideoTime),
|
||
CreateTime = now,
|
||
UpdateTime = now
|
||
};
|
||
|
||
// 入库返回自增 Id,便于 UI 提示与后续跟踪
|
||
var id = FSqlContext.FDb.Insert<DownloadTask>(task).ExecuteIdentity();
|
||
_logger.Info("[DownloadTaskWorker] 入队 DownloadTask: Id={Id}, Code={Code}", id, parsedCodeInfo.Code);
|
||
return id;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 后台主循环:定期拉取最早的 Pending 任务并顺序处理
|
||
/// </summary>
|
||
/// <param name="token">取消令牌</param>
|
||
private async Task RunAsync(CancellationToken token)
|
||
{
|
||
while (!token.IsCancellationRequested)
|
||
{
|
||
DownloadTask? next = null;
|
||
try
|
||
{
|
||
await _singleRunner.WaitAsync(token);
|
||
try
|
||
{
|
||
var db = FSqlContext.FDb;
|
||
next = await db.Select<DownloadTask>()
|
||
.Where(t => t.Status == TaskStatus.Pending && t.NvrEndTime < DateTime.Now.AddSeconds(5))
|
||
.OrderBy(t => t.Id)
|
||
.FirstAsync();
|
||
|
||
if (next != null)
|
||
{
|
||
await ProcessTaskAsync(next, token);
|
||
}
|
||
}
|
||
finally
|
||
{
|
||
_singleRunner.Release();
|
||
}
|
||
}
|
||
catch (OperationCanceledException)
|
||
{
|
||
break;
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.Error(ex, "[DownloadTaskWorker] 主循环异常");
|
||
}
|
||
|
||
if (next == null)
|
||
{
|
||
try { await Task.Delay(5000, token); } catch { }
|
||
}
|
||
}
|
||
_logger.Info("[DownloadTaskWorker] 已停止");
|
||
}
|
||
|
||
/// <summary>
|
||
/// 当前处理中的 DownloadTask
|
||
/// </summary>
|
||
public DownloadTask CurDownloadTask { get; set; }
|
||
|
||
/// <summary>
|
||
/// 实际执行单个下载任务:
|
||
/// 1) 标记 Running,生成保存路径
|
||
/// 2) 订阅 SDK 事件以回写进度
|
||
/// 3) 发起下载并等待完成
|
||
/// 4) 下载成功 -> 写 OEMRawUse 与 JellyfinMonitorTask
|
||
/// </summary>
|
||
/// <param name="t">待处理的 DownloadTask。</param>
|
||
/// <param name="token">取消令牌。</param>
|
||
private async Task ProcessTaskAsync(DownloadTask downloadTask, CancellationToken token)
|
||
{
|
||
if (_hk == null)
|
||
{
|
||
_logger.Warn("[DownloadTaskWorker] HkCamera 未初始化,跳过任务 Id={Id}", downloadTask.Id);
|
||
return;
|
||
}
|
||
|
||
CurDownloadTask = downloadTask;
|
||
|
||
var db = FSqlContext.FDb;
|
||
|
||
// 步骤1:状态入库(Running/TryCount/Progress/UpdateTime)
|
||
// 标记运行中
|
||
downloadTask.Status = TaskStatus.Running;
|
||
downloadTask.TryCount += 1;
|
||
downloadTask.Progress = 0;
|
||
downloadTask.UpdateTime = DateTime.Now;
|
||
await db.Update<DownloadTask>()
|
||
.SetSource(downloadTask)
|
||
.UpdateColumns(a => new { a.Status, a.TryCount, a.Progress, a.UpdateTime })
|
||
.Where(a => a.Id == downloadTask.Id)
|
||
.ExecuteAffrowsAsync();
|
||
|
||
// 通知任务开始
|
||
RaiseTaskStarted(downloadTask);
|
||
|
||
// 步骤2:生成保存路径(含安全文件名),并确保保存目录存在
|
||
// 生成本地文件名/路径
|
||
var saveBase = _hk.NVRVideoSavePath;
|
||
if (string.IsNullOrWhiteSpace(saveBase))
|
||
{
|
||
_logger.Error("[DownloadTaskWorker] NVRVideoSavePath 为空,无法下载。任务 Id={Id}", downloadTask.Id);
|
||
await MarkFailedAsync(downloadTask, "NVRVideoSavePath 未配置");
|
||
return;
|
||
}
|
||
var filePath = NVRCom.GetVideoPathName(saveBase, downloadTask.Code!);
|
||
// 确保保存目录存在,避免 SDK 写文件失败
|
||
|
||
|
||
try
|
||
{
|
||
var dir = Path.GetDirectoryName(filePath);
|
||
if (!string.IsNullOrWhiteSpace(dir) && !Directory.Exists(dir))
|
||
{
|
||
Directory.CreateDirectory(dir);
|
||
}
|
||
}
|
||
catch { }
|
||
await db.Update<DownloadTask>()
|
||
.Set(a => a.VideoFilePath, filePath)
|
||
.Set(a => a.UpdateTime, DateTime.Now)
|
||
.Where(a => a.Id == downloadTask.Id)
|
||
.ExecuteAffrowsAsync();
|
||
|
||
//当前的下载路径
|
||
downloadTask.VideoFilePath = filePath;
|
||
|
||
// 通知 UI 当前下载文件名
|
||
RaiseDownloadFileName(Path.GetFileName(filePath) ?? string.Empty);
|
||
|
||
// 步骤3:订阅 SDK 事件 -> TCS 转换
|
||
// 事件 -> TCS
|
||
// 订阅两个事件:进度与完成。进度事件写回数据库;完成事件用于唤醒等待
|
||
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||
|
||
void OnProgress(object? s, short p)
|
||
{
|
||
try
|
||
{
|
||
//实时更新下载进度
|
||
|
||
//db.Update<DownloadTask>()
|
||
// .Set(a => a.Progress, Math.Max((short)0, Math.Min((short)100, p)))
|
||
// .Set(a => a.UpdateTime, DateTime.Now)
|
||
// .Where(a => a.Id == t.Id)
|
||
// .ExecuteAffrows();
|
||
}
|
||
catch { }
|
||
}
|
||
void OnComplete(object? s, string msg)
|
||
{
|
||
try
|
||
{
|
||
tcs.TrySetResult(msg);
|
||
}
|
||
catch { }
|
||
}
|
||
|
||
|
||
_hk.NVRLoadVideoProcessEventHandler += OnProgress;
|
||
_hk.NVRLoadVideoCompleteEventHandler += OnComplete;
|
||
|
||
// 步骤4:调用 SDK 按时间下载,并等待完成事件(含超时/取消处理)
|
||
try
|
||
{
|
||
// 发起下载(按时间范围)
|
||
var res = _hk.Sdk_NET_DVR_GetFileByTime_V40(downloadTask.NvrStartTime, downloadTask.NvrEndTime, filePath);
|
||
if (!res.Result)
|
||
{
|
||
await MarkFailedAsync(downloadTask, res.Msg);
|
||
return;
|
||
}
|
||
|
||
//下载过程中的监控事件
|
||
// 计算这次下载的时间窗口长度(秒),用于推导合理的超时
|
||
var timeWindowSec = Math.Max(1, (int)(downloadTask.NvrEndTime - downloadTask.NvrStartTime).TotalSeconds);
|
||
|
||
// 计算总超时秒数:优先读配置 DownloadTaskTimeoutSeconds;
|
||
// 如果未配置,用时间窗口 * 3 的经验值,并限制在 [120, 1800] 区间,防止过短/过长。
|
||
var timeoutSec = ConfigHelper.GetIntOrDefault(
|
||
"DownloadTaskTimeoutSeconds",
|
||
Math.Max(120, Math.Min(1800, timeWindowSec * 3))
|
||
);
|
||
|
||
// completed 代表“下载完成”这个事件(由 tcs.TrySetResult 在回调里触发)
|
||
Task completed = tcs.Task;
|
||
|
||
// timeoutTask 是“超时定时器”,超时时间到会完成;同时受 token 取消影响
|
||
Task timeoutTask = Task.Delay(TimeSpan.FromSeconds(timeoutSec), token);
|
||
|
||
// 当外部取消(Stop/退出)时,主动让 tcs 进入取消态,避免一直等待 completed
|
||
using (token.Register(() =>
|
||
{
|
||
try { tcs.TrySetCanceled(); } catch { }
|
||
}))
|
||
{
|
||
// 等待“完成事件”或“超时”二者之一先发生
|
||
var finished = await Task.WhenAny(completed, timeoutTask);
|
||
|
||
// 如果先等到了 timeoutTask,说明超时发生
|
||
if (finished == timeoutTask)
|
||
{
|
||
// 尽力停止 SDK 的下载(防止后台还在跑)
|
||
try { _hk.Sdk_NET_DVR_StopGetFile(); } catch { }
|
||
// 将任务标记为失败(原因是超时),写入数据库状态
|
||
await MarkFailedAsync(downloadTask, $"下载超时({timeoutSec}s)");
|
||
return; // 结束本次任务处理
|
||
}
|
||
}
|
||
|
||
// 能走到这里,表示 completed 先发生(下载完成事件被触发)
|
||
// 读取回调携带的完成消息(有些 SDK 会返回提示文本,例如“下载完成”)
|
||
var completeMsg = await ((Task<string>)tcs.Task);
|
||
|
||
// 简单的成功判断:消息非空,并包含“完成”字样就算成功
|
||
var succeed = !string.IsNullOrWhiteSpace(completeMsg) && completeMsg.Contains("完成");
|
||
|
||
// 若判断不通过,视为失败,尽力停止下载并入库失败原因
|
||
if (!succeed)
|
||
{
|
||
try { _hk.Sdk_NET_DVR_StopGetFile(); } catch { }
|
||
await MarkFailedAsync(downloadTask, string.IsNullOrWhiteSpace(completeMsg) ? "下载失败" : completeMsg);
|
||
return;
|
||
}
|
||
|
||
// 走到这里即认为下载成功,后续会继续执行文件校验与入库逻辑
|
||
|
||
// 步骤5:文件有效性检查(存在且大小>0)
|
||
// 文件有效性检查
|
||
try
|
||
{
|
||
if (!File.Exists(filePath) || new FileInfo(filePath).Length <= 0)
|
||
{
|
||
await MarkFailedAsync(downloadTask, "下载文件不存在或为空");
|
||
return;
|
||
}
|
||
}
|
||
catch
|
||
{
|
||
// 文件系统异常
|
||
await MarkFailedAsync(downloadTask, "下载文件验证异常");
|
||
return;
|
||
}
|
||
|
||
//// 步骤6:入库(不再使用 Jellyfin 监听):
|
||
//// 1) 插入 VideoAction(保存本地视频元数据)
|
||
//var va = new VideoAction
|
||
//{
|
||
// Code = downloadTask.Code,
|
||
// User = downloadTask.User,
|
||
// VideoFilePath = filePath,
|
||
// VideoName = Path.GetFileName(filePath) ?? string.Empty,
|
||
// StartTime = downloadTask.NvrStartTime,
|
||
// EndTime = downloadTask.NvrEndTime,
|
||
// CreateTime = DateTime.Now
|
||
//};
|
||
//var videoActionId = db.Insert<VideoAction>(va).ExecuteIdentity();
|
||
|
||
// 2) 插入 OEMRawUse(直接填充本地文件路径作为 VideoUrl,并置 UrlState=true)
|
||
var rawUse = new OEMRawUse
|
||
{
|
||
InBagCode = downloadTask.Code,
|
||
RawName = downloadTask.RawName,
|
||
RawCode = downloadTask.RawCode,
|
||
User = downloadTask.User,
|
||
VideoStartTime = downloadTask.NvrStartTime,
|
||
VideoEndTime = downloadTask.NvrEndTime,
|
||
VideoFilePath = downloadTask.VideoFilePath,
|
||
VideoName = NVRCom.GetVideoName(downloadTask.Code!),
|
||
};
|
||
var rawUseId = db.Insert<OEMRawUse>(rawUse).ExecuteIdentity();
|
||
|
||
// 步骤7:收尾——标记下载完成并记录日志
|
||
// 标记下载完成
|
||
downloadTask.Status = TaskStatus.Completed;
|
||
downloadTask.Progress = 100;
|
||
downloadTask.UpdateTime = DateTime.Now;
|
||
await db.Update<DownloadTask>()
|
||
.SetSource(downloadTask)
|
||
.UpdateColumns(a => new { a.Status, a.Progress, a.UpdateTime })
|
||
.Where(a => a.Id == downloadTask.Id)
|
||
.ExecuteAffrowsAsync();
|
||
|
||
_logger.Info("[DownloadTaskWorker] 下载完成并入库:DownloadTaskId={Id}, OEMRawUseId={RawUseId}", downloadTask.Id, rawUseId);
|
||
// 通知任务完成
|
||
RaiseTaskCompleted(downloadTask);
|
||
}
|
||
catch (OperationCanceledException)
|
||
{
|
||
// 取消:停止当前 SDK 下载并标记失败
|
||
try { _hk.Sdk_NET_DVR_StopGetFile(); } catch { }
|
||
await MarkFailedAsync(downloadTask, "任务被取消");
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
await MarkFailedAsync(downloadTask, ex.Message);
|
||
}
|
||
finally
|
||
{
|
||
// 释放事件订阅,避免内存泄露或重复触发
|
||
try { _hk.NVRLoadVideoProcessEventHandler -= OnProgress; } catch { }
|
||
try { _hk.NVRLoadVideoCompleteEventHandler -= OnComplete; } catch { }
|
||
//_singleRunner.Release();
|
||
// 清空 UI 显示(可选)
|
||
RaiseDownloadFileName(string.Empty);
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 将任务标记为失败,并持久化错误信息。
|
||
/// </summary>
|
||
/// <param name="t">下载任务。</param>
|
||
/// <param name="error">错误描述(可空)。</param>
|
||
private Task MarkFailedAsync(DownloadTask t, string? error)
|
||
{
|
||
var db = FSqlContext.FDb;
|
||
// 将状态置为 Failed,记录错误信息与时间;
|
||
// 不抛出异常,保证主循环可以继续处理后续任务
|
||
t.Status = TaskStatus.Failed;
|
||
t.Error = error;
|
||
t.UpdateTime = DateTime.Now;
|
||
_logger.Warn("[DownloadTaskWorker] 任务失败 Id={Id}, 错误={Err}", t.Id, error);
|
||
var task = db.Update<DownloadTask>()
|
||
.SetSource(t)
|
||
.UpdateColumns(a => new { a.Status, a.Error, a.UpdateTime })
|
||
.Where(a => a.Id == t.Id)
|
||
.ExecuteAffrowsAsync();
|
||
try { RaiseTaskFailed(t, error); } catch { }
|
||
return task;
|
||
}
|
||
}
|
||
}
|