357 lines
12 KiB
C#
357 lines
12 KiB
C#
using CapMachine.Wpf.ChannelModel;
|
||
using CapMachine.Wpf.Models;
|
||
using CapMachine.Wpf.Models.HighSpeed;
|
||
using CapMachine.Wpf.PrismEvent;
|
||
using CsvHelper.Configuration;
|
||
using CsvHelper;
|
||
using Prism.Mvvm;
|
||
using System;
|
||
using System.Collections.Concurrent;
|
||
using System.Collections.Generic;
|
||
using System.Globalization;
|
||
using System.IO;
|
||
using System.Linq;
|
||
using System.Reflection;
|
||
using System.Text;
|
||
using System.Threading.Channels;
|
||
using System.Threading.Tasks;
|
||
using System.Timers;
|
||
using CapMachine.Core;
|
||
using CapMachine.Model;
|
||
using System.Security.Cryptography.Xml;
|
||
using Prism.Ioc;
|
||
|
||
namespace CapMachine.Wpf.Services
|
||
{
|
||
/// <summary>
|
||
/// 高速数据
|
||
/// 通信的数据缓存
|
||
/// </summary>
|
||
public class HighSpeedDataService : BindableBase
|
||
{
|
||
public HighSpeedDataService(ConfigService configService, ILogService logService, IFreeSql FreeSql,IContainerProvider containerProvider)
|
||
{
|
||
ConfigService = configService;
|
||
LogService = logService;
|
||
this.FreeSql = FreeSql;
|
||
ContainerProvider = containerProvider;
|
||
CacheHighFragMsg = new List<CommMsg>();
|
||
|
||
//500ms 触发一次
|
||
CycleTimer = new System.Timers.Timer(500);
|
||
CycleTimer.Elapsed += CycleAction;
|
||
CycleTimer.AutoReset = true;
|
||
CycleTimer.Enabled = true;
|
||
|
||
//实时数据记录管道数据监听
|
||
Task.Run(() => ListenCycleChannelAction());
|
||
|
||
}
|
||
public ConfigService ConfigService { get; }
|
||
public ILogService LogService { get; }
|
||
public IFreeSql FreeSql { get; }
|
||
public IContainerProvider ContainerProvider { get; }
|
||
|
||
|
||
/// <summary>
|
||
/// 时间到
|
||
/// </summary>
|
||
/// <param name="sender"></param>
|
||
/// <param name="e"></param>
|
||
/// <exception cref="NotImplementedException"></exception>
|
||
// 保护 CurListMsg 的并发锁
|
||
private readonly object _curListMsgLock = new object();
|
||
|
||
/// <summary>
|
||
/// 触发刷盘的记录条数阈值(可根据业务容量调整,默认1万条)
|
||
/// </summary>
|
||
public int FlushRecordThreshold { get; set; } = 10000;
|
||
|
||
private void CycleAction(object? sender, ElapsedEventArgs e)
|
||
{
|
||
if (IsEnable)
|
||
{
|
||
// 将当前快照写入通道,避免共享同一 List 引用导致的并发访问问题
|
||
List<CommMsg> snapshot;
|
||
lock (_curListMsgLock)
|
||
{
|
||
snapshot = new List<CommMsg>(CurListMsg);
|
||
}
|
||
// 非阻塞写入通道;若通道满则丢弃最老数据(由通道配置控制)
|
||
CycleChannelInfo.Writer.TryWrite(snapshot);
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 添加或者更新消息数据
|
||
/// </summary>
|
||
public void AppendOrUpdateMsg(CommMsg commMsg)
|
||
{
|
||
lock (_curListMsgLock)
|
||
{
|
||
// 以 Category+MsgInfo 作为唯一键,避免不同总线类别之间发生键冲突
|
||
int idx = CurListMsg.FindIndex(a => a.MsgInfo == commMsg.MsgInfo && a.Category == commMsg.Category);
|
||
if (idx < 0)
|
||
{
|
||
//不存在则新增
|
||
CurListMsg.Add(commMsg);
|
||
}
|
||
else
|
||
{
|
||
//存在则更新数据
|
||
var Data = CurListMsg[idx];
|
||
Data.MsgData = commMsg.MsgData;
|
||
Data.Time = commMsg.Time;
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
/// <summary>
|
||
/// 初始化高速数据
|
||
/// </summary>
|
||
public void InitHighData()
|
||
{
|
||
//报文重新清零
|
||
CurListMsg = new List<CommMsg>();
|
||
}
|
||
|
||
/// <summary>
|
||
/// 当前的报文消息集合
|
||
/// </summary>
|
||
private List<CommMsg> CurListMsg { get; set; } = new List<CommMsg>();
|
||
//{
|
||
// new CommMsg() { Category = "CAN2",MsgInfo = "0x000003E3",Time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),MsgData = "33 00 00 00 37 00 04 00"},
|
||
// new CommMsg() { Category = "CAN1",MsgInfo = "0x000003E3",Time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),MsgData = "33 00 00 00 37 00 04 00"},
|
||
// new CommMsg() { Category = "CAN2",MsgInfo = "0x000003E3",Time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),MsgData = "33 00 00 00 37 00 04 00"},
|
||
// new CommMsg() { Category = "CAN3",MsgInfo = "0x000003E3",Time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),MsgData = "33 00 00 00 37 00 04 00"},
|
||
// new CommMsg() { Category = "CAN4",MsgInfo = "0x000003E3",Time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),MsgData = "33 00 00 00 37 00 04 00"},
|
||
// new CommMsg() { Category = "CAN5",MsgInfo = "0x000003E3",Time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),MsgData = "33 00 00 00 37 00 04 00"},
|
||
//};
|
||
|
||
/// <summary>
|
||
/// 周期定时器
|
||
/// </summary>
|
||
private System.Timers.Timer CycleTimer { get; set; }
|
||
|
||
|
||
private bool _IsEnable=true;
|
||
/// <summary>
|
||
/// 是否启用
|
||
/// </summary>
|
||
public bool IsEnable
|
||
{
|
||
get { return _IsEnable; }
|
||
set { _IsEnable = value; RaisePropertyChanged(); }
|
||
}
|
||
|
||
|
||
#region 队列的操作
|
||
|
||
/// <summary>
|
||
/// 缓存数据
|
||
/// </summary>
|
||
public List<CommMsg> CacheHighFragMsg { get; set; }
|
||
|
||
|
||
/// <summary>
|
||
/// 队列通道
|
||
/// 当前队列消费周期触发的数据记录
|
||
/// </summary>
|
||
public Channel<List<CommMsg>> CycleChannelInfo = Channel.CreateBounded<List<CommMsg>>(new BoundedChannelOptions(100000)
|
||
{
|
||
FullMode = BoundedChannelFullMode.DropOldest, //消息满了 等待消费 新的消息不插入
|
||
SingleWriter = false,//允许一次写入多条数据
|
||
SingleReader = true, //一次只能读取一条消息
|
||
|
||
});
|
||
|
||
/// <summary>
|
||
/// 缓存的集合数据
|
||
/// </summary>
|
||
public List<RecordChannelData> CacheRecordData { get; set; } = new List<RecordChannelData>();
|
||
|
||
/// <summary>
|
||
/// 当前缓存的队列信息
|
||
/// 高效的队列数据
|
||
/// </summary>
|
||
public ConcurrentQueue<RecordChannelData> ConcurrentQueueData { get; set; } = new ConcurrentQueue<RecordChannelData>();
|
||
|
||
|
||
/// <summary>
|
||
/// 最大的缓存单元数据
|
||
/// 存储超过规定600个缓存数据后进行判断后删除
|
||
/// </summary>
|
||
private int MaxCacheCellCount { get; set; } = 0;
|
||
|
||
/// <summary>
|
||
/// 数据记录管道监听方法
|
||
/// </summary>
|
||
/// <exception cref="NotImplementedException"></exception>
|
||
private async void ListenCycleChannelAction()
|
||
{
|
||
while (await CycleChannelInfo.Reader.WaitToReadAsync())
|
||
{
|
||
if (CycleChannelInfo.Reader.TryRead(out var CycleChannelData))
|
||
{
|
||
////第一次计时
|
||
//stopwatch.Start(); //启动Stopwatch
|
||
|
||
// 新增数据,按条数累积以控制刷盘频率
|
||
CacheHighFragMsg.AddRange(CycleChannelData);
|
||
MaxCacheCellCount += CycleChannelData.Count;
|
||
|
||
if (MaxCacheCellCount >= FlushRecordThreshold)
|
||
{
|
||
//CSV文件保存
|
||
SaveToCsv(CacheHighFragMsg);
|
||
CacheHighFragMsg.Clear();
|
||
MaxCacheCellCount = 0;
|
||
}
|
||
|
||
//}
|
||
|
||
//CacheRecordData.Clear();
|
||
|
||
//Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss:fff")}-{LineName}-保存成功!");
|
||
//}
|
||
|
||
|
||
//stopwatch.Stop(); //停止Stopwatch
|
||
//Console.WriteLine("保存数据耗时::{0}", stopwatch.Elapsed.TotalSeconds.ToString());
|
||
//stopwatch.Reset();
|
||
}
|
||
}
|
||
}
|
||
|
||
#endregion
|
||
|
||
|
||
#region CSV Helper 数据保存CSV
|
||
|
||
/// <summary>
|
||
/// 当前的小时文件信息
|
||
/// </summary>
|
||
private string CurHourFileName { get; set; }
|
||
|
||
/// <summary>
|
||
/// CSV文件保存根路径
|
||
/// </summary>
|
||
private string SaveCsvRootPath { get; set; }
|
||
|
||
/// <summary>
|
||
/// Append to the file.
|
||
/// </summary>
|
||
CsvConfiguration CSVconfig { get; set; } = new CsvConfiguration(CultureInfo.CurrentCulture)
|
||
{
|
||
// Don't write the header again.
|
||
HasHeaderRecord = false,
|
||
};
|
||
|
||
|
||
/// <summary>
|
||
/// 保存到CSV文件
|
||
/// </summary>
|
||
private void SaveToCsv(List<CommMsg> CacheData)
|
||
{
|
||
CurHourFileName = DateTime.Now.ToString("yyyy-MM-dd HH");
|
||
|
||
//年份的文件是否存在
|
||
var FilePath = HighSpeedFileHelper.GetExpFilePath(ConfigService.HighSpeedMsgSaveCsvRootPath, "ConfigService.CurExpInfo.Name");
|
||
if (!Directory.Exists(FilePath))
|
||
{
|
||
Directory.CreateDirectory(FilePath);
|
||
}
|
||
|
||
//文件全部路径信息
|
||
var FileFullInfo = HighSpeedFileHelper.GetHourCSVPath(FilePath, CurHourFileName);
|
||
if (!File.Exists(FileFullInfo))//是否存在文件
|
||
{
|
||
//新增文件
|
||
lock (ConfigService.HighSpeedMsgCsvFileLock)
|
||
{
|
||
//新增的文件
|
||
using (var writer = new StreamWriter(FileFullInfo, false, Encoding.UTF8))
|
||
using (var csv = new CsvWriter(writer, CultureInfo.CurrentCulture))
|
||
{
|
||
var Result = csv.Context.RegisterClassMap<HighRecordMsgMap>();
|
||
|
||
csv.WriteRecords(CacheData);
|
||
}
|
||
}
|
||
|
||
|
||
//先保存当前的文件信息
|
||
FreeSql.Insert<HighMsgFileInfo>(new HighMsgFileInfo()
|
||
{
|
||
ExpName = "ConfigService.CurExpInfo.Name",
|
||
FileInfo = CurHourFileName,
|
||
FilePath = FileFullInfo
|
||
}).ExecuteInserted();
|
||
|
||
//删除时间范围之外的文件
|
||
var DeleteInfo = FreeSql.Delete<HighMsgFileInfo>().Where(a => a.CreateTime < DateTime.Now.AddHours(-ConfigService.HighSpeedMsgCsvFileCacheHour)).ExecuteDeleted();
|
||
if (DeleteInfo.Count > 0)
|
||
{
|
||
//DB删除成功的话,开始删除实体文件
|
||
foreach (var item in DeleteInfo)
|
||
{
|
||
DeleteFile(item.FilePath);
|
||
}
|
||
}
|
||
}
|
||
else//存在文件的话则新增数据
|
||
{
|
||
lock (ConfigService.HighSpeedMsgCsvFileLock)
|
||
{
|
||
//往已有的文件增加数据
|
||
using (var stream = File.Open(FileFullInfo, FileMode.Append, FileAccess.Write, FileShare.ReadWrite))
|
||
//using (var stream = new FileStream(FileFullInfo, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite))
|
||
using (var writer = new StreamWriter(stream))
|
||
using (var csv = new CsvWriter(writer, CSVconfig))
|
||
{
|
||
csv.Context.RegisterClassMap<HighRecordMsgMap>();
|
||
csv.WriteRecords(CacheData);
|
||
}
|
||
}
|
||
|
||
}
|
||
}
|
||
|
||
|
||
/// <summary>
|
||
/// 删除文件
|
||
/// </summary>
|
||
/// <param name="FilePath"></param>
|
||
private bool DeleteFile(string FilePath)
|
||
{
|
||
try
|
||
{
|
||
if (File.Exists(FilePath))
|
||
{
|
||
File.Delete(FilePath);
|
||
Console.WriteLine("文件已成功删除");
|
||
return true;
|
||
}
|
||
else
|
||
{
|
||
Console.WriteLine("文件不存在");
|
||
return true;
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
Console.WriteLine("删除文件时发生错误: " + ex.Message);
|
||
return false;
|
||
}
|
||
|
||
|
||
}
|
||
}
|
||
|
||
#endregion
|
||
|
||
|
||
}
|
||
|