This commit is contained in:
2025-10-29 11:42:58 +08:00
parent 7f6f84cd0e
commit a178c3550e
190 changed files with 81361 additions and 92 deletions

View File

@@ -0,0 +1,334 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using FATrace.Com;
using FATrace.Model;
using FATrace.HKNetLib.Wrapper;
using NLog;
using FATrace.OEMApp;
using TaskStatus = FATrace.Model.TaskStatus;
namespace FATrace.OEMApp.Services
{
/// <summary>
/// 下载任务队列(单例)。
/// 负责:
/// 1. 将按钮入队的下载请求持久化为 DownloadTaskPending
/// 2. 后台循环按顺序逐条下载(同一时刻仅一个下载 Running
/// 3. 根据海康 SDK 进度事件回写 DownloadTask.Progress便于 UI/调试观察
/// 4. 下载完成后创建 OEMRawUse 与 JellyfinMonitorTask为后续 Jellyfin 监听做准备
/// 5. 异常失败时写入错误并标记 Failed循环不中断
/// </summary>
public sealed class DownloadTaskWorker
{
private static readonly Lazy<DownloadTaskWorker> _lazy = new(() => new DownloadTaskWorker());
public static DownloadTaskWorker Instance => _lazy.Value;
private readonly Logger _logger = LogManager.GetCurrentClassLogger();
private readonly SemaphoreSlim _singleRunner = new(1, 1); // 保证同一时间只有一个下载
private CancellationTokenSource? _cts;
private Task? _loopTask;
private HkCamera? _hk;
private DownloadTaskWorker() { }
/// <summary>
/// 启动下载队列后台循环。
/// </summary>
/// <param name="hk">已初始化的 HkCamera 客户端,用于发起下载与接收事件。</param>
/// <remarks>
/// - 该方法幂等,重复调用不会重复启动。
/// - 启动时会将上次异常退出时处于 Running 的任务重置为 Pending避免任务丢失。
/// </remarks>
public void Start(HkCamera hk)
{
_hk = hk ?? throw new ArgumentNullException(nameof(hk));
if (_cts != null) return; // 已启动
// 恢复上次未完成的运行中任务为待处理
try
{
var db = FSqlContext.FDb;
db.Update<DownloadTask>()
.Set(a => a.Status, TaskStatus.Pending)
.Set(a => a.Progress, 0)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Status == TaskStatus.Running)
.ExecuteAffrows();
// 将失败的任务按最大重试次数重置为 Pending程序重启后可自动再次尝试
// 最大重试次数从配置读取appSettings: DownloadTaskMaxRetry默认 3
var maxRetry = ConfigHelper.GetIntOrDefault("DownloadTaskMaxRetry", 3);
db.Update<DownloadTask>()
.Set(a => a.Status, TaskStatus.Pending)
.Set(a => a.Error, null)
.Set(a => a.Progress, 0)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Status == TaskStatus.Failed && a.TryCount < maxRetry)
.ExecuteAffrows();
}
catch { }
_cts = new CancellationTokenSource();
_loopTask = Task.Run(() => RunAsync(_cts.Token));
_logger.Info("[DownloadTaskWorker] 已启动");
}
/// <summary>
/// 停止下载队列后台循环(尽力而为)。
/// </summary>
public void Stop()
{
try
{
_cts?.Cancel();
}
catch { }
finally
{
_cts = null;
}
}
/// <summary>
/// 入队一个下载任务(同时持久化到数据库)。
/// </summary>
/// <param name="code">业务条码/编号,关联 OEMRawUse.Code 与 VideoAction.Code。</param>
/// <param name="rawName">原料名称。</param>
/// <param name="user">操作用户。</param>
/// <param name="start">NVR 下载开始时间(默认当前时间-5 分钟)。</param>
/// <param name="end">NVR 下载结束时间(默认当前时间)。</param>
/// <returns>新增 DownloadTask 的自增 Id。</returns>
public long Enqueue(string code, string rawName, string user, DateTime? start = null, DateTime? end = null)
{
if (string.IsNullOrWhiteSpace(code)) throw new ArgumentException("code 不能为空");
if (string.IsNullOrWhiteSpace(rawName)) throw new ArgumentException("rawName 不能为空");
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("user 不能为空");
var now = DateTime.Now;
var task = new DownloadTask
{
Code = code,
RawName = rawName,
User = user,
Status = TaskStatus.Pending,
Progress = 0,
NvrStartTime = start ?? now.AddMinutes(-5),
NvrEndTime = end ?? now,
CreateTime = now,
UpdateTime = now
};
var id = FSqlContext.FDb.Insert<DownloadTask>(task).ExecuteIdentity();
_logger.Info("[DownloadTaskWorker] 入队 DownloadTask: Id={Id}, Code={Code}", id, code);
return id;
}
/// <summary>
/// 后台主循环:定期拉取最早的 Pending 任务并顺序处理。
/// </summary>
/// <param name="token">取消令牌。</param>
private async Task RunAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
// 查询最早入队但未处理的任务
var db = FSqlContext.FDb;
var next = await db.Select<DownloadTask>()
.Where(t => t.Status == TaskStatus.Pending)
.OrderBy(t => t.Id)
.FirstAsync();
if (next == null)
{
// 暂无任务,稍候再查
await Task.Delay(1000, token);
continue;
}
// 使用信号量确保同一时间仅有一个任务进入下载处理
await _singleRunner.WaitAsync(token);
_ = ProcessTaskAsync(next, token).ContinueWith(_ => _singleRunner.Release());
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.Error(ex, "[DownloadTaskWorker] 主循环异常");
try { await Task.Delay(2000, token); } catch { }
}
}
_logger.Info("[DownloadTaskWorker] 已停止");
}
/// <summary>
/// 实际执行单个下载任务:
/// 1) 标记 Running生成保存路径
/// 2) 订阅 SDK 事件以回写进度
/// 3) 发起下载并等待完成
/// 4) 下载成功 -> 写 OEMRawUse 与 JellyfinMonitorTask
/// </summary>
/// <param name="t">待处理的 DownloadTask。</param>
/// <param name="token">取消令牌。</param>
private async Task ProcessTaskAsync(DownloadTask t, CancellationToken token)
{
if (_hk == null)
{
_logger.Warn("[DownloadTaskWorker] HkCamera 未初始化,跳过任务 Id={Id}", t.Id);
return;
}
var db = FSqlContext.FDb;
// 标记运行中
t.Status = TaskStatus.Running;
t.TryCount += 1;
t.Progress = 0;
t.UpdateTime = DateTime.Now;
await db.Update<DownloadTask>()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.TryCount, a.Progress, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
// 生成本地文件名/路径
var saveBase = _hk.NVRVideoSavePath;
if (string.IsNullOrWhiteSpace(saveBase))
{
_logger.Error("[DownloadTaskWorker] NVRVideoSavePath 为空,无法下载。任务 Id={Id}", t.Id);
await MarkFailedAsync(t, "NVRVideoSavePath 未配置");
return;
}
var filePath = NVRCom.GetVideoName(saveBase, t.Code ?? "CODE");
await db.Update<DownloadTask>()
.Set(a => a.VideoFilePath, filePath)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
// 事件 -> TCS
// 订阅两个事件:进度与完成。进度事件写回数据库;完成事件用于唤醒等待。
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
void OnProgress(object? s, short p)
{
try
{
db.Update<DownloadTask>()
.Set(a => a.Progress, Math.Max((short)0, Math.Min((short)100, p)))
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Id == t.Id)
.ExecuteAffrows();
}
catch { }
}
void OnComplete(object? s, string msg)
{
try
{
tcs.TrySetResult(true);
}
catch { }
}
_hk.NVRLoadVideoProcessEventHandler += OnProgress;
_hk.NVRLoadVideoCompleteEventHandler += OnComplete;
try
{
// 发起下载(按时间范围)
var res = _hk.Sdk_NET_DVR_GetFileByTime_V40(t.NvrStartTime, t.NvrEndTime, filePath);
if (!res.Result)
{
await MarkFailedAsync(t, res.Msg);
return;
}
// HkCamera 内部会启动进度监控:此处等待完成事件触发
using (token.Register(() => tcs.TrySetCanceled()))
{
await tcs.Task;
}
// 下载完成:写 OEMRawUse 并创建 JellyfinMonitorTask
// 1) 插入 OEMRawUseUrlState=false, VideoUrl 空)
var rawUse = new OEMRawUse
{
InBagCode = t.Code,
RawName = t.RawName,
User = t.User,
UrlState = false,
VideoUrl = string.Empty,
CreateTime = DateTime.Now,
VideoActionId = 0
};
var rawUseId = db.Insert<OEMRawUse>(rawUse).ExecuteIdentity();
// 2) 创建 Jellyfin 监听任务,交由 JellyfinMonitorQueueService 批量匹配
var jfTask = new JellyfinMonitorTask
{
OemRawUseId = rawUseId,
LocalFileNameOrPath = filePath,
Code = t.Code,
RawName = t.RawName,
User = t.User,
NvrStartTime = t.NvrStartTime,
NvrEndTime = t.NvrEndTime,
Status = TaskStatus.Pending,
TryCount = 0,
CreateTime = DateTime.Now,
UpdateTime = DateTime.Now
};
db.Insert<JellyfinMonitorTask>(jfTask).ExecuteAffrows();
// 标记下载完成
t.Status = TaskStatus.Completed;
t.Progress = 100;
t.UpdateTime = DateTime.Now;
await db.Update<DownloadTask>()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.Progress, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
_logger.Info("[DownloadTaskWorker] 下载完成,已创建 Jellyfin 监控任务。DownloadTaskId={Id}, OEMRawUseId={RawUseId}", t.Id, rawUseId);
}
catch (OperationCanceledException)
{
await MarkFailedAsync(t, "任务被取消");
}
catch (Exception ex)
{
await MarkFailedAsync(t, ex.Message);
}
finally
{
// 释放事件订阅,避免内存泄露或重复触发
try { _hk.NVRLoadVideoProcessEventHandler -= OnProgress; } catch { }
try { _hk.NVRLoadVideoCompleteEventHandler -= OnComplete; } catch { }
}
}
/// <summary>
/// 将任务标记为失败,并持久化错误信息。
/// </summary>
/// <param name="t">下载任务。</param>
/// <param name="error">错误描述(可空)。</param>
private Task MarkFailedAsync(DownloadTask t, string? error)
{
var db = FSqlContext.FDb;
t.Status = TaskStatus.Failed;
t.Error = error;
t.UpdateTime = DateTime.Now;
_logger.Warn("[DownloadTaskWorker] 任务失败 Id={Id}, 错误={Err}", t.Id, error);
return db.Update<DownloadTask>()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.Error, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
}
}
}

View File

@@ -0,0 +1,327 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FATrace.Com;
using FATrace.Model;
using NLog;
using FATrace.OEMApp;
using FATrace.OEMApp.Model.Jellyfin;
using TaskStatus = FATrace.Model.TaskStatus;
namespace FATrace.OEMApp.Services
{
/// <summary>
/// Jellyfin 批量监控服务(单例)。
/// 职责:
/// 1. 周期性拉取待监控任务Pending/Running避免遗漏与重复
/// 2. 批量从 Jellyfin 获取最近创建的媒体条目(/Items减少对 Jellyfin 的压力;
/// 3. 基于多种名称匹配策略(空格/下划线互换、去除非字母数字、子串匹配)进行快速匹配;
/// 4. 匹配命中后,事务地插入 VideoAction、更新 OEMRawUseUrlState/VideoUrl/VideoActionId并标记任务完成
/// 5. 配置缺失或暂未匹配成功时,等待并重试,保证循环健康稳定运行。
/// </summary>
public sealed class JellyfinMonitorQueueService
{
private static readonly Lazy<JellyfinMonitorQueueService> _lazy = new(() => new JellyfinMonitorQueueService());
public static JellyfinMonitorQueueService Instance => _lazy.Value;
private readonly Logger _logger = LogManager.GetCurrentClassLogger();
private CancellationTokenSource? _cts;
private Task? _loopTask;
private JellyfinMonitorQueueService() { }
/// <summary>
/// 启动 Jellyfin 监控后台循环(幂等)。
/// </summary>
/// <remarks>
/// - 周期性读取待监控任务Pending/Running批量拉取 Jellyfin Items 进行匹配;
/// - 匹配成功后,事务性地更新 OEMRawUse/VideoAction 与任务状态;
/// - 若配置缺失,将等待后重试,不会抛异常终止。
/// </remarks>
public void Start()
{
if (_cts != null) return; // 已启动
_cts = new CancellationTokenSource();
_loopTask = Task.Run(() => RunAsync(_cts.Token));
_logger.Info("[JellyfinMonitorQueue] 已启动");
}
/// <summary>
/// 停止 Jellyfin 监控后台循环(尽力而为)。
/// </summary>
public void Stop()
{
try { _cts?.Cancel(); } catch { }
finally { _cts = null; }
}
/// <summary>
/// 主循环:读取配置与待监控任务,批量拉取 Jellyfin Items 并逐条任务匹配。
/// </summary>
/// <param name="token">取消令牌。</param>
private async Task RunAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
// 读取 Jellyfin 基础配置,缺失时不抛异常,等待重试
var baseUrl = SafeGet("JellyfinBaseUrl");
var apiKey = SafeGet("JellyfinApiKey");
var parentId = SafeGet("JellyfinParentId");
if (string.IsNullOrWhiteSpace(baseUrl) || string.IsNullOrWhiteSpace(apiKey) || string.IsNullOrWhiteSpace(parentId))
{
_logger.Warn("[JellyfinMonitorQueue] 配置缺失,等待中...");
await Task.Delay(3000, token);
continue;
}
var db = FSqlContext.FDb;
// 批量拉取待监控任务(优先处理旧任务),避免一次性加载过多
var tasks = await db.Select<JellyfinMonitorTask>()
.Where(t => t.Status == TaskStatus.Pending || t.Status == TaskStatus.Running)
.OrderBy(t => t.Id)
.Limit(200)
.ToListAsync();
if (tasks.Count == 0)
{
await Task.Delay(1000, token);
continue;
}
// 拉取 Jellyfin 最新条目(可调大/分页这里先拉200
var jf = new JellyfinClient(new JellyfinClientOptions
{
BaseUrl = baseUrl,
ApiKey = apiKey,
UseQueryApiKey = true,
TimeoutMs = 10000,
UserAgent = "FATrace.OEMApp/1.0"
});
// 仅取 Items 列表进行快速匹配,降低反序列化成本
// 注意命名参数后不能再跟位置参数CancellationToken 也需命名传入
var items = await jf.GetItemsOnlyAsync(
parentId,
sortBy: "DateCreated",
sortOrder: "Descending",
limit: 200,
includeItemTypes: "Video,Movie",
cancellationToken: token);
foreach (var t in tasks)
{
if (token.IsCancellationRequested) break;
try
{
await ProcessTaskAsync(t, items, baseUrl!, apiKey!, token);
}
catch (Exception ex)
{
_logger.Error(ex, "[JellyfinMonitorQueue] 处理任务失败 Id={Id}", t.Id);
}
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.Error(ex, "[JellyfinMonitorQueue] 主循环异常");
try { await Task.Delay(2000, token); } catch { }
}
}
_logger.Info("[JellyfinMonitorQueue] 已停止");
}
/// <summary>
/// 处理单条 Jellyfin 监控任务:尝试与已拉取的 Items 匹配,成功后事务更新。
/// </summary>
/// <param name="t">待处理的监控任务。</param>
/// <param name="items">从 Jellyfin 拉取的最新 Items 列表。</param>
/// <param name="baseUrl">Jellyfin 基础地址。</param>
/// <param name="apiKey">Jellyfin API Key。</param>
/// <param name="token">取消令牌。</param>
private async Task ProcessTaskAsync(JellyfinMonitorTask t, System.Collections.Generic.IReadOnlyList<JellyfinItem> items, string baseUrl, string apiKey, CancellationToken token)
{
var db = FSqlContext.FDb;
// 更新状态为 Running
if (t.Status != TaskStatus.Running)
{
t.Status = TaskStatus.Running;
t.TryCount += 1;
t.UpdateTime = DateTime.Now;
await db.Update<JellyfinMonitorTask>()
.SetSource(t)
.UpdateColumns(a => new { a.Status, a.TryCount, a.UpdateTime })
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
}
// 针对本地文件名构造多个候选名称,提高匹配成功率
var candidates = BuildNameCandidates(Path.GetFileNameWithoutExtension(t.LocalFileNameOrPath) ?? t.LocalFileNameOrPath ?? string.Empty);
var match = items.FirstOrDefault(it => !string.IsNullOrWhiteSpace(it.Name) && NameMatches(candidates, it.Name));
if (match == null || string.IsNullOrWhiteSpace(match.Id))
{
// 未匹配,稍后重试
return;
}
var playUrl = BuildStreamUrl(baseUrl, match.Id!, apiKey);
// 查询 OEMRawUse
var oem = await db.Select<OEMRawUse>().Where(x => x.Id == t.OemRawUseId).FirstAsync();
if (oem == null)
{
await db.Update<JellyfinMonitorTask>()
.Set(a => a.Status, TaskStatus.Failed)
.Set(a => a.Error, "未找到 OEMRawUse")
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
return;
}
// 事务:插入 VideoAction & 更新 OEMRawUse & 更新监控任务
try
{
db.Ado.Transaction(() =>
{
// 1) 保存操作记录VideoAction
var action = new VideoAction
{
Code = t.Code,
User = t.User,
VideoFilePath = GetLocalPathIfExists(t.LocalFileNameOrPath!),
VideoName = match.Name,
// 使用下载任务的时间窗口作为视频的起止时间;兼容旧数据为空的情况
StartTime = t.NvrStartTime ?? DateTime.Now,
EndTime = t.NvrEndTime ?? DateTime.Now,
CreateTime = DateTime.Now
};
var actionId = (long)db.Insert<VideoAction>(action).ExecuteIdentity();
// 2) 标记 OEMRawUse 已有可播放地址,并关联 VideoAction
db.Update<OEMRawUse>()
.Set(a => new OEMRawUse
{
UrlState = true,
VideoUrl = playUrl,
VideoActionId = actionId,
RawName = t.RawName
})
.Where(a => a.Id == t.OemRawUseId)
.ExecuteAffrows();
// 3) 标记监控任务完成并记录匹配到的 Jellyfin ItemId
db.Update<JellyfinMonitorTask>()
.Set(a => a.Status, TaskStatus.Completed)
.Set(a => a.FoundItemId, match.Id)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Id == t.Id)
.ExecuteAffrows();
});
_logger.Info("[JellyfinMonitorQueue] 任务完成 Id={Id}, ItemId={ItemId}", t.Id, match.Id);
}
catch (Exception ex)
{
_logger.Error(ex, "[JellyfinMonitorQueue] 数据库更新失败 Id={Id}", t.Id);
await db.Update<JellyfinMonitorTask>()
.Set(a => a.Status, TaskStatus.Failed)
.Set(a => a.Error, ex.Message)
.Set(a => a.UpdateTime, DateTime.Now)
.Where(a => a.Id == t.Id)
.ExecuteAffrowsAsync();
}
}
/// <summary>
/// 安全读取字符串配置。读取失败/缺失返回空串。
/// </summary>
private static string SafeGet(string key)
{
try { return ConfigHelper.GetStringOrDefault(key, string.Empty); } catch { return string.Empty; }
}
/// <summary>
/// 尝试将相对路径转换为应用目录下的绝对路径,并返回存在的路径;若均不存在则返回原值。
/// </summary>
private static string? GetLocalPathIfExists(string input)
{
try
{
if (string.IsNullOrWhiteSpace(input)) return null;
if (File.Exists(input)) return input;
var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, input);
return File.Exists(path) ? path : input;
}
catch { return input; }
}
/// <summary>
/// 构造 Jellyfin 视频流播放 URLmp4
/// </summary>
private static string BuildStreamUrl(string baseUrl, string itemId, string apiKey)
{
baseUrl = baseUrl?.TrimEnd('/') ?? string.Empty;
var sb = new StringBuilder();
sb.Append(baseUrl).Append("/Videos/").Append(itemId).Append("/stream.mp4").Append("?api_key=").Append(Uri.EscapeDataString(apiKey));
return sb.ToString();
}
/// <summary>
/// 构造多种“候选名称”,提升匹配的鲁棒性(空格/下划线互换,去除非字母数字等)。
/// </summary>
private static HashSet<string> BuildNameCandidates(string name)
{
var set = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
if (string.IsNullOrWhiteSpace(name)) return set;
var n = name.Trim();
set.Add(n);
set.Add(n.Replace('_', ' '));
set.Add(n.Replace(' ', '_'));
set.Add(RemoveNonAlphaNumeric(n));
set.Add(RemoveNonAlphaNumeric(n.Replace('_', ' ')));
set.Add(RemoveNonAlphaNumeric(n.Replace(' ', '_')));
return set;
}
/// <summary>
/// 判断 Jellyfin 项名称是否与候选名称集合匹配(完全匹配或“去非字母数字”的子串匹配)。
/// </summary>
private static bool NameMatches(HashSet<string> candidates, string? remoteName)
{
if (string.IsNullOrWhiteSpace(remoteName)) return false;
if (candidates.Contains(remoteName)) return true;
var normalized = RemoveNonAlphaNumeric(remoteName);
if (candidates.Contains(normalized)) return true;
foreach (var c in candidates)
{
if (string.IsNullOrEmpty(c)) continue;
if (remoteName.IndexOf(c, StringComparison.OrdinalIgnoreCase) >= 0) return true;
if (normalized.IndexOf(RemoveNonAlphaNumeric(c), StringComparison.OrdinalIgnoreCase) >= 0) return true;
}
return false;
}
/// <summary>
/// 去除字符串中的非字母数字字符,用于弱化命名差异。
/// </summary>
private static string RemoveNonAlphaNumeric(string input)
{
if (string.IsNullOrWhiteSpace(input)) return input;
var sb = new StringBuilder(input.Length);
foreach (var ch in input)
{
if (char.IsLetterOrDigit(ch)) sb.Append(ch);
}
return sb.ToString();
}
}
}