328 lines
14 KiB
C#
328 lines
14 KiB
C#
using System;
|
||
using System.Collections.Generic;
|
||
using System.IO;
|
||
using System.Linq;
|
||
using System.Text;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
using FATrace.Com;
|
||
using FATrace.Model;
|
||
using NLog;
|
||
using FATrace.OEMApp;
|
||
using FATrace.OEMApp.Model.Jellyfin;
|
||
using TaskStatus = FATrace.Model.TaskStatus;
|
||
|
||
namespace FATrace.OEMApp.Services
|
||
{
|
||
/// <summary>
|
||
/// Jellyfin 批量监控服务(单例)。
|
||
/// 职责:
|
||
/// 1. 周期性拉取待监控任务(Pending/Running),避免遗漏与重复;
|
||
/// 2. 批量从 Jellyfin 获取最近创建的媒体条目(/Items),减少对 Jellyfin 的压力;
|
||
/// 3. 基于多种名称匹配策略(空格/下划线互换、去除非字母数字、子串匹配)进行快速匹配;
|
||
/// 4. 匹配命中后,事务地插入 VideoAction、更新 OEMRawUse(UrlState/VideoUrl/VideoActionId)并标记任务完成;
|
||
/// 5. 配置缺失或暂未匹配成功时,等待并重试,保证循环健康稳定运行。
|
||
/// </summary>
|
||
public sealed class JellyfinMonitorQueueService
|
||
{
|
||
private static readonly Lazy<JellyfinMonitorQueueService> _lazy = new(() => new JellyfinMonitorQueueService());
|
||
public static JellyfinMonitorQueueService Instance => _lazy.Value;
|
||
|
||
private readonly Logger _logger = LogManager.GetCurrentClassLogger();
|
||
private CancellationTokenSource? _cts;
|
||
private Task? _loopTask;
|
||
|
||
private JellyfinMonitorQueueService() { }
|
||
|
||
/// <summary>
|
||
/// 启动 Jellyfin 监控后台循环(幂等)。
|
||
/// </summary>
|
||
/// <remarks>
|
||
/// - 周期性读取待监控任务(Pending/Running),批量拉取 Jellyfin Items 进行匹配;
|
||
/// - 匹配成功后,事务性地更新 OEMRawUse/VideoAction 与任务状态;
|
||
/// - 若配置缺失,将等待后重试,不会抛异常终止。
|
||
/// </remarks>
|
||
public void Start()
|
||
{
|
||
if (_cts != null) return; // 已启动
|
||
_cts = new CancellationTokenSource();
|
||
_loopTask = Task.Run(() => RunAsync(_cts.Token));
|
||
_logger.Info("[JellyfinMonitorQueue] 已启动");
|
||
}
|
||
|
||
/// <summary>
|
||
/// 停止 Jellyfin 监控后台循环(尽力而为)。
|
||
/// </summary>
|
||
public void Stop()
|
||
{
|
||
try { _cts?.Cancel(); } catch { }
|
||
finally { _cts = null; }
|
||
}
|
||
|
||
/// <summary>
|
||
/// 主循环:读取配置与待监控任务,批量拉取 Jellyfin Items 并逐条任务匹配。
|
||
/// </summary>
|
||
/// <param name="token">取消令牌。</param>
|
||
private async Task RunAsync(CancellationToken token)
|
||
{
|
||
while (!token.IsCancellationRequested)
|
||
{
|
||
try
|
||
{
|
||
// 读取 Jellyfin 基础配置,缺失时不抛异常,等待重试
|
||
var baseUrl = SafeGet("JellyfinBaseUrl");
|
||
var apiKey = SafeGet("JellyfinApiKey");
|
||
var parentId = SafeGet("JellyfinParentId");
|
||
if (string.IsNullOrWhiteSpace(baseUrl) || string.IsNullOrWhiteSpace(apiKey) || string.IsNullOrWhiteSpace(parentId))
|
||
{
|
||
_logger.Warn("[JellyfinMonitorQueue] 配置缺失,等待中...");
|
||
await Task.Delay(3000, token);
|
||
continue;
|
||
}
|
||
|
||
var db = FSqlContext.FDb;
|
||
// 批量拉取待监控任务(优先处理旧任务),避免一次性加载过多
|
||
var tasks = await db.Select<JellyfinMonitorTask>()
|
||
.Where(t => t.Status == TaskStatus.Pending || t.Status == TaskStatus.Running)
|
||
.OrderBy(t => t.Id)
|
||
.Limit(200)
|
||
.ToListAsync();
|
||
|
||
if (tasks.Count == 0)
|
||
{
|
||
await Task.Delay(1000, token);
|
||
continue;
|
||
}
|
||
|
||
// 拉取 Jellyfin 最新条目(可调大/分页,这里先拉200)
|
||
var jf = new JellyfinClient(new JellyfinClientOptions
|
||
{
|
||
BaseUrl = baseUrl,
|
||
ApiKey = apiKey,
|
||
UseQueryApiKey = true,
|
||
TimeoutMs = 10000,
|
||
UserAgent = "FATrace.OEMApp/1.0"
|
||
});
|
||
|
||
// 仅取 Items 列表进行快速匹配,降低反序列化成本
|
||
// 注意:命名参数后不能再跟位置参数,CancellationToken 也需命名传入
|
||
var items = await jf.GetItemsOnlyAsync(
|
||
parentId,
|
||
sortBy: "DateCreated",
|
||
sortOrder: "Descending",
|
||
limit: 200,
|
||
includeItemTypes: "Video,Movie",
|
||
cancellationToken: token);
|
||
|
||
foreach (var t in tasks)
|
||
{
|
||
if (token.IsCancellationRequested) break;
|
||
try
|
||
{
|
||
await ProcessTaskAsync(t, items, baseUrl!, apiKey!, token);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.Error(ex, "[JellyfinMonitorQueue] 处理任务失败 Id={Id}", t.Id);
|
||
}
|
||
}
|
||
}
|
||
catch (OperationCanceledException)
|
||
{
|
||
break;
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.Error(ex, "[JellyfinMonitorQueue] 主循环异常");
|
||
try { await Task.Delay(2000, token); } catch { }
|
||
}
|
||
}
|
||
_logger.Info("[JellyfinMonitorQueue] 已停止");
|
||
}
|
||
|
||
/// <summary>
|
||
/// 处理单条 Jellyfin 监控任务:尝试与已拉取的 Items 匹配,成功后事务更新。
|
||
/// </summary>
|
||
/// <param name="t">待处理的监控任务。</param>
|
||
/// <param name="items">从 Jellyfin 拉取的最新 Items 列表。</param>
|
||
/// <param name="baseUrl">Jellyfin 基础地址。</param>
|
||
/// <param name="apiKey">Jellyfin API Key。</param>
|
||
/// <param name="token">取消令牌。</param>
|
||
private async Task ProcessTaskAsync(JellyfinMonitorTask t, System.Collections.Generic.IReadOnlyList<JellyfinItem> items, string baseUrl, string apiKey, CancellationToken token)
|
||
{
|
||
var db = FSqlContext.FDb;
|
||
// 更新状态为 Running
|
||
if (t.Status != TaskStatus.Running)
|
||
{
|
||
t.Status = TaskStatus.Running;
|
||
t.TryCount += 1;
|
||
t.UpdateTime = DateTime.Now;
|
||
await db.Update<JellyfinMonitorTask>()
|
||
.SetSource(t)
|
||
.UpdateColumns(a => new { a.Status, a.TryCount, a.UpdateTime })
|
||
.Where(a => a.Id == t.Id)
|
||
.ExecuteAffrowsAsync();
|
||
}
|
||
|
||
// 针对本地文件名构造多个候选名称,提高匹配成功率
|
||
var candidates = BuildNameCandidates(Path.GetFileNameWithoutExtension(t.LocalFileNameOrPath) ?? t.LocalFileNameOrPath ?? string.Empty);
|
||
var match = items.FirstOrDefault(it => !string.IsNullOrWhiteSpace(it.Name) && NameMatches(candidates, it.Name));
|
||
if (match == null || string.IsNullOrWhiteSpace(match.Id))
|
||
{
|
||
// 未匹配,稍后重试
|
||
return;
|
||
}
|
||
|
||
var playUrl = BuildStreamUrl(baseUrl, match.Id!, apiKey);
|
||
|
||
// 查询 OEMRawUse
|
||
var oem = await db.Select<OEMRawUse>().Where(x => x.Id == t.OemRawUseId).FirstAsync();
|
||
if (oem == null)
|
||
{
|
||
await db.Update<JellyfinMonitorTask>()
|
||
.Set(a => a.Status, TaskStatus.Failed)
|
||
.Set(a => a.Error, "未找到 OEMRawUse")
|
||
.Set(a => a.UpdateTime, DateTime.Now)
|
||
.Where(a => a.Id == t.Id)
|
||
.ExecuteAffrowsAsync();
|
||
return;
|
||
}
|
||
|
||
// 事务:插入 VideoAction & 更新 OEMRawUse & 更新监控任务
|
||
try
|
||
{
|
||
db.Ado.Transaction(() =>
|
||
{
|
||
// 1) 保存操作记录(VideoAction)
|
||
var action = new VideoAction
|
||
{
|
||
Code = t.Code,
|
||
User = t.User,
|
||
VideoFilePath = GetLocalPathIfExists(t.LocalFileNameOrPath!),
|
||
VideoName = match.Name,
|
||
// 使用下载任务的时间窗口作为视频的起止时间;兼容旧数据为空的情况
|
||
StartTime = t.NvrStartTime ?? DateTime.Now,
|
||
EndTime = t.NvrEndTime ?? DateTime.Now,
|
||
CreateTime = DateTime.Now
|
||
};
|
||
var actionId = (long)db.Insert<VideoAction>(action).ExecuteIdentity();
|
||
|
||
// 2) 标记 OEMRawUse 已有可播放地址,并关联 VideoAction
|
||
db.Update<OEMRawUse>()
|
||
.Set(a => new OEMRawUse
|
||
{
|
||
//UrlState = true,
|
||
//VideoUrl = playUrl,
|
||
//VideoActionId = actionId,
|
||
RawName = t.RawName
|
||
})
|
||
.Where(a => a.Id == t.OemRawUseId)
|
||
.ExecuteAffrows();
|
||
|
||
// 3) 标记监控任务完成并记录匹配到的 Jellyfin ItemId
|
||
db.Update<JellyfinMonitorTask>()
|
||
.Set(a => a.Status, TaskStatus.Completed)
|
||
.Set(a => a.FoundItemId, match.Id)
|
||
.Set(a => a.UpdateTime, DateTime.Now)
|
||
.Where(a => a.Id == t.Id)
|
||
.ExecuteAffrows();
|
||
});
|
||
_logger.Info("[JellyfinMonitorQueue] 任务完成 Id={Id}, ItemId={ItemId}", t.Id, match.Id);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.Error(ex, "[JellyfinMonitorQueue] 数据库更新失败 Id={Id}", t.Id);
|
||
await db.Update<JellyfinMonitorTask>()
|
||
.Set(a => a.Status, TaskStatus.Failed)
|
||
.Set(a => a.Error, ex.Message)
|
||
.Set(a => a.UpdateTime, DateTime.Now)
|
||
.Where(a => a.Id == t.Id)
|
||
.ExecuteAffrowsAsync();
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 安全读取字符串配置。读取失败/缺失返回空串。
|
||
/// </summary>
|
||
private static string SafeGet(string key)
|
||
{
|
||
try { return ConfigHelper.GetStringOrDefault(key, string.Empty); } catch { return string.Empty; }
|
||
}
|
||
|
||
/// <summary>
|
||
/// 尝试将相对路径转换为应用目录下的绝对路径,并返回存在的路径;若均不存在则返回原值。
|
||
/// </summary>
|
||
private static string? GetLocalPathIfExists(string input)
|
||
{
|
||
try
|
||
{
|
||
if (string.IsNullOrWhiteSpace(input)) return null;
|
||
if (File.Exists(input)) return input;
|
||
var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, input);
|
||
return File.Exists(path) ? path : input;
|
||
}
|
||
catch { return input; }
|
||
}
|
||
|
||
/// <summary>
|
||
/// 构造 Jellyfin 视频流播放 URL(mp4)。
|
||
/// </summary>
|
||
private static string BuildStreamUrl(string baseUrl, string itemId, string apiKey)
|
||
{
|
||
baseUrl = baseUrl?.TrimEnd('/') ?? string.Empty;
|
||
var sb = new StringBuilder();
|
||
sb.Append(baseUrl).Append("/Videos/").Append(itemId).Append("/stream.mp4").Append("?api_key=").Append(Uri.EscapeDataString(apiKey));
|
||
return sb.ToString();
|
||
}
|
||
|
||
/// <summary>
|
||
/// 构造多种“候选名称”,提升匹配的鲁棒性(空格/下划线互换,去除非字母数字等)。
|
||
/// </summary>
|
||
private static HashSet<string> BuildNameCandidates(string name)
|
||
{
|
||
var set = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||
if (string.IsNullOrWhiteSpace(name)) return set;
|
||
var n = name.Trim();
|
||
set.Add(n);
|
||
set.Add(n.Replace('_', ' '));
|
||
set.Add(n.Replace(' ', '_'));
|
||
set.Add(RemoveNonAlphaNumeric(n));
|
||
set.Add(RemoveNonAlphaNumeric(n.Replace('_', ' ')));
|
||
set.Add(RemoveNonAlphaNumeric(n.Replace(' ', '_')));
|
||
return set;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 判断 Jellyfin 项名称是否与候选名称集合匹配(完全匹配或“去非字母数字”的子串匹配)。
|
||
/// </summary>
|
||
private static bool NameMatches(HashSet<string> candidates, string? remoteName)
|
||
{
|
||
if (string.IsNullOrWhiteSpace(remoteName)) return false;
|
||
if (candidates.Contains(remoteName)) return true;
|
||
var normalized = RemoveNonAlphaNumeric(remoteName);
|
||
if (candidates.Contains(normalized)) return true;
|
||
foreach (var c in candidates)
|
||
{
|
||
if (string.IsNullOrEmpty(c)) continue;
|
||
if (remoteName.IndexOf(c, StringComparison.OrdinalIgnoreCase) >= 0) return true;
|
||
if (normalized.IndexOf(RemoveNonAlphaNumeric(c), StringComparison.OrdinalIgnoreCase) >= 0) return true;
|
||
}
|
||
return false;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 去除字符串中的非字母数字字符,用于弱化命名差异。
|
||
/// </summary>
|
||
private static string RemoveNonAlphaNumeric(string input)
|
||
{
|
||
if (string.IsNullOrWhiteSpace(input)) return input;
|
||
var sb = new StringBuilder(input.Length);
|
||
foreach (var ch in input)
|
||
{
|
||
if (char.IsLetterOrDigit(ch)) sb.Append(ch);
|
||
}
|
||
return sb.ToString();
|
||
}
|
||
}
|
||
}
|