Files
YuPu-OrpaonEMS/OrpaonEMS.App/Services/MqttDataClientService.cs
Tyrone CT 325b24c99f 更改了周五和周六晚上不充电
关闭后进程不关闭的操作
Mqtt的发布,关闭这个功能
2025-03-01 00:19:51 +08:00

362 lines
13 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using HslCommunication;
using HslCommunication.MQTT;
using NLog;
using OrpaonEMS.App.Models;
using OrpaonEMS.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Text.Json;
using Prism.Mvvm;
namespace OrpaonEMS.App.Services
{
public class MqttDataClientService : BindableBase
{
/// <summary>
/// Creates the MQTT data client service and stores the injected collaborators
/// in the matching read-only properties.
/// </summary>
/// <param name="freeSql">Stored in <see cref="FreeSql"/>.</param>
/// <param name="logService">Stored in <see cref="LogService"/>; used for error logging.</param>
/// <param name="configDataService">Stored in <see cref="ConfigDataService"/>; supplies the MQTT settings and the master flag.</param>
/// <param name="bmsDataService">Stored in <see cref="BmsDataService"/>; source of the BMS values that get published.</param>
/// <param name="solarEnergyService">Stored in <see cref="SolarEnergyService"/>; source of the solar values that get published.</param>
/// <param name="inPowerPCSDataService">Stored in <see cref="InPowerPCSDataService"/>; source of the PCS values that get published.</param>
public MqttDataClientService(
    IFreeSql freeSql,
    ILogService logService,
    ConfigDataService configDataService,
    BmsDataService bmsDataService,
    SolarEnergyService solarEnergyService,
    InPowerPCSDataService inPowerPCSDataService)
{
    this.ConfigDataService = configDataService;
    this.LogService = logService;
    this.FreeSql = freeSql;
    this.BmsDataService = bmsDataService;
    this.SolarEnergyService = solarEnergyService;
    this.InPowerPCSDataService = inPowerPCSDataService;

    bool isMaster = this.ConfigDataService.IsMaster;
    if (isMaster)
    {
        // MQTT start-up is currently switched off; restore the two calls below to re-enable it.
        //MqttClientInit();
        //CycleSendStart();
    }
}
/// <summary>
/// MQTT topic used for publishing and for the subscribe/unsubscribe calls.
/// Starts empty and is loaded from ConfigDataService.MqttTopic in MqttClientInit.
/// </summary>
private string Topic { get; set; } = string.Empty;
/// <summary>
/// MQTT client instance. Created in MqttClientInit; null until that method has run.
/// </summary>
private MqttClient mqttClient { get; set; }
private bool _MqttClientState = false;
/// <summary>
/// MQTT connection state. Assigning it also refreshes <see cref="MqttClientStateMsg"/>
/// ("正常" when true, "失败" when false).
/// </summary>
private bool MqttClientState
{
    get { return _MqttClientState; }
    set
    {
        // Refresh the text when the state flips, and also while no text has been set yet:
        // the backing flag starts as false, so a first failed connect (false -> false)
        // would otherwise leave the status message null.
        if (_MqttClientState != value || _MqttClientStateMsg == null)
        {
            MqttClientStateMsg = value ? "正常" : "失败";
        }
        _MqttClientState = value;
    }
}
private string _MqttClientStateMsg;
/// <summary>
/// Display text for the MQTT connection state.
/// Every assignment raises a property-change notification, even when the text is unchanged.
/// </summary>
public string MqttClientStateMsg
{
    get => _MqttClientStateMsg;
    set
    {
        _MqttClientStateMsg = value;
        RaisePropertyChanged();
    }
}
// Random number source; not referenced by any member visible in this file.
private Random Random = new Random();
/// <summary>
/// Quality labels ("Good" / "Bad"); not referenced by any member visible in this file.
/// </summary>
private string[] Quality = { "Good", "Bad" };
/// <summary>
/// Loop-enable flag: the cyclic publish loop keeps running while this is true.
/// </summary>
private bool MqttThrendEnable { get; set; } = true;
/// <summary>
/// Background task that runs the cyclic publish loop (assigned in CycleSendStart).
/// </summary>
private static Task CycleSendTask { get; set; }
/// <summary>Injected FreeSql instance.</summary>
public IFreeSql FreeSql { get; }
/// <summary>Injected logging service.</summary>
public ILogService LogService { get; }
/// <summary>Injected configuration service (MQTT settings, master flag).</summary>
public ConfigDataService ConfigDataService { get; }
/// <summary>Injected BMS data service.</summary>
public BmsDataService BmsDataService { get; }
/// <summary>Injected solar energy data service.</summary>
public SolarEnergyService SolarEnergyService { get; }
/// <summary>Injected PCS data service.</summary>
public InPowerPCSDataService InPowerPCSDataService { get; }
/// <summary>
/// Data object that is filled by GetPubData and then published.
/// </summary>
public PubData CurPubData { get; set; } = new PubData();
/// <summary>
/// Starts the background publish loop. While <see cref="MqttThrendEnable"/> is true the loop
/// waits 5 seconds, refreshes <see cref="CurPubData"/> via <see cref="GetPubData"/> and publishes it
/// as UTF-8 JSON to <see cref="Topic"/> with QoS "exactly once".
/// </summary>
/// <remarks>
/// Publish failures and exceptions are logged through <see cref="LogService"/>; they do not stop the loop.
/// </remarks>
private void CycleSendStart()
{
    CycleSendTask = Task.Run(async () =>
    {
        // Start-up delay: the 1 s that used to block the caller plus the 5 s warm-up,
        // both now spent on the background task so the caller returns immediately.
        await Task.Delay(6000);
        while (MqttThrendEnable)
        {
            await Task.Delay(5000);
            try
            {
                GetPubData();

                // Capture once so the null check and the publish use the same instance.
                MqttClient client = mqttClient;
                if (client == null)
                {
                    // No client has been created yet; try again on the next cycle.
                    continue;
                }

                // Publish the current snapshot.
                OperateResult send = client.PublishMessage(new MqttApplicationMessage()
                {
                    QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce,
                    Topic = Topic,
                    Payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize<PubData>(CurPubData))
                });
                if (!send.IsSuccess)
                {
                    LogService.Error($"发送失败:{send.Message}");
                }
            }
            catch (Exception ex)
            {
                // An escaping exception would fault the task and silently end all publishing.
                LogService.Error($"ErrSource : {ex.StackTrace} ErrMsg : {ex.Message}");
            }
        }
    });
}
/// <summary>
/// Copies the current BMS, PCS and solar readings from the data services into <see cref="CurPubData"/>.
/// </summary>
private void GetPubData()
{
    var bms = BmsDataService;
    var pcs = InPowerPCSDataService;
    var solar = SolarEnergyService;

    // --- BMS values ---
    CurPubData.BmsAccCharg = bms.BmsAccCharg.RtValue;
    CurPubData.BmsAccDisCharg = bms.BmsAccDisCharg.RtValue;
    CurPubData.BmsCur = bms.BmsCur.RtValue;
    CurPubData.BmsVol = bms.BmsVol.RtValue;
    CurPubData.BmsSOC = bms.BmsSOC.RtValue;
    CurPubData.BmsSOH = bms.BmsSOH.RtValue;
    CurPubData.BmsSOE = bms.BmsSOE.RtValue;
    CurPubData.BmsResP = bms.BmsResP.RtValue;
    CurPubData.BmsResN = bms.BmsResN.RtValue;
    CurPubData.BmsBatState = bms.BmsBatStateMsg;
    CurPubData.MaxChargePower = bms.MaxChargePowerCell.RtValue;
    CurPubData.MaxDisChargePower = bms.MaxDisChargePowerCell.RtValue;

    // --- PCS values ---
    CurPubData.PcsPower = pcs.Power;
    CurPubData.PcsTotalReactivePw = pcs.TotalReactivePw;
    CurPubData.PcsTotalApparentPw = pcs.TotalApparentPw;
    CurPubData.PcsInputVol = pcs.InputVol;
    CurPubData.PcsInputCur = pcs.InputCur;
    CurPubData.PcsInputPw = pcs.InputPw;
    CurPubData.PcsCurAlarmStateStr = pcs.PcsRunState.PcsStateMsg;
    CurPubData.PcsAVol = pcs.AVol;
    CurPubData.PcsBVol = pcs.BVol;
    CurPubData.PcsCVol = pcs.CVol;
    CurPubData.PcsACur = pcs.ACur;
    CurPubData.PcsBCur = pcs.BCur;
    CurPubData.PcsCCur = pcs.CCur;
    CurPubData.PCSFaultStateStr = pcs.PcsRunState.PcsSysFaultStateMsg;

    // --- Solar values ---
    CurPubData.SolarAccPw = solar.AccPw;
    CurPubData.SolarDayPw = solar.AccDayPw;
    CurPubData.SolarEfficiency = solar.Efficiency;
    CurPubData.SolarPw = solar.Power;
    CurPubData.SolarPwFactor = solar.PwFactor;
    CurPubData.SolarReactivePw = solar.ReactivePw;

    // NOTE(review): the load-meter power is filled from the BMS accumulated-charge value;
    // this looks like a placeholder or copy/paste slip - confirm the intended source.
    CurPubData.LoadMeterPw = bms.BmsAccCharg.RtValue;

    // Not populated yet (no source wired up): BmsPw, SolarInternalTemp, SolarMonthPw,
    // SolarResV, SolarState.
}
/// <summary>
/// Builds the MQTT client from the configuration, connects to the server and,
/// on success, subscribes to <see cref="Topic"/>.
/// </summary>
/// <remarks>
/// Connect and subscribe failures are shown in a message box; exceptions are logged
/// through <see cref="LogService"/>. Nothing is thrown to the caller.
/// </remarks>
private void MqttClientInit()
{
    try
    {
        Topic = ConfigDataService.MqttTopic;
        MqttConnectionOptions options = new MqttConnectionOptions()
        {
            IpAddress = ConfigDataService.MqttServerUrl,
            Port = ConfigDataService.MqttServerPort,
            ClientId = ConfigDataService.MqttClientId,
            KeepAlivePeriod = TimeSpan.FromSeconds(100),
            Credentials = new MqttCredential(ConfigDataService.MqttUser, ConfigDataService.MqttPwd),
        };

        // Retire a previous client first: detach our handlers so it can no longer
        // call back into this service, then close its connection.
        MqttClient previous = mqttClient;
        if (previous != null)
        {
            previous.OnMqttMessageReceived -= MqttClient_OnMqttMessageReceived;
            previous.OnNetworkError -= MqttClient_OnNetworkError;
            previous.ConnectClose();
        }

        mqttClient = new MqttClient(options);
        mqttClient.LogNet = new HslCommunication.LogNet.LogNetSingle(string.Empty);
        mqttClient.OnMqttMessageReceived += MqttClient_OnMqttMessageReceived;
        mqttClient.OnNetworkError += MqttClient_OnNetworkError;

        OperateResult connect = mqttClient.ConnectServer();
        if (!connect.IsSuccess)
        {
            MqttClientState = false;
            MessageBox.Show(connect.Message);
            return;
        }

        MqttClientState = true;

        // Subscribe right after a successful connect.
        OperateResult send = mqttClient.SubscribeMessage(Topic);
        if (!send.IsSuccess)
        {
            MessageBox.Show("Send Failed:" + send.Message);
        }
    }
    catch (Exception ex)
    {
        // Initialisation was aborted, so do not leave a stale "connected" state behind.
        MqttClientState = false;
        LogService.Error($"ErrSource : {ex.StackTrace} ErrMsg : {ex.Message}");
    }
}
/// <summary>
/// Unsubscribes from <see cref="Topic"/>, marks the connection state as disconnected
/// and closes the MQTT connection. Safe to call when no client was ever created.
/// </summary>
public void MqttClientClose()
{
    MqttClient client = mqttClient;
    if (client == null)
    {
        // MqttClientInit never ran (or failed before creating the client): nothing to close.
        MqttClientState = false;
        return;
    }

    // The unsubscribe result is not inspected: the connection is closed right afterwards.
    client.UnSubscribeMessage(Topic);
    MqttClientState = false;
    client.ConnectClose();
}
/// <summary>
/// Subscribes the MQTT client to <see cref="Topic"/>.
/// A failure (including a client that has not been created yet) is shown in a message box.
/// </summary>
public void MqttClientSubscribe()
{
    MqttClient client = mqttClient;
    if (client == null)
    {
        // Without this guard the call below would throw a NullReferenceException.
        MessageBox.Show("Send Failed:MQTT client is not initialized");
        return;
    }

    OperateResult send = client.SubscribeMessage(Topic);
    if (!send.IsSuccess)
    {
        MessageBox.Show("Send Failed:" + send.Message);
    }
}
/// <summary>
/// Unsubscribes the MQTT client from <see cref="Topic"/>.
/// A failure (including a client that has not been created yet) is shown in a message box.
/// </summary>
public void MqttClientUnSubscribe()
{
    MqttClient client = mqttClient;
    if (client == null)
    {
        // Without this guard the call below would throw a NullReferenceException.
        MessageBox.Show("Send Failed:MQTT client is not initialized");
        return;
    }

    OperateResult send = client.UnSubscribeMessage(Topic);
    if (!send.IsSuccess)
    {
        MessageBox.Show("Send Failed:" + send.Message);
    }
}
/// <summary>
/// Handler attached to the client's OnMqttMessageReceived event in MqttClientInit.
/// Currently does nothing with the received message.
/// </summary>
/// <param name="client">Client passed by the event.</param>
/// <param name="message">Message passed by the event; unused.</param>
private void MqttClient_OnMqttMessageReceived(MqttClient client, MqttApplicationMessage message)
{
// Placeholder: the received message is not processed.
}
/// <summary>
/// Handler for the client's OnNetworkError event: marks the connection as lost and
/// retries <c>ConnectServer</c> every 10 seconds until it succeeds, then marks it as restored.
/// </summary>
/// <remarks>
/// Blocks the calling thread for the whole reconnect period.
/// </remarks>
/// <param name="sender">Expected to be the <see cref="MqttClient"/> that lost its connection; anything else is ignored.</param>
/// <param name="e">Unused.</param>
private void MqttClient_OnNetworkError(object sender, EventArgs e)
{
    if (!(sender is MqttClient client))
    {
        return;
    }

    // The link is down: keep the displayed connection state in step with reality.
    MqttClientState = false;
    client.LogNet?.WriteInfo("网络异常准备10秒后重新连接。");

    while (true)
    {
        // Retry every 10 seconds.
        Thread.Sleep(10_000);
        client.LogNet?.WriteInfo("准备重新连接服务器...");
        OperateResult connect = client.ConnectServer();
        if (connect.IsSuccess)
        {
            // Any re-subscription or data re-initialisation belongs here, before the break.
            client.LogNet?.WriteInfo("连接服务器成功!");
            MqttClientState = true;
            break;
        }
        client.LogNet?.WriteInfo("连接失败准备10秒后重新连接。");
    }
}
}
}