using Core.Dtos; using HslCommunication.LogNet; using HslCommunication.MQTT; using Microsoft.EntityFrameworkCore; using NPOI.SS.Formula.Functions; using ProductionLineMonitor.Application.Services.CimService; using ProductionLineMonitor.Core.Dtos; using ProductionLineMonitor.Core.Services; using ProductionLineMonitor.Core.Services.Fault; using ProductionLineMonitor.Core.Utils; using ProductionLineMonitor.EntityFramework; using ProductionLineMonitor.EntityFramework.Repositories; using System.Reflection; using System.Text; namespace ProductionLineMonitor.WebAPI { public static class HostMqttClient { public static IApplicationBuilder UseHostMqtt(this IApplicationBuilder builder, string ip, int port, string clientId, ILogNet log) { _log = log; Task.Run(() => Start(ip, port, clientId)); return builder; } private static MqttClient? _mqttClient = null; private static readonly string[] _topics = new string[] { "/Machine/EQPData", "/Machine/State", "/Machine/CimState", "/MQTT/ClientState", "/Machine/Fault", "/Machine/GetEQPData", "/Machine/GetFault" }; private static ILogNet? _log; private static void Start(string ip, int port, string clientId) { DoWork(); var options = new MqttConnectionOptions() { IpAddress = ip, Port = port, ClientId = clientId }; _mqttClient = new MqttClient(options); _mqttClient.OnClientConnected += MqttClient_OnClientConnected; _mqttClient.OnNetworkError += MqttClient_OnNetworkError; _mqttClient.OnMqttMessageReceived += MqttClient_OnMqttMessageReceived; MqttClientConnect(); _log?.WriteInfo("HostMqttClient start."); } private static void MqttClient_OnMqttMessageReceived(MqttClient client, MqttApplicationMessage message) { if (message.Topic == "/Machine/CimState") { Task.Run(() => { AnalysisCimState(message.Payload); }); } else { _messages[_messageWriteIndex] = message; _messageWriteIndex++; if (_messageWriteIndex > _messageMaxCount) _messageWriteIndex = 0; } } private static int _messageWriteIndex = 0; private static int _messageReadIndex = 0; private static int _messageMaxCount = 10000; private static MqttApplicationMessage[] _messages = new MqttApplicationMessage[_messageMaxCount + 1]; private static void DoWork() { Task.Run(() => { while (true) { if (_messageReadIndex != _messageWriteIndex) { Work(_messages[_messageReadIndex]); _messageReadIndex++; } if (_messageReadIndex > _messageMaxCount) { _messageReadIndex = 0; } Thread.Sleep(1); } }); } private static void Work(MqttApplicationMessage message) { try { switch (message.Topic) { case "/Machine/EQPData": AnalysisEQPData(message.Payload); break; case "/Machine/State": break; case "/Machine/CimState": Task.Run(() => { AnalysisCimState(message.Payload); }); break; case "/MQTT/ClientState": AnalysisClientState(message.Payload); break; case "/Machine/Fault": Task.Run(() => { AnalysisFault(message.Payload); }); break; case "/Machine/GetEQPData": AnalysisBachupEQPData(message.Payload); break; case "/Machine/GetFault": AnalysisFault(message.Payload); break; default: break; } } catch (Exception ex) { //Console.WriteLine(DateTime.Now); //Console.WriteLine(message.Topic); //Console.WriteLine(message.Payload); //Console.WriteLine(ex); _log?.WriteError(ex.Message); } } private static void MqttClientConnect() { while (true) { if (_mqttClient != null) { var rev = _mqttClient.ConnectServer(); if (rev.IsSuccess) { _mqttClient.SubscribeMessage(_topics); _log?.WriteInfo("MqttClient connect success."); break; } } Thread.Sleep(10 * 1000); } } private static void MqttClient_OnNetworkError(object sender, EventArgs e) { MqttClientConnect(); _log?.WriteInfo("MqttClient OnNetworkError."); } private static void MqttClient_OnClientConnected(MqttClient client) { } private static void AnalysisBachupEQPData(byte[] payload) { string json = Encoding.UTF8.GetString(payload); EQPDataDto dto = json.ToObject(); using UnitOfWork unitOfWork = GetUnitOfWork(); ExternalService externalService = new ExternalService(unitOfWork); externalService.AddEQPData(dto); } private static void AnalysisFault(byte[] payload) { string json = Encoding.UTF8.GetString(payload); IEnumerable faults = json.ToObject>(); using UnitOfWork unitOfWork = GetUnitOfWork(); FaultService faultService = new FaultService(unitOfWork); faultService.AddMachineFaults(faults); } private static void AnalysisCimState(byte[] payload) { string json = Encoding.UTF8.GetString(payload); var state = json.ToObject(); using UnitOfWork unitOfWork = GetUnitOfWork(); CimService cimService = new CimService(unitOfWork); cimService.CreateOrUpdate(state); } private static void AnalysisEQPData(byte[] payload) { string json = Encoding.UTF8.GetString(payload); EQPDataDto dto = json.ToObject(); using UnitOfWork unitOfWork = GetUnitOfWork(); ExternalService externalService = new ExternalService(unitOfWork); externalService.AddEQPData(dto); } private static void AnalysisClientState(byte[] payload) { string json = Encoding.UTF8.GetString(payload); var state = json.ToObject(); using UnitOfWork unitOfWork = GetUnitOfWork(); CimService cimService = new CimService(unitOfWork); cimService.UpdateMqttClientState(state); } private static UnitOfWork GetUnitOfWork() { var optionsBuilder = new DbContextOptionsBuilder(); optionsBuilder.UseSqlite(DBOptions.ConnectionString); var options = optionsBuilder.Options; ProductionLineContext context = new ProductionLineContext(options); UnitOfWork unitOfWork = new UnitOfWork(context); return unitOfWork; } public static void Pub(string topic, string json) { if (_mqttClient == null || _mqttClient.IsConnected == false) { return; } _mqttClient.PublishMessage(new MqttApplicationMessage() { Topic = topic, QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Payload = Encoding.UTF8.GetBytes(json) }); } } }