using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using FATrace.Com;
using FATrace.Model;
using FATrace.HKNetLib.Wrapper;
using NLog;
using FATrace.OEMApp;
using System.IO;
using TaskStatus = FATrace.Model.TaskStatus;
namespace FATrace.OEMApp.Services
{
///
/// 下载任务队列(单例)。
/// 负责:
/// 1. 将按钮入队的下载请求持久化为 DownloadTask(Pending)
/// 2. 后台循环按顺序逐条下载(同一时刻仅一个下载 Running)
/// 3. 根据海康 SDK 进度事件回写 DownloadTask.Progress,便于 UI/调试观察
/// 4. 下载完成后创建 OEMRawUse 与 JellyfinMonitorTask,为后续 Jellyfin 监听做准备
/// 5. 异常失败时写入错误并标记 Failed,循环不中断
///
public sealed class DownloadTaskWorker
{
private static readonly Lazy _lazy = new(() => new DownloadTaskWorker());
public static DownloadTaskWorker Instance => _lazy.Value;
private readonly Logger _logger = LogManager.GetCurrentClassLogger();
private readonly SemaphoreSlim _singleRunner = new(1, 1); // 保证同一时间只有一个下载
private CancellationTokenSource? _cts; // 后台主循环的取消令牌(停止服务时发出)
private Task? _loopTask; // 后台主循环 Task
private HkCamera? _hk; // 供下载与事件回调使用的海康客户端(由 UI 注入)
private DownloadTaskWorker() { }
///
/// 启动下载队列后台循环。
///
/// 已初始化的 HkCamera 客户端,用于发起下载与接收事件。
///
/// - 该方法幂等,重复调用不会重复启动。
/// - 启动时会将上次异常退出时处于 Running 的任务重置为 Pending,避免任务丢失。
///
public void Start(HkCamera hk)
{
_hk = hk ?? throw new ArgumentNullException(nameof(hk));
if (_cts != null) return; // 已启动
// 恢复上次未完成的运行中任务为待处理,然后再循环执行
try
{
var db = FSqlContext.FDb;
// 将异常退出时处于 Running 的任务回滚为 Pending,避免卡住队列
db.Update()
.Set(a => a.Status, TaskStatus.Pending)
.Set(a => a.Progress, 0)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Status == TaskStatus.Running)
.ExecuteAffrows();
// 将失败的任务按最大重试次数重置为 Pending,程序重启后可自动再次尝试
// 最大重试次数从配置读取(appSettings: DownloadTaskMaxRetry,默认 3)
var maxRetry = ConfigHelper.GetIntOrDefault("DownloadTaskMaxRetry", 3);
db.Update()
.Set(a => a.Status, TaskStatus.Pending)
.Set(a => a.Error, "")
.Set(a => a.Progress, 0)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Status == TaskStatus.Failed && a.TryCount < maxRetry)
.ExecuteAffrows();
}
catch { }
_cts = new CancellationTokenSource();
//开始后台主循环
_loopTask = Task.Run(() => RunAsync(_cts.Token));
_logger.Info("[DownloadTaskWorker] 已启动");
}
///
/// 停止下载队列后台循环(尽力而为)。
///
public void Stop()
{
try
{
// 发出取消信号,RunAsync 将尽快退出;
// 正在处理的任务会在 token 取消时尝试停止 SDK 下载
_cts?.Cancel();
}
catch { }
finally
{
_cts = null;
}
}
///
/// 入队一个下载任务(同时持久化到数据库)。
///
/// 业务条码/编号,关联 OEMRawUse.Code 与 VideoAction.Code。
/// 原料名称。
/// 操作用户。
/// NVR 下载开始时间(默认当前时间-30 秒)。
/// NVR 下载结束时间(默认当前时间)。
/// 原料条码(可选)。未提供时默认等于 code。
/// 新增 DownloadTask 的自增 Id。
public long Enqueue(string code, string rawName, string user, DateTime? start = null, DateTime? end = null, string? rawCode = null)
{
if (string.IsNullOrWhiteSpace(code)) throw new ArgumentException("code 不能为空");
if (string.IsNullOrWhiteSpace(rawName)) throw new ArgumentException("rawName 不能为空");
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("user 不能为空");
var now = DateTime.Now;
// 构造持久化任务:
// - 默认回溯 30 秒(符合项目规则)
// - rawCode 未提供时回落到 code,避免非空列插入失败
var task = new DownloadTask
{
Code = code,
RawName = rawName,
RawCode = rawCode ?? code,
User = user,
Status = TaskStatus.Pending,
Progress = 0,
NvrStartTime = start ?? now.AddSeconds(-30),
NvrEndTime = end ?? now,
CreateTime = now,
UpdateTime = now
};
// 入库返回自增 Id,便于 UI 提示与后续跟踪
var id = FSqlContext.FDb.Insert(task).ExecuteIdentity();
_logger.Info("[DownloadTaskWorker] 入队 DownloadTask: Id={Id}, Code={Code}", id, code);
return id;
}
///
/// 后台主循环:定期拉取最早的 Pending 任务并顺序处理
///
/// 取消令牌
private async Task RunAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
// 查询最早入队但未处理的任务
var db = FSqlContext.FDb;
var next = await db.Select()
.Where(t => t.Status == TaskStatus.Pending)
.OrderBy(t => t.Id)
.FirstAsync();
if (next == null)
{
// 暂无任务,稍候再查
await Task.Delay(5000, token);
continue;
}
// 使用信号量确保同一时间仅有一个任务进入下载处理
await _singleRunner.WaitAsync(token);
// 通过 ContinueWith 在任务结束时释放信号量,避免阻塞主循环
_ = ProcessTaskAsync(next, token).ContinueWith(_ => _singleRunner.Release());
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.Error(ex, "[DownloadTaskWorker] 主循环异常");
try { await Task.Delay(2000, token); } catch { }
}
}
_logger.Info("[DownloadTaskWorker] 已停止");
}
///
/// 实际执行单个下载任务:
/// 1) 标记 Running,生成保存路径
/// 2) 订阅 SDK 事件以回写进度
/// 3) 发起下载并等待完成
/// 4) 下载成功 -> 写 OEMRawUse 与 JellyfinMonitorTask
///
/// 待处理的 DownloadTask。
/// 取消令牌。
private async Task ProcessTaskAsync(DownloadTask t, CancellationToken token)
{
if (_hk == null)
{
_logger.Warn("[DownloadTaskWorker] HkCamera 未初始化,跳过任务 Id={Id}", t.Id);
return;
}
var db = FSqlContext.FDb;
// 步骤1:状态入库(Running/TryCount/Progress/UpdateTime)
// 标记运行中
t.Status = TaskStatus.Running;
t.TryCount += 1;
t.Progress = 0;
t.UpdateTime = DateTime.Now;
await db.Update()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.TryCount, a.Progress, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
// 步骤2:生成保存路径(含安全文件名),并确保保存目录存在
// 生成本地文件名/路径
var saveBase = _hk.NVRVideoSavePath;
if (string.IsNullOrWhiteSpace(saveBase))
{
_logger.Error("[DownloadTaskWorker] NVRVideoSavePath 为空,无法下载。任务 Id={Id}", t.Id);
await MarkFailedAsync(t, "NVRVideoSavePath 未配置");
return;
}
var filePath = NVRCom.GetVideoName(saveBase, t.Code ?? "CODE");
// 确保保存目录存在,避免 SDK 写文件失败
try
{
var dir = Path.GetDirectoryName(filePath);
if (!string.IsNullOrWhiteSpace(dir) && !Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
catch { }
await db.Update()
.Set(a => a.VideoFilePath, filePath)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
// 步骤3:订阅 SDK 事件 -> TCS 转换
// 事件 -> TCS
// 订阅两个事件:进度与完成。进度事件写回数据库;完成事件用于唤醒等待
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
void OnProgress(object? s, short p)
{
try
{
//db.Update()
// .Set(a => a.Progress, Math.Max((short)0, Math.Min((short)100, p)))
// .Set(a => a.UpdateTime, DateTime.Now)
// .Where(a => a.Id == t.Id)
// .ExecuteAffrows();
}
catch { }
}
void OnComplete(object? s, string msg)
{
try
{
tcs.TrySetResult(msg);
}
catch { }
}
_hk.NVRLoadVideoProcessEventHandler += OnProgress;
_hk.NVRLoadVideoCompleteEventHandler += OnComplete;
// 步骤4:调用 SDK 按时间下载,并等待完成事件(含超时/取消处理)
try
{
// 发起下载(按时间范围)
var res = _hk.Sdk_NET_DVR_GetFileByTime_V40(t.NvrStartTime, t.NvrEndTime, filePath);
if (!res.Result)
{
await MarkFailedAsync(t, res.Msg);
return;
}
// HkCamera 内部会启动进度监控:此处等待完成事件触发(带超时)
var timeWindowSec = Math.Max(1, (int)(t.NvrEndTime - t.NvrStartTime).TotalSeconds);
var timeoutSec = ConfigHelper.GetIntOrDefault("DownloadTaskTimeoutSeconds", Math.Max(120, Math.Min(1800, timeWindowSec * 3)));
Task completed = tcs.Task;
Task timeoutTask = Task.Delay(TimeSpan.FromSeconds(timeoutSec), token);
using (token.Register(() =>
{
try { tcs.TrySetCanceled(); } catch { }
}))
{
var finished = await Task.WhenAny(completed, timeoutTask);
if (finished == timeoutTask)
{
try { _hk.Sdk_NET_DVR_StopGetFile(); } catch { }
await MarkFailedAsync(t, $"下载超时({timeoutSec}s)");
return;
}
}
var completeMsg = await ((Task)tcs.Task);
// 依据完成事件消息判断成功/失败(包含“完成”视为成功)
var succeed = !string.IsNullOrWhiteSpace(completeMsg) && completeMsg.Contains("完成");
if (!succeed)
{
try { _hk.Sdk_NET_DVR_StopGetFile(); } catch { }
await MarkFailedAsync(t, string.IsNullOrWhiteSpace(completeMsg) ? "下载失败" : completeMsg);
return;
}
// 步骤5:文件有效性检查(存在且大小>0)
// 文件有效性检查
try
{
if (!File.Exists(filePath) || new FileInfo(filePath).Length <= 0)
{
await MarkFailedAsync(t, "下载文件不存在或为空");
return;
}
}
catch
{
// 文件系统异常
await MarkFailedAsync(t, "下载文件验证异常");
return;
}
// 步骤6:入库——写 OEMRawUse 并创建 Jellyfin 监听任务
// 下载完成:写 OEMRawUse 并创建 JellyfinMonitorTask
// 1) 插入 OEMRawUse(UrlState=false, VideoUrl 空)
var rawUse = new OEMRawUse
{
InBagCode = t.Code,
RawName = t.RawName,
RawCode = t.RawCode,
User = t.User,
UrlState = false,
VideoUrl = string.Empty,
CreateTime = DateTime.Now,
VideoActionId = 0
};
var rawUseId = db.Insert(rawUse).ExecuteIdentity();
// 2) 创建 Jellyfin 监听任务,交由 JellyfinMonitorQueueService 批量匹配
var jfTask = new JellyfinMonitorTask
{
OemRawUseId = rawUseId,
LocalFileNameOrPath = filePath,
Code = t.Code,
RawName = t.RawName,
User = t.User,
NvrStartTime = t.NvrStartTime,
NvrEndTime = t.NvrEndTime,
Status = TaskStatus.Pending,
TryCount = 0,
CreateTime = DateTime.Now,
UpdateTime = DateTime.Now
};
db.Insert(jfTask).ExecuteAffrows();
// 步骤7:收尾——标记下载完成并记录日志
// 标记下载完成
t.Status = TaskStatus.Completed;
t.Progress = 100;
t.UpdateTime = DateTime.Now;
await db.Update()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.Progress, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
_logger.Info("[DownloadTaskWorker] 下载完成,已创建 Jellyfin 监控任务 DownloadTaskId={Id}, OEMRawUseId={RawUseId}", t.Id, rawUseId);
}
catch (OperationCanceledException)
{
// 取消:停止当前 SDK 下载并标记失败
try { _hk.Sdk_NET_DVR_StopGetFile(); } catch { }
await MarkFailedAsync(t, "任务被取消");
}
catch (Exception ex)
{
await MarkFailedAsync(t, ex.Message);
}
finally
{
// 释放事件订阅,避免内存泄露或重复触发
try { _hk.NVRLoadVideoProcessEventHandler -= OnProgress; } catch { }
try { _hk.NVRLoadVideoCompleteEventHandler -= OnComplete; } catch { }
//_singleRunner.Release();
}
}
///
/// 将任务标记为失败,并持久化错误信息。
///
/// 下载任务。
/// 错误描述(可空)。
private Task MarkFailedAsync(DownloadTask t, string? error)
{
var db = FSqlContext.FDb;
// 将状态置为 Failed,记录错误信息与时间;
// 不抛出异常,保证主循环可以继续处理后续任务
t.Status = TaskStatus.Failed;
t.Error = error;
t.UpdateTime = DateTime.Now;
_logger.Warn("[DownloadTaskWorker] 任务失败 Id={Id}, 错误={Err}", t.Id, error);
return db.Update()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.Error, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
}
}
}