Files
FATrace/FATrace.OEMApp/Services/DownloadTaskWorker.cs
2025-12-04 18:39:34 +08:00

485 lines
21 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using FATrace.Com;
using FATrace.Model;
using FATrace.HKNetLib.Wrapper;
using NLog;
using FATrace.OEMApp;
using System.IO;
using TaskStatus = FATrace.Model.TaskStatus;
namespace FATrace.OEMApp.Services
{
/// <summary>
/// 下载任务队列(单例)。
/// 负责:
/// 1. 将按钮入队的下载请求持久化为 DownloadTaskPending
/// 2. 后台循环按顺序逐条下载(同一时刻仅一个下载 Running
/// 3. 根据海康 SDK 进度事件回写 DownloadTask.Progress便于 UI/调试观察
/// 4. 下载完成后创建 OEMRawUse 与 JellyfinMonitorTask为后续 Jellyfin 监听做准备
/// 5. 异常失败时写入错误并标记 Failed循环不中断
/// </summary>
public sealed class DownloadTaskWorker
{
private static readonly Lazy<DownloadTaskWorker> _lazy = new(() => new DownloadTaskWorker());
public static DownloadTaskWorker Instance => _lazy.Value;
private readonly Logger _logger = LogManager.GetCurrentClassLogger();
private readonly SemaphoreSlim _singleRunner = new(1, 1); // 保证同一时间只有一个下载
private CancellationTokenSource? _cts; // 后台主循环的取消令牌(停止服务时发出)
private Task? _loopTask; // 后台主循环 Task
private HkCamera? _hk; // 供下载与事件回调使用的海康客户端(由 UI 注入)
// 当前下载文件名变更事件(用于 UI 显示)
public event Action<string>? DownloadFileNameChanged;
private void RaiseDownloadFileName(string name)
{
try { DownloadFileNameChanged?.Invoke(name); } catch { }
}
// 任务状态事件(用于按需刷新 gridRULog
public event Action<DownloadTask>? TaskStarted;
public event Action<DownloadTask>? TaskCompleted;
public event Action<DownloadTask, string?>? TaskFailed;
private void RaiseTaskStarted(DownloadTask t) { try { TaskStarted?.Invoke(t); } catch { } }
private void RaiseTaskCompleted(DownloadTask t) { try { TaskCompleted?.Invoke(t); } catch { } }
private void RaiseTaskFailed(DownloadTask t, string? err) { try { TaskFailed?.Invoke(t, err); } catch { } }
/// <summary>
/// 下载的视频时间
/// </summary>
public static int VideoTime { get; set; }
private DownloadTaskWorker()
{
VideoTime = ConfigHelper.GetIntOrDefault("VideoTime", 30);
}
/// <summary>
/// 启动下载队列后台循环。
/// </summary>
/// <param name="hk">已初始化的 HkCamera 客户端,用于发起下载与接收事件。</param>
/// <remarks>
/// - 该方法幂等,重复调用不会重复启动。
/// - 启动时会将上次异常退出时处于 Running 的任务重置为 Pending避免任务丢失。
/// </remarks>
public void Start(HkCamera hk)
{
_hk = hk ?? throw new ArgumentNullException(nameof(hk));
if (_cts != null) return; // 已启动
// 恢复上次未完成的运行中任务为待处理,然后再循环执行
try
{
var db = FSqlContext.FDb;
// 将异常退出时处于 Running 的任务回滚为 Pending避免卡住队列
db.Update<DownloadTask>()
.Set(a => a.Status, TaskStatus.Pending)
.Set(a => a.Progress, 0)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Status == TaskStatus.Running)
.ExecuteAffrows();
// 将失败的任务按最大重试次数重置为 Pending程序重启后可自动再次尝试
// 最大重试次数从配置读取appSettings: DownloadTaskMaxRetry默认 3
var maxRetry = ConfigHelper.GetIntOrDefault("DownloadTaskMaxRetry", 3);
db.Update<DownloadTask>()
.Set(a => a.Status, TaskStatus.Pending)
.Set(a => a.Error, "")
.Set(a => a.Progress, 0)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Status == TaskStatus.Failed && a.TryCount < maxRetry)
.ExecuteAffrows();
}
catch { }
_cts = new CancellationTokenSource();
//开始后台主循环
_loopTask = Task.Run(() => RunAsync(_cts.Token));
_logger.Info("[DownloadTaskWorker] 已启动");
}
/// <summary>
/// 停止下载队列后台循环(尽力而为)。
/// </summary>
public void Stop()
{
try
{
// 发出取消信号RunAsync 将尽快退出;
// 正在处理的任务会在 token 取消时尝试停止 SDK 下载
_cts?.Cancel();
}
catch { }
finally
{
_cts = null;
}
}
/// <summary>
/// 入队一个下载任务(同时持久化到数据库)。
/// </summary>
/// <param name="code">业务条码/编号,关联 OEMRawUse.Code 与 VideoAction.Code。</param>
/// <param name="rawName">原料名称。</param>
/// <param name="user">操作用户。</param>
/// <param name="start">NVR 下载开始时间(默认当前时间-30 秒)。</param>
/// <param name="end">NVR 下载结束时间(默认当前时间)。</param>
/// <param name="rawCode">原料条码(可选)。未提供时默认等于 code。</param>
/// <returns>新增 DownloadTask 的自增 Id。</returns>
public long Enqueue(ParsedCodeInfo parsedCodeInfo, string user, DateTime? start = null, DateTime? end = null)
{
if (parsedCodeInfo == null) throw new ArgumentException("code 不能为空");
if (string.IsNullOrWhiteSpace(parsedCodeInfo.Code)) throw new ArgumentException("code 不能为空");
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("user 不能为空");
var now = DateTime.Now;
// 构造持久化任务:
// - 默认回溯 30 秒(符合项目规则)
// - rawCode 未提供时回落到 code避免非空列插入失败
var task = new DownloadTask
{
Code = parsedCodeInfo.Code,
RawName = parsedCodeInfo.RawName,
RawCode = parsedCodeInfo.RawCode,
User = user,
Status = TaskStatus.Pending,
Progress = 0,
NvrStartTime = start ?? now.AddSeconds(-VideoTime),
NvrEndTime = end ?? now,
CreateTime = now,
UpdateTime = now
};
// 入库返回自增 Id便于 UI 提示与后续跟踪
var id = FSqlContext.FDb.Insert<DownloadTask>(task).ExecuteIdentity();
_logger.Info("[DownloadTaskWorker] 入队 DownloadTask: Id={Id}, Code={Code}", id, parsedCodeInfo.Code);
return id;
}
/// <summary>
/// 后台主循环:定期拉取最早的 Pending 任务并顺序处理
/// </summary>
/// <param name="token">取消令牌</param>
private async Task RunAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
DownloadTask? next = null;
try
{
await _singleRunner.WaitAsync(token);
try
{
var db = FSqlContext.FDb;
next = await db.Select<DownloadTask>()
.Where(t => t.Status == TaskStatus.Pending && t.NvrEndTime < DateTime.Now.AddSeconds(5))
.OrderBy(t => t.Id)
.FirstAsync();
if (next != null)
{
await ProcessTaskAsync(next, token);
}
}
finally
{
_singleRunner.Release();
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.Error(ex, "[DownloadTaskWorker] 主循环异常");
}
if (next == null)
{
try { await Task.Delay(5000, token); } catch { }
}
}
_logger.Info("[DownloadTaskWorker] 已停止");
}
/// <summary>
/// 当前处理中的 DownloadTask
/// </summary>
public DownloadTask CurDownloadTask { get; set; }
/// <summary>
/// 实际执行单个下载任务:
/// 1) 标记 Running生成保存路径
/// 2) 订阅 SDK 事件以回写进度
/// 3) 发起下载并等待完成
/// 4) 下载成功 -> 写 OEMRawUse 与 JellyfinMonitorTask
/// </summary>
/// <param name="t">待处理的 DownloadTask。</param>
/// <param name="token">取消令牌。</param>
private async Task ProcessTaskAsync(DownloadTask downloadTask, CancellationToken token)
{
if (_hk == null)
{
_logger.Warn("[DownloadTaskWorker] HkCamera 未初始化,跳过任务 Id={Id}", downloadTask.Id);
return;
}
CurDownloadTask = downloadTask;
var db = FSqlContext.FDb;
// 步骤1状态入库Running/TryCount/Progress/UpdateTime
// 标记运行中
downloadTask.Status = TaskStatus.Running;
downloadTask.TryCount += 1;
downloadTask.Progress = 0;
downloadTask.UpdateTime = DateTime.Now;
await db.Update<DownloadTask>()
.SetSource(downloadTask)
.UpdateColumns(a => new { a.Status, a.TryCount, a.Progress, a.UpdateTime })
.Where(a => a.Id == downloadTask.Id)
.ExecuteAffrowsAsync();
// 通知任务开始
RaiseTaskStarted(downloadTask);
// 步骤2生成保存路径含安全文件名并确保保存目录存在
// 生成本地文件名/路径
var saveBase = _hk.NVRVideoSavePath;
if (string.IsNullOrWhiteSpace(saveBase))
{
_logger.Error("[DownloadTaskWorker] NVRVideoSavePath 为空,无法下载。任务 Id={Id}", downloadTask.Id);
await MarkFailedAsync(downloadTask, "NVRVideoSavePath 未配置");
return;
}
var filePath = NVRCom.GetVideoPathName(saveBase, downloadTask.Code!);
// 确保保存目录存在,避免 SDK 写文件失败
try
{
var dir = Path.GetDirectoryName(filePath);
if (!string.IsNullOrWhiteSpace(dir) && !Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
catch { }
await db.Update<DownloadTask>()
.Set(a => a.VideoFilePath, filePath)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Id == downloadTask.Id)
.ExecuteAffrowsAsync();
//当前的下载路径
downloadTask.VideoFilePath = filePath;
// 通知 UI 当前下载文件名
RaiseDownloadFileName(Path.GetFileName(filePath) ?? string.Empty);
// 步骤3订阅 SDK 事件 -> TCS 转换
// 事件 -> TCS
// 订阅两个事件:进度与完成。进度事件写回数据库;完成事件用于唤醒等待
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
void OnProgress(object? s, short p)
{
try
{
//实时更新下载进度
//db.Update<DownloadTask>()
// .Set(a => a.Progress, Math.Max((short)0, Math.Min((short)100, p)))
// .Set(a => a.UpdateTime, DateTime.Now)
// .Where(a => a.Id == t.Id)
// .ExecuteAffrows();
}
catch { }
}
void OnComplete(object? s, string msg)
{
try
{
tcs.TrySetResult(msg);
}
catch { }
}
_hk.NVRLoadVideoProcessEventHandler += OnProgress;
_hk.NVRLoadVideoCompleteEventHandler += OnComplete;
// 步骤4调用 SDK 按时间下载,并等待完成事件(含超时/取消处理)
try
{
// 发起下载(按时间范围)
var res = _hk.Sdk_NET_DVR_GetFileByTime_V40(downloadTask.NvrStartTime, downloadTask.NvrEndTime, filePath);
if (!res.Result)
{
await MarkFailedAsync(downloadTask, res.Msg);
return;
}
//下载过程中的监控事件
// 计算这次下载的时间窗口长度(秒),用于推导合理的超时
var timeWindowSec = Math.Max(1, (int)(downloadTask.NvrEndTime - downloadTask.NvrStartTime).TotalSeconds);
// 计算总超时秒数:优先读配置 DownloadTaskTimeoutSeconds
// 如果未配置,用时间窗口 * 3 的经验值,并限制在 [120, 1800] 区间,防止过短/过长。
var timeoutSec = ConfigHelper.GetIntOrDefault(
"DownloadTaskTimeoutSeconds",
Math.Max(120, Math.Min(1800, timeWindowSec * 3))
);
// completed 代表“下载完成”这个事件(由 tcs.TrySetResult 在回调里触发)
Task completed = tcs.Task;
// timeoutTask 是“超时定时器”,超时时间到会完成;同时受 token 取消影响
Task timeoutTask = Task.Delay(TimeSpan.FromSeconds(timeoutSec), token);
// 当外部取消Stop/退出)时,主动让 tcs 进入取消态,避免一直等待 completed
using (token.Register(() =>
{
try { tcs.TrySetCanceled(); } catch { }
}))
{
// 等待“完成事件”或“超时”二者之一先发生
var finished = await Task.WhenAny(completed, timeoutTask);
// 如果先等到了 timeoutTask说明超时发生
if (finished == timeoutTask)
{
// 尽力停止 SDK 的下载(防止后台还在跑)
try { _hk.Sdk_NET_DVR_StopGetFile(); } catch { }
// 将任务标记为失败(原因是超时),写入数据库状态
await MarkFailedAsync(downloadTask, $"下载超时({timeoutSec}s)");
return; // 结束本次任务处理
}
}
// 能走到这里,表示 completed 先发生(下载完成事件被触发)
// 读取回调携带的完成消息(有些 SDK 会返回提示文本,例如“下载完成”)
var completeMsg = await ((Task<string>)tcs.Task);
// 简单的成功判断:消息非空,并包含“完成”字样就算成功
var succeed = !string.IsNullOrWhiteSpace(completeMsg) && completeMsg.Contains("完成");
// 若判断不通过,视为失败,尽力停止下载并入库失败原因
if (!succeed)
{
try { _hk.Sdk_NET_DVR_StopGetFile(); } catch { }
await MarkFailedAsync(downloadTask, string.IsNullOrWhiteSpace(completeMsg) ? "下载失败" : completeMsg);
return;
}
// 走到这里即认为下载成功,后续会继续执行文件校验与入库逻辑
// 步骤5文件有效性检查存在且大小>0
// 文件有效性检查
try
{
if (!File.Exists(filePath) || new FileInfo(filePath).Length <= 0)
{
await MarkFailedAsync(downloadTask, "下载文件不存在或为空");
return;
}
}
catch
{
// 文件系统异常
await MarkFailedAsync(downloadTask, "下载文件验证异常");
return;
}
//// 步骤6入库不再使用 Jellyfin 监听):
//// 1) 插入 VideoAction保存本地视频元数据
//var va = new VideoAction
//{
// Code = downloadTask.Code,
// User = downloadTask.User,
// VideoFilePath = filePath,
// VideoName = Path.GetFileName(filePath) ?? string.Empty,
// StartTime = downloadTask.NvrStartTime,
// EndTime = downloadTask.NvrEndTime,
// CreateTime = DateTime.Now
//};
//var videoActionId = db.Insert<VideoAction>(va).ExecuteIdentity();
// 2) 插入 OEMRawUse直接填充本地文件路径作为 VideoUrl并置 UrlState=true
var rawUse = new OEMRawUse
{
InBagCode = downloadTask.Code,
RawName = downloadTask.RawName,
RawCode = downloadTask.RawCode,
User = downloadTask.User,
VideoStartTime = downloadTask.NvrStartTime,
VideoEndTime = downloadTask.NvrEndTime,
VideoFilePath = downloadTask.VideoFilePath,
VideoName = NVRCom.GetVideoName(downloadTask.Code!),
};
var rawUseId = db.Insert<OEMRawUse>(rawUse).ExecuteIdentity();
// 步骤7收尾——标记下载完成并记录日志
// 标记下载完成
downloadTask.Status = TaskStatus.Completed;
downloadTask.Progress = 100;
downloadTask.UpdateTime = DateTime.Now;
await db.Update<DownloadTask>()
.SetSource(downloadTask)
.UpdateColumns(a => new { a.Status, a.Progress, a.UpdateTime })
.Where(a => a.Id == downloadTask.Id)
.ExecuteAffrowsAsync();
_logger.Info("[DownloadTaskWorker] 下载完成并入库DownloadTaskId={Id}, OEMRawUseId={RawUseId}", downloadTask.Id, rawUseId);
// 通知任务完成
RaiseTaskCompleted(downloadTask);
}
catch (OperationCanceledException)
{
// 取消:停止当前 SDK 下载并标记失败
try { _hk.Sdk_NET_DVR_StopGetFile(); } catch { }
await MarkFailedAsync(downloadTask, "任务被取消");
}
catch (Exception ex)
{
await MarkFailedAsync(downloadTask, ex.Message);
}
finally
{
// 释放事件订阅,避免内存泄露或重复触发
try { _hk.NVRLoadVideoProcessEventHandler -= OnProgress; } catch { }
try { _hk.NVRLoadVideoCompleteEventHandler -= OnComplete; } catch { }
//_singleRunner.Release();
// 清空 UI 显示(可选)
RaiseDownloadFileName(string.Empty);
}
}
/// <summary>
/// 将任务标记为失败,并持久化错误信息。
/// </summary>
/// <param name="t">下载任务。</param>
/// <param name="error">错误描述(可空)。</param>
private Task MarkFailedAsync(DownloadTask t, string? error)
{
var db = FSqlContext.FDb;
// 将状态置为 Failed记录错误信息与时间
// 不抛出异常,保证主循环可以继续处理后续任务
t.Status = TaskStatus.Failed;
t.Error = error;
t.UpdateTime = DateTime.Now;
_logger.Warn("[DownloadTaskWorker] 任务失败 Id={Id}, 错误={Err}", t.Id, error);
var task = db.Update<DownloadTask>()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.Error, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
try { RaiseTaskFailed(t, error); } catch { }
return task;
}
}
}