using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using HslCommunication;
using HslCommunication.MQTT;
using NLog;
using OrpaonEMS.App.Models;
using OrpaonEMS.Core;
using Prism.Mvvm;

namespace OrpaonEMS.App.Services
{
    /// <summary>
    /// MQTT client service: connects to the configured broker, subscribes to the
    /// configured topic and periodically publishes a <see cref="PubData"/> snapshot
    /// assembled from the BMS, PCS and solar data services.
    /// </summary>
    public class MqttDataClientService : BindableBase
    {
        /// <summary>Keep-alive period sent to the broker, in seconds.</summary>
        private const int KeepAliveSeconds = 100;

        /// <summary>
        /// Delay before the first publish cycle starts, in milliseconds
        /// (1 s start-up pause + 5 s initial delay).
        /// </summary>
        private const int StartupDelayMs = 1000 + 5000;

        /// <summary>Interval between two publish attempts, in milliseconds.</summary>
        private const int PublishIntervalMs = 5000;

        /// <summary>Interval between two reconnect attempts, in milliseconds.</summary>
        private const int ReconnectIntervalMs = 10_000;

        /// <summary>Text shown when an operation is requested before a client exists.</summary>
        private const string ClientNotInitializedMessage = "MQTT client is not initialized";

        /// <summary>
        /// Creates the service and stores the injected dependencies.
        /// </summary>
        /// <param name="freeSql">FreeSql ORM instance.</param>
        /// <param name="logService">Application log service.</param>
        /// <param name="configDataService">Configuration (broker address, credentials, topic, master flag).</param>
        /// <param name="bmsDataService">Source of the BMS values that are published.</param>
        /// <param name="solarEnergyService">Source of the solar values that are published.</param>
        /// <param name="inPowerPCSDataService">Source of the PCS values that are published.</param>
        public MqttDataClientService(
            IFreeSql freeSql,
            ILogService logService,
            ConfigDataService configDataService,
            BmsDataService bmsDataService,
            SolarEnergyService solarEnergyService,
            InPowerPCSDataService inPowerPCSDataService)
        {
            FreeSql = freeSql;
            LogService = logService;
            ConfigDataService = configDataService;
            BmsDataService = bmsDataService;
            SolarEnergyService = solarEnergyService;
            InPowerPCSDataService = inPowerPCSDataService;

            if (ConfigDataService.IsMaster)
            {
                // Start-up is left disabled here, exactly as in the original code.
                // Re-enable both calls to connect and start publishing on the master node.
                //MqttClientInit();
                //CycleSendStart();
            }
        }

        /// <summary>
        /// Topic used for both subscribing and publishing.
        /// </summary>
        private string Topic { get; set; } = string.Empty;

        /// <summary>
        /// Underlying MQTT client; null until <see cref="MqttClientInit"/> has run.
        /// </summary>
        private MqttClient mqttClient { get; set; }

        private bool _MqttClientState = false;

        /// <summary>
        /// MQTT connection state. Setting it refreshes <see cref="MqttClientStateMsg"/>
        /// when the state changes, and also the first time it is set, so that an
        /// initial connection failure is reported instead of leaving the message null.
        /// </summary>
        private bool MqttClientState
        {
            get { return _MqttClientState; }
            set
            {
                bool changed = _MqttClientState != value;
                _MqttClientState = value;

                if (changed || _MqttClientStateMsg == null)
                {
                    MqttClientStateMsg = value ? "正常" : "失败";
                }
            }
        }

        private string _MqttClientStateMsg;

        /// <summary>
        /// Human-readable MQTT connection state; raises a property-changed notification on every set.
        /// </summary>
        public string MqttClientStateMsg
        {
            get { return _MqttClientStateMsg; }
            set
            {
                _MqttClientStateMsg = value;
                RaisePropertyChanged();
            }
        }

        // NOTE(review): not referenced anywhere in this class - candidate for removal.
        private Random Random = new Random();

        /// <summary>
        /// Quality labels. NOTE(review): not referenced anywhere in this class.
        /// </summary>
        private string[] Quality = new string[] { "Good", "Bad" };

        /// <summary>
        /// Enables the publish loop and the reconnect loop; both stop once this is false.
        /// </summary>
        private bool MqttThrendEnable { get; set; } = true;

        /// <summary>
        /// Background task running the periodic publish loop.
        /// Static, therefore shared by all instances of this service.
        /// </summary>
        private static Task CycleSendTask { get; set; }

        /// <summary>FreeSql ORM instance.</summary>
        public IFreeSql FreeSql { get; }

        /// <summary>Application log service.</summary>
        public ILogService LogService { get; }

        /// <summary>Configuration data (MQTT address, port, credentials, topic, master flag).</summary>
        public ConfigDataService ConfigDataService { get; }

        /// <summary>BMS data source.</summary>
        public BmsDataService BmsDataService { get; }

        /// <summary>Solar data source.</summary>
        public SolarEnergyService SolarEnergyService { get; }

        /// <summary>PCS data source.</summary>
        public InPowerPCSDataService InPowerPCSDataService { get; }

        /// <summary>
        /// Snapshot that is refreshed and published on every cycle.
        /// </summary>
        public PubData CurPubData { get; set; } = new PubData();

        /// <summary>
        /// Starts the background publish loop. After the start-up delay, every
        /// <see cref="PublishIntervalMs"/> the current data is collected and published
        /// until <see cref="MqttThrendEnable"/> becomes false.
        /// </summary>
        private void CycleSendStart()
        {
            CycleSendTask = Task.Run(async () =>
            {
                // The start-up pause runs inside the task so the caller is not blocked.
                await Task.Delay(StartupDelayMs);

                // Log only the first failure of a streak to avoid flooding the log
                // every cycle while the broker is unreachable.
                bool lastPublishSucceeded = true;

                while (MqttThrendEnable)
                {
                    await Task.Delay(PublishIntervalMs);

                    string error = TryPublishCurrentData();
                    if (error == null)
                    {
                        lastPublishSucceeded = true;
                    }
                    else if (lastPublishSucceeded)
                    {
                        lastPublishSucceeded = false;
                        LogService.Error("MQTT publish failed: " + error);
                    }
                }
            });
        }

        /// <summary>
        /// Refreshes <see cref="CurPubData"/> and publishes it as UTF-8 JSON to <see cref="Topic"/>.
        /// Never throws, so a single bad cycle cannot terminate the publish loop.
        /// </summary>
        /// <returns>null on success; otherwise a description of the failure.</returns>
        private string TryPublishCurrentData()
        {
            try
            {
                MqttClient client = mqttClient;
                if (client == null)
                {
                    return ClientNotInitializedMessage;
                }

                GetPubData();

                OperateResult send = client.PublishMessage(new MqttApplicationMessage()
                {
                    QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce,
                    Topic = Topic,
                    Payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(CurPubData))
                });

                return send.IsSuccess ? null : send.Message;
            }
            catch (Exception ex)
            {
                return FormatException(ex);
            }
        }

        /// <summary>
        /// Copies the current real-time values from the BMS, PCS and solar services into <see cref="CurPubData"/>.
        /// </summary>
        private void GetPubData()
        {
            // BMS values.
            CurPubData.BmsAccCharg = BmsDataService.BmsAccCharg.RtValue;
            CurPubData.BmsAccDisCharg = BmsDataService.BmsAccDisCharg.RtValue;
            CurPubData.BmsCur = BmsDataService.BmsCur.RtValue;
            CurPubData.BmsVol = BmsDataService.BmsVol.RtValue;
            CurPubData.BmsSOC = BmsDataService.BmsSOC.RtValue;
            CurPubData.BmsSOH = BmsDataService.BmsSOH.RtValue;
            CurPubData.BmsSOE = BmsDataService.BmsSOE.RtValue;
            CurPubData.BmsResP = BmsDataService.BmsResP.RtValue;
            CurPubData.BmsResN = BmsDataService.BmsResN.RtValue;
            //CurPubData.BmsPw = BmsDataService.BmsPw.RtValue;
            CurPubData.BmsBatState = BmsDataService.BmsBatStateMsg;
            CurPubData.MaxChargePower = BmsDataService.MaxChargePowerCell.RtValue;
            CurPubData.MaxDisChargePower = BmsDataService.MaxDisChargePowerCell.RtValue;

            // PCS values.
            CurPubData.PcsPower = InPowerPCSDataService.Power;
            CurPubData.PcsTotalReactivePw = InPowerPCSDataService.TotalReactivePw;
            CurPubData.PcsTotalApparentPw = InPowerPCSDataService.TotalApparentPw;
            CurPubData.PcsInputVol = InPowerPCSDataService.InputVol;
            CurPubData.PcsInputCur = InPowerPCSDataService.InputCur;
            CurPubData.PcsInputPw = InPowerPCSDataService.InputPw;
            CurPubData.PcsCurAlarmStateStr = InPowerPCSDataService.PcsRunState.PcsStateMsg;
            CurPubData.PcsAVol = InPowerPCSDataService.AVol;
            CurPubData.PcsBVol = InPowerPCSDataService.BVol;
            CurPubData.PcsCVol = InPowerPCSDataService.CVol;
            CurPubData.PcsACur = InPowerPCSDataService.ACur;
            CurPubData.PcsBCur = InPowerPCSDataService.BCur;
            CurPubData.PcsCCur = InPowerPCSDataService.CCur;
            CurPubData.PCSFaultStateStr = InPowerPCSDataService.PcsRunState.PcsSysFaultStateMsg;

            // Solar values.
            CurPubData.SolarAccPw = SolarEnergyService.AccPw;
            CurPubData.SolarDayPw = SolarEnergyService.AccDayPw;
            CurPubData.SolarEfficiency = SolarEnergyService.Efficiency;
            // Not mapped yet: SolarInternalTemp, SolarMonthPw, SolarResV, SolarState.
            CurPubData.SolarPw = SolarEnergyService.Power;
            CurPubData.SolarPwFactor = SolarEnergyService.PwFactor;
            CurPubData.SolarReactivePw = SolarEnergyService.ReactivePw;

            // NOTE(review): LoadMeterPw is fed from the BMS accumulated-charge value, which
            // looks like a copy-paste placeholder - confirm the intended load-meter source.
            CurPubData.LoadMeterPw = BmsDataService.BmsAccCharg.RtValue;
        }

        /// <summary>
        /// Creates the MQTT client from the configuration, connects to the broker and,
        /// on success, subscribes to <see cref="Topic"/>. Any previously created client
        /// is detached and closed first. Failures are shown in a message box; exceptions
        /// are logged and leave the state as disconnected.
        /// </summary>
        private void MqttClientInit()
        {
            try
            {
                Topic = ConfigDataService.MqttTopic;

                MqttConnectionOptions options = new MqttConnectionOptions()
                {
                    IpAddress = ConfigDataService.MqttServerUrl,
                    Port = ConfigDataService.MqttServerPort,
                    ClientId = ConfigDataService.MqttClientId,
                    KeepAlivePeriod = TimeSpan.FromSeconds(KeepAliveSeconds),
                };
                options.Credentials = new MqttCredential(ConfigDataService.MqttUser, ConfigDataService.MqttPwd);

                // Detach handlers from the previous client so it is neither kept alive
                // by this service nor able to trigger reconnects after being replaced.
                MqttClient previous = mqttClient;
                if (previous != null)
                {
                    previous.OnMqttMessageReceived -= MqttClient_OnMqttMessageReceived;
                    previous.OnNetworkError -= MqttClient_OnNetworkError;
                    previous.ConnectClose();
                }

                mqttClient = new MqttClient(options);
                mqttClient.LogNet = new HslCommunication.LogNet.LogNetSingle(string.Empty);
                mqttClient.OnMqttMessageReceived += MqttClient_OnMqttMessageReceived;
                mqttClient.OnNetworkError += MqttClient_OnNetworkError;

                OperateResult connect = mqttClient.ConnectServer();
                if (!connect.IsSuccess)
                {
                    MqttClientState = false;
                    MessageBox.Show(connect.Message);
                    return;
                }

                MqttClientState = true;

                // Subscribe immediately after a successful connection.
                OperateResult send = mqttClient.SubscribeMessage(Topic);
                if (!send.IsSuccess)
                {
                    MessageBox.Show("Send Failed:" + send.Message);
                }
            }
            catch (Exception ex)
            {
                MqttClientState = false;
                LogService.Error(FormatException(ex));
            }
        }

        /// <summary>
        /// Unsubscribes from <see cref="Topic"/> and closes the connection.
        /// Does nothing but reset the state when no client has been created yet.
        /// </summary>
        public void MqttClientClose()
        {
            MqttClient client = mqttClient;
            if (client == null)
            {
                MqttClientState = false;
                return;
            }

            // Best effort: the unsubscribe result is deliberately ignored because the
            // connection is closed right afterwards.
            client.UnSubscribeMessage(Topic);
            MqttClientState = false;
            client.ConnectClose();
        }

        /// <summary>
        /// Subscribes to <see cref="Topic"/>; shows a message box on failure.
        /// </summary>
        public void MqttClientSubscribe()
        {
            MqttClient client = mqttClient;
            if (client == null)
            {
                MessageBox.Show("Send Failed:" + ClientNotInitializedMessage);
                return;
            }

            OperateResult send = client.SubscribeMessage(Topic);
            if (!send.IsSuccess)
            {
                MessageBox.Show("Send Failed:" + send.Message);
            }
        }

        /// <summary>
        /// Unsubscribes from <see cref="Topic"/>; shows a message box on failure.
        /// </summary>
        public void MqttClientUnSubscribe()
        {
            MqttClient client = mqttClient;
            if (client == null)
            {
                MessageBox.Show("Send Failed:" + ClientNotInitializedMessage);
                return;
            }

            OperateResult send = client.UnSubscribeMessage(Topic);
            if (!send.IsSuccess)
            {
                MessageBox.Show("Send Failed:" + send.Message);
            }
        }

        /// <summary>
        /// Handler for received MQTT messages. Incoming messages are currently ignored.
        /// </summary>
        private void MqttClient_OnMqttMessageReceived(MqttClient client, MqttApplicationMessage message)
        {
        }

        /// <summary>
        /// Network-error handler: marks the connection as failed and retries every
        /// <see cref="ReconnectIntervalMs"/> until the connection is re-established.
        /// On success the topic is subscribed again and the state is restored.
        /// The loop stops when the service is disabled or the client has been replaced.
        /// </summary>
        private void MqttClient_OnNetworkError(object sender, EventArgs e)
        {
            MqttClient client = sender as MqttClient;
            if (client == null || !ReferenceEquals(client, mqttClient))
            {
                // Not an MQTT client, or a stale client that has been replaced.
                return;
            }

            MqttClientState = false;
            client.LogNet?.WriteInfo("网络异常,准备10秒后重新连接。");

            while (MqttThrendEnable && ReferenceEquals(client, mqttClient))
            {
                // This handler is synchronous, so a blocking wait is used between attempts.
                Thread.Sleep(ReconnectIntervalMs);

                client.LogNet?.WriteInfo("准备重新连接服务器...");
                OperateResult connect = client.ConnectServer();
                if (connect.IsSuccess)
                {
                    client.LogNet?.WriteInfo("连接服务器成功!");

                    // Restore the subscription that was set up on the initial connect.
                    OperateResult subscribe = client.SubscribeMessage(Topic);
                    if (!subscribe.IsSuccess)
                    {
                        LogService.Error("MQTT re-subscribe failed: " + subscribe.Message);
                    }

                    MqttClientState = true;
                    break;
                }

                client.LogNet?.WriteInfo("连接失败,准备10秒后重新连接。");
            }
        }

        /// <summary>
        /// Formats an exception for the log. Tolerates a null stack trace, which
        /// occurs for exceptions that were created but never thrown.
        /// </summary>
        private static string FormatException(Exception ex)
        {
            return string.Format("ErrSource : {0} ErrMsg : {1}", ex.StackTrace, ex.Message);
        }
    }
}