using HslCommunication;
using HslCommunication.MQTT;
using NLog;
using OrpaonEMS.App.Models;
using OrpaonEMS.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Text.Json;
using Prism.Mvvm;
namespace OrpaonEMS.App.Services
{
/// <summary>
/// Publishes a periodic snapshot of BMS, PCS and solar readings to an MQTT broker.
/// The client is only created, and the publish loop only started, when
/// <see cref="ConfigDataService"/> reports that this node is the master.
/// </summary>
public class MqttDataClientService : BindableBase
{
    /// <summary>
    /// Stores the injected services and, on the master node, connects to the broker
    /// and starts the periodic publish loop.
    /// </summary>
    public MqttDataClientService(IFreeSql freeSql, ILogService logService, ConfigDataService configDataService, BmsDataService bmsDataService,
        SolarEnergyService solarEnergyService,
        InPowerPCSDataService inPowerPCSDataService)
    {
        FreeSql = freeSql;
        LogService = logService;
        ConfigDataService = configDataService;
        BmsDataService = bmsDataService;
        SolarEnergyService = solarEnergyService;
        InPowerPCSDataService = inPowerPCSDataService;
        if (ConfigDataService.IsMaster)
        {
            MqttClientInit();
            CycleSendStart();
        }
    }

    /// <summary>
    /// Topic used for both subscribing and publishing; read from configuration in <see cref="MqttClientInit"/>.
    /// </summary>
    private string Topic { get; set; } = string.Empty;

    /// <summary>
    /// The MQTT client. Stays null on non-master nodes or when initialization threw before the client was created.
    /// </summary>
    private MqttClient mqttClient { get; set; }

    private bool _MqttClientState = false;

    /// <summary>
    /// MQTT connection state. Setting it keeps <see cref="MqttClientStateMsg"/> in sync.
    /// </summary>
    private bool MqttClientState
    {
        get { return _MqttClientState; }
        set
        {
            _MqttClientState = value;
            // Compare against the message rather than the previous flag: the flag starts out
            // false, so a first-time failure (false -> false) must still publish "失败".
            string stateMsg = value ? "正常" : "失败";
            if (_MqttClientStateMsg != stateMsg)
            {
                MqttClientStateMsg = stateMsg;
            }
        }
    }

    private string _MqttClientStateMsg;

    /// <summary>
    /// Human-readable MQTT connection state; raises a property-change notification when set.
    /// </summary>
    public string MqttClientStateMsg
    {
        get { return _MqttClientStateMsg; }
        set { _MqttClientStateMsg = value; RaisePropertyChanged(); }
    }

    // Not referenced anywhere in this class.
    private Random Random = new Random();

    // Quality labels; not referenced anywhere in this class.
    private string[] Quality = new string[] { "Good", "Bad" };

    // volatile: written by MqttClientClose and read by the publish loop and the reconnect loop,
    // which run on other threads.
    private volatile bool _mqttThreadEnable = true;

    /// <summary>
    /// Enables the background publish loop and the reconnect loop; cleared by <see cref="MqttClientClose"/>.
    /// </summary>
    private bool MqttThrendEnable
    {
        get { return _mqttThreadEnable; }
        set { _mqttThreadEnable = value; }
    }

    /// <summary>
    /// The background task running the periodic publish loop.
    /// </summary>
    private Task CycleSendTask { get; set; }

    public IFreeSql FreeSql { get; }
    public ILogService LogService { get; }
    public ConfigDataService ConfigDataService { get; }
    public BmsDataService BmsDataService { get; }
    public SolarEnergyService SolarEnergyService { get; }
    public InPowerPCSDataService InPowerPCSDataService { get; }

    /// <summary>
    /// The snapshot most recently prepared for publishing.
    /// </summary>
    public PubData CurPubData { get; set; } = new PubData();

    /// <summary>
    /// Starts the background loop that refreshes <see cref="CurPubData"/> and publishes it every 5 seconds.
    /// </summary>
    private void CycleSendStart()
    {
        CycleSendTask = Task.Run(async () =>
        {
            // Same total start-up delay as before (1 s + 5 s), but awaited on the worker
            // instead of blocking the constructing thread with Thread.Sleep.
            await Task.Delay(1000);
            await Task.Delay(5000);

            // Failures are logged only when a run of failures starts, so an unreachable broker
            // does not add a log line every 5 seconds.
            bool lastCycleFailed = false;
            void ReportFailure(string message)
            {
                if (!lastCycleFailed)
                {
                    LogService.Error(message);
                }
                lastCycleFailed = true;
            }

            while (MqttThrendEnable)
            {
                await Task.Delay(5000);
                if (!MqttThrendEnable)
                {
                    break;
                }

                try
                {
                    GetPubData();

                    MqttClient client = mqttClient;
                    if (client == null)
                    {
                        ReportFailure("Mqtt publish skipped: client is not initialized");
                        continue;
                    }

                    // Publish the snapshot as UTF-8 JSON.
                    OperateResult send = client.PublishMessage(new MqttApplicationMessage()
                    {
                        QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce,
                        Topic = Topic,
                        Payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(CurPubData))
                    });

                    if (send.IsSuccess)
                    {
                        lastCycleFailed = false;
                    }
                    else
                    {
                        ReportFailure($"Mqtt publish failed: {send.Message}");
                    }
                }
                catch (Exception ex)
                {
                    // One bad cycle must not fault the task and silently end publishing.
                    ReportFailure(String.Format("ErrSource : {0} ErrMsg : {1}", ex.StackTrace, ex.Message));
                }
            }
        });
    }

    /// <summary>
    /// Copies the current BMS, PCS and solar readings into <see cref="CurPubData"/>.
    /// </summary>
    private void GetPubData()
    {
        // BMS
        CurPubData.BmsAccCharg = BmsDataService.BmsAccCharg.RtValue;
        CurPubData.BmsAccDisCharg = BmsDataService.BmsAccDisCharg.RtValue;
        CurPubData.BmsCur = BmsDataService.BmsCur.RtValue;
        CurPubData.BmsVol = BmsDataService.BmsVol.RtValue;
        CurPubData.BmsSOC = BmsDataService.BmsSOC.RtValue;
        CurPubData.BmsSOH = BmsDataService.BmsSOH.RtValue;
        CurPubData.BmsSOE = BmsDataService.BmsSOE.RtValue;
        CurPubData.BmsResP = BmsDataService.BmsResP.RtValue;
        CurPubData.BmsResN = BmsDataService.BmsResN.RtValue;
        CurPubData.BmsBatState = BmsDataService.BmsBatStateMsg;
        CurPubData.MaxChargePower = BmsDataService.MaxChargePowerCell.RtValue;
        CurPubData.MaxDisChargePower = BmsDataService.MaxDisChargePowerCell.RtValue;

        // PCS
        CurPubData.PcsPower = InPowerPCSDataService.Power;
        CurPubData.PcsTotalReactivePw = InPowerPCSDataService.TotalReactivePw;
        CurPubData.PcsTotalApparentPw = InPowerPCSDataService.TotalApparentPw;
        CurPubData.PcsInputVol = InPowerPCSDataService.InputVol;
        CurPubData.PcsInputCur = InPowerPCSDataService.InputCur;
        CurPubData.PcsInputPw = InPowerPCSDataService.InputPw;
        CurPubData.PcsCurAlarmStateStr = InPowerPCSDataService.PcsRunState.PcsStateMsg;
        CurPubData.PcsAVol = InPowerPCSDataService.AVol;
        CurPubData.PcsBVol = InPowerPCSDataService.BVol;
        CurPubData.PcsCVol = InPowerPCSDataService.CVol;
        CurPubData.PcsACur = InPowerPCSDataService.ACur;
        CurPubData.PcsBCur = InPowerPCSDataService.BCur;
        CurPubData.PcsCCur = InPowerPCSDataService.CCur;
        CurPubData.PCSFaultStateStr = InPowerPCSDataService.PcsRunState.PcsSysFaultStateMsg;

        // Solar
        CurPubData.SolarAccPw = SolarEnergyService.AccPw;
        CurPubData.SolarDayPw = SolarEnergyService.AccDayPw;
        CurPubData.SolarEfficiency = SolarEnergyService.Efficiency;
        CurPubData.SolarPw = SolarEnergyService.Power;
        CurPubData.SolarPwFactor = SolarEnergyService.PwFactor;
        CurPubData.SolarReactivePw = SolarEnergyService.ReactivePw;

        // NOTE(review): LoadMeterPw is filled from the BMS accumulated-charge value, which looks
        // like a copy-paste placeholder. Confirm the intended load-meter source before relying on it.
        CurPubData.LoadMeterPw = BmsDataService.BmsAccCharg.RtValue;
    }

    /// <summary>
    /// Builds the MQTT client from configuration, connects to the broker and subscribes to <see cref="Topic"/>.
    /// Connection and subscription failures are shown in a message box; exceptions are logged.
    /// </summary>
    private void MqttClientInit()
    {
        try
        {
            Topic = ConfigDataService.MqttTopic;
            MqttConnectionOptions options = new MqttConnectionOptions()
            {
                IpAddress = ConfigDataService.MqttServerUrl,
                Port = ConfigDataService.MqttServerPort,
                ClientId = ConfigDataService.MqttClientId,
                KeepAlivePeriod = TimeSpan.FromSeconds(100),
            };
            options.Credentials = new MqttCredential(ConfigDataService.MqttUser, ConfigDataService.MqttPwd);

            // Detach from any previous client before replacing it so its events no longer reach this service.
            if (mqttClient != null)
            {
                mqttClient.OnMqttMessageReceived -= MqttClient_OnMqttMessageReceived;
                mqttClient.OnNetworkError -= MqttClient_OnNetworkError;
                mqttClient.ConnectClose();
            }

            mqttClient = new MqttClient(options);
            mqttClient.LogNet = new HslCommunication.LogNet.LogNetSingle(string.Empty);
            mqttClient.OnMqttMessageReceived += MqttClient_OnMqttMessageReceived;
            mqttClient.OnNetworkError += MqttClient_OnNetworkError;

            OperateResult connect = mqttClient.ConnectServer();
            if (connect.IsSuccess)
            {
                MqttClientState = true;

                // Subscribe immediately after connecting.
                OperateResult send = mqttClient.SubscribeMessage(Topic);
                if (!send.IsSuccess)
                {
                    MessageBox.Show("Send Failed:" + send.Message);
                }
            }
            else
            {
                MqttClientState = false;
                MessageBox.Show(connect.Message);
            }
        }
        catch (Exception ex)
        {
            MqttClientState = false;
            // StackTrace may be null; String.Format renders a null argument as an empty string.
            LogService.Error(String.Format("ErrSource : {0} ErrMsg : {1}", ex.StackTrace, ex.Message));
        }
    }

    /// <summary>
    /// Unsubscribes from <see cref="Topic"/>, closes the connection and stops the
    /// background publish and reconnect loops. Does nothing further if no client was created.
    /// </summary>
    public void MqttClientClose()
    {
        // Stop the background loops first so they do not keep using a closed client.
        MqttThrendEnable = false;

        MqttClient client = mqttClient;
        if (client == null)
        {
            return;
        }

        client.UnSubscribeMessage(Topic);
        MqttClientState = false;
        client.ConnectClose();
    }

    /// <summary>
    /// Subscribes to <see cref="Topic"/>; a failed request is shown in a message box.
    /// </summary>
    public void MqttClientSubscribe()
    {
        MqttClient client = mqttClient;
        if (client == null)
        {
            LogService.Error("Mqtt subscribe skipped: client is not initialized");
            return;
        }

        OperateResult send = client.SubscribeMessage(Topic);
        if (!send.IsSuccess)
        {
            MessageBox.Show("Send Failed:" + send.Message);
        }
    }

    /// <summary>
    /// Unsubscribes from <see cref="Topic"/>; a failed request is shown in a message box.
    /// </summary>
    public void MqttClientUnSubscribe()
    {
        MqttClient client = mqttClient;
        if (client == null)
        {
            LogService.Error("Mqtt unsubscribe skipped: client is not initialized");
            return;
        }

        OperateResult send = client.UnSubscribeMessage(Topic);
        if (!send.IsSuccess)
        {
            MessageBox.Show("Send Failed:" + send.Message);
        }
    }

    /// <summary>
    /// Handler for received messages; intentionally does nothing at present.
    /// </summary>
    private void MqttClient_OnMqttMessageReceived(MqttClient client, MqttApplicationMessage message)
    {
    }

    /// <summary>
    /// Network-error handler: marks the connection as failed and retries every 10 seconds
    /// until the connection succeeds, the service is closed, or the client has been replaced.
    /// </summary>
    private void MqttClient_OnNetworkError(object sender, EventArgs e)
    {
        if (!(sender is MqttClient client))
        {
            return;
        }

        // Ignore errors from a client this service no longer owns.
        if (!ReferenceEquals(client, mqttClient))
        {
            return;
        }

        MqttClientState = false;
        client.LogNet?.WriteInfo("网络异常,准备10秒后重新连接。");

        while (MqttThrendEnable && ReferenceEquals(client, mqttClient))
        {
            // Retry every 10 seconds.
            Thread.Sleep(10_000);

            // The service may have been closed, or the client replaced, while sleeping.
            if (!MqttThrendEnable || !ReferenceEquals(client, mqttClient))
            {
                break;
            }

            client.LogNet?.WriteInfo("准备重新连接服务器...");
            OperateResult connect = client.ConnectServer();
            if (connect.IsSuccess)
            {
                client.LogNet?.WriteInfo("连接服务器成功!");
                MqttClientState = true;
                break;
            }
            client.LogNet?.WriteInfo("连接失败,准备10秒后重新连接。");
        }
    }
}
}