Files
FATrace/FATrace.OEMApp/Services/DownloadTaskWorker.cs
2025-12-02 17:27:09 +08:00

337 lines
13 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using FATrace.Com;
using FATrace.Model;
using FATrace.HKNetLib.Wrapper;
using NLog;
using FATrace.OEMApp;
using TaskStatus = FATrace.Model.TaskStatus;
namespace FATrace.OEMApp.Services
{
/// <summary>
/// 下载任务队列(单例)。
/// 负责:
/// 1. 将按钮入队的下载请求持久化为 DownloadTaskPending
/// 2. 后台循环按顺序逐条下载(同一时刻仅一个下载 Running
/// 3. 根据海康 SDK 进度事件回写 DownloadTask.Progress便于 UI/调试观察
/// 4. 下载完成后创建 OEMRawUse 与 JellyfinMonitorTask为后续 Jellyfin 监听做准备
/// 5. 异常失败时写入错误并标记 Failed循环不中断
/// </summary>
public sealed class DownloadTaskWorker
{
private static readonly Lazy<DownloadTaskWorker> _lazy = new(() => new DownloadTaskWorker());
public static DownloadTaskWorker Instance => _lazy.Value;
private readonly Logger _logger = LogManager.GetCurrentClassLogger();
private readonly SemaphoreSlim _singleRunner = new(1, 1); // 保证同一时间只有一个下载
private CancellationTokenSource? _cts;
private Task? _loopTask;
private HkCamera? _hk;
private DownloadTaskWorker() { }
/// <summary>
/// 启动下载队列后台循环。
/// </summary>
/// <param name="hk">已初始化的 HkCamera 客户端,用于发起下载与接收事件。</param>
/// <remarks>
/// - 该方法幂等,重复调用不会重复启动。
/// - 启动时会将上次异常退出时处于 Running 的任务重置为 Pending避免任务丢失。
/// </remarks>
public void Start(HkCamera hk)
{
_hk = hk ?? throw new ArgumentNullException(nameof(hk));
if (_cts != null) return; // 已启动
// 恢复上次未完成的运行中任务为待处理
try
{
var db = FSqlContext.FDb;
db.Update<DownloadTask>()
.Set(a => a.Status, TaskStatus.Pending)
.Set(a => a.Progress, 0)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Status == TaskStatus.Running)
.ExecuteAffrows();
// 将失败的任务按最大重试次数重置为 Pending程序重启后可自动再次尝试
// 最大重试次数从配置读取appSettings: DownloadTaskMaxRetry默认 3
var maxRetry = ConfigHelper.GetIntOrDefault("DownloadTaskMaxRetry", 3);
db.Update<DownloadTask>()
.Set(a => a.Status, TaskStatus.Pending)
.Set(a => a.Error, "")
.Set(a => a.Progress, 0)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Status == TaskStatus.Failed && a.TryCount < maxRetry)
.ExecuteAffrows();
}
catch { }
_cts = new CancellationTokenSource();
_loopTask = Task.Run(() => RunAsync(_cts.Token));
_logger.Info("[DownloadTaskWorker] 已启动");
}
/// <summary>
/// 停止下载队列后台循环(尽力而为)。
/// </summary>
public void Stop()
{
try
{
_cts?.Cancel();
}
catch { }
finally
{
_cts = null;
}
}
/// <summary>
/// 入队一个下载任务(同时持久化到数据库)。
/// </summary>
/// <param name="code">业务条码/编号,关联 OEMRawUse.Code 与 VideoAction.Code。</param>
/// <param name="rawName">原料名称。</param>
/// <param name="user">操作用户。</param>
/// <param name="start">NVR 下载开始时间(默认当前时间-5 分钟)。</param>
/// <param name="end">NVR 下载结束时间(默认当前时间)。</param>
/// <returns>新增 DownloadTask 的自增 Id。</returns>
public long Enqueue(string code, string rawName, string user, DateTime? start = null, DateTime? end = null)
{
if (string.IsNullOrWhiteSpace(code)) throw new ArgumentException("code 不能为空");
if (string.IsNullOrWhiteSpace(rawName)) throw new ArgumentException("rawName 不能为空");
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("user 不能为空");
var now = DateTime.Now;
var task = new DownloadTask
{
Code = code,
RawName = rawName,
User = user,
Status = TaskStatus.Pending,
Progress = 0,
NvrStartTime = start ?? now.AddMinutes(-5),
NvrEndTime = end ?? now,
CreateTime = now,
UpdateTime = now
};
var id = FSqlContext.FDb.Insert<DownloadTask>(task).ExecuteIdentity();
_logger.Info("[DownloadTaskWorker] 入队 DownloadTask: Id={Id}, Code={Code}", id, code);
return id;
}
/// <summary>
/// 后台主循环:定期拉取最早的 Pending 任务并顺序处理
/// </summary>
/// <param name="token">取消令牌</param>
private async Task RunAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
// 查询最早入队但未处理的任务
var db = FSqlContext.FDb;
var next = await db.Select<DownloadTask>()
.Where(t => t.Status == TaskStatus.Pending)
.OrderBy(t => t.Id)
.FirstAsync();
if (next == null)
{
// 暂无任务,稍候再查
await Task.Delay(1000, token);
continue;
}
// 使用信号量确保同一时间仅有一个任务进入下载处理
await _singleRunner.WaitAsync(token);
_ = ProcessTaskAsync(next, token).ContinueWith(_ => _singleRunner.Release());
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.Error(ex, "[DownloadTaskWorker] 主循环异常");
try { await Task.Delay(2000, token); } catch { }
}
}
_logger.Info("[DownloadTaskWorker] 已停止");
}
/// <summary>
/// 实际执行单个下载任务:
/// 1) 标记 Running生成保存路径
/// 2) 订阅 SDK 事件以回写进度
/// 3) 发起下载并等待完成
/// 4) 下载成功 -> 写 OEMRawUse 与 JellyfinMonitorTask
/// </summary>
/// <param name="t">待处理的 DownloadTask。</param>
/// <param name="token">取消令牌。</param>
private async Task ProcessTaskAsync(DownloadTask t, CancellationToken token)
{
if (_hk == null)
{
_logger.Warn("[DownloadTaskWorker] HkCamera 未初始化,跳过任务 Id={Id}", t.Id);
return;
}
var db = FSqlContext.FDb;
// 标记运行中
t.Status = TaskStatus.Running;
t.TryCount += 1;
t.Progress = 0;
t.UpdateTime = DateTime.Now;
await db.Update<DownloadTask>()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.TryCount, a.Progress, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
// 生成本地文件名/路径
var saveBase = _hk.NVRVideoSavePath;
if (string.IsNullOrWhiteSpace(saveBase))
{
_logger.Error("[DownloadTaskWorker] NVRVideoSavePath 为空,无法下载。任务 Id={Id}", t.Id);
await MarkFailedAsync(t, "NVRVideoSavePath 未配置");
return;
}
var filePath = NVRCom.GetVideoName(saveBase, t.Code ?? "CODE");
await db.Update<DownloadTask>()
.Set(a => a.VideoFilePath, filePath)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
// 事件 -> TCS
// 订阅两个事件:进度与完成。进度事件写回数据库;完成事件用于唤醒等待
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
void OnProgress(object? s, short p)
{
try
{
db.Update<DownloadTask>()
.Set(a => a.Progress, Math.Max((short)0, Math.Min((short)100, p)))
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Id == t.Id)
.ExecuteAffrows();
}
catch { }
}
void OnComplete(object? s, string msg)
{
try
{
tcs.TrySetResult(true);
}
catch { }
}
_hk.NVRLoadVideoProcessEventHandler += OnProgress;
_hk.NVRLoadVideoCompleteEventHandler += OnComplete;
try
{
// 发起下载(按时间范围)
var res = _hk.Sdk_NET_DVR_GetFileByTime_V40(t.NvrStartTime, t.NvrEndTime, filePath);
if (!res.Result)
{
await MarkFailedAsync(t, res.Msg);
return;
}
// HkCamera 内部会启动进度监控:此处等待完成事件触发
using (token.Register(() => tcs.TrySetCanceled()))
{
await tcs.Task;
}
// 下载完成:写 OEMRawUse 并创建 JellyfinMonitorTask
// 1) 插入 OEMRawUseUrlState=false, VideoUrl 空)
var rawUse = new OEMRawUse
{
InBagCode = t.Code,
RawName = t.RawName,
User = t.User,
UrlState = false,
VideoUrl = string.Empty,
CreateTime = DateTime.Now,
VideoActionId = 0
};
var rawUseId = db.Insert<OEMRawUse>(rawUse).ExecuteIdentity();
// 2) 创建 Jellyfin 监听任务,交由 JellyfinMonitorQueueService 批量匹配
var jfTask = new JellyfinMonitorTask
{
OemRawUseId = rawUseId,
LocalFileNameOrPath = filePath,
Code = t.Code,
RawName = t.RawName,
User = t.User,
NvrStartTime = t.NvrStartTime,
NvrEndTime = t.NvrEndTime,
Status = TaskStatus.Pending,
TryCount = 0,
CreateTime = DateTime.Now,
UpdateTime = DateTime.Now
};
db.Insert<JellyfinMonitorTask>(jfTask).ExecuteAffrows();
// 标记下载完成
t.Status = TaskStatus.Completed;
t.Progress = 100;
t.UpdateTime = DateTime.Now;
await db.Update<DownloadTask>()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.Progress, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
_logger.Info("[DownloadTaskWorker] 下载完成,已创建 Jellyfin 监控任务。DownloadTaskId={Id}, OEMRawUseId={RawUseId}", t.Id, rawUseId);
}
catch (OperationCanceledException)
{
await MarkFailedAsync(t, "任务被取消");
}
catch (Exception ex)
{
await MarkFailedAsync(t, ex.Message);
}
finally
{
// 释放事件订阅,避免内存泄露或重复触发
try { _hk.NVRLoadVideoProcessEventHandler -= OnProgress; } catch { }
try { _hk.NVRLoadVideoCompleteEventHandler -= OnComplete; } catch { }
}
}
/// <summary>
/// 将任务标记为失败,并持久化错误信息。
/// </summary>
/// <param name="t">下载任务。</param>
/// <param name="error">错误描述(可空)。</param>
private Task MarkFailedAsync(DownloadTask t, string? error)
{
var db = FSqlContext.FDb;
t.Status = TaskStatus.Failed;
t.Error = error;
t.UpdateTime = DateTime.Now;
_logger.Warn("[DownloadTaskWorker] 任务失败 Id={Id}, 错误={Err}", t.Id, error);
return db.Update<DownloadTask>()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.Error, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
}
}
}