using Core.Dtos;
using HslCommunication.MQTT;
using Microsoft.AspNetCore.Builder;
using Microsoft.EntityFrameworkCore;
using ProductionLineMonitor.Application.Services.CimService;
using ProductionLineMonitor.Core.Dtos;
using ProductionLineMonitor.Core.Services;
using ProductionLineMonitor.Core.Services.Fault;
using ProductionLineMonitor.Core.Utils;
using ProductionLineMonitor.EntityFramework;
using ProductionLineMonitor.EntityFramework.Repositories;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ProductionLineMonitor.Web.HostedServices
{
    /// <summary>
    /// Hosts a single static MQTT client for the web application: it connects to the
    /// broker on a background task, dispatches received messages to the matching
    /// service by topic, and exposes <see cref="Pub"/> for publishing.
    /// </summary>
    /// <remarks>
    /// NOTE(review): in this copy of the file the generic type arguments appear to have
    /// been stripped (for example <c>json.ToObject&gt;()</c>). The affected statements are
    /// reproduced exactly as found and marked individually; restore the type arguments
    /// from version control before building.
    /// </remarks>
    public static class HostMqttClient
    {
        /// <summary>Pause between two failed connection attempts, in milliseconds.</summary>
        private const int ReconnectDelayMilliseconds = 10 * 1000;

        private static MqttClient _mqttClient = null;

        // Every entry is currently commented out, so the array handed to
        // SubscribeMessage is empty. The names match the cases handled in
        // MqttClient_OnMqttMessageReceived.
        private static readonly string[] _topics = new string[]
        {
            //"/Machine/EQPData",
            //"/Machine/State",
            //"/Machine/CimState",
            //"/MQTT/ClientState",
            //"/Machine/Fault",
            //"/Machine/GetEQPData",
            //"/Machine/GetFault"
        };

        /// <summary>
        /// Starts the MQTT client on a background task and returns immediately.
        /// </summary>
        /// <param name="builder">The application builder being configured.</param>
        /// <param name="ip">Broker IP address.</param>
        /// <param name="port">Broker port.</param>
        /// <param name="clientId">Client identifier sent to the broker.</param>
        /// <returns>The same <paramref name="builder"/>, to allow call chaining.</returns>
        public static IApplicationBuilder UseHostMqtt(this IApplicationBuilder builder, string ip, int port, string clientId)
        {
            // Fire-and-forget: the connect loop blocks until it succeeds, so it must
            // not run on the caller's thread.
            Task.Run(() => Start(ip, port, clientId));
            return builder;
        }

        /// <summary>
        /// Creates the client, wires its event handlers and enters the connect loop.
        /// </summary>
        private static void Start(string ip, int port, string clientId)
        {
            var options = new MqttConnectionOptions()
            {
                IpAddress = ip,
                Port = port,
                ClientId = clientId
            };

            _mqttClient = new MqttClient(options);
            _mqttClient.OnClientConnected += MqttClient_OnClientConnected;
            _mqttClient.OnNetworkError += MqttClient_OnNetworkError;
            _mqttClient.OnMqttMessageReceived += MqttClient_OnMqttMessageReceived;

            Run();
        }

        /// <summary>
        /// Tries to connect until the broker accepts the connection, then subscribes to
        /// <see cref="_topics"/>. Blocks the calling thread while retrying.
        /// </summary>
        private static void Run()
        {
            while (true)
            {
                var rev = _mqttClient.ConnectServer();
                if (rev.IsSuccess)
                {
                    _mqttClient.SubscribeMessage(_topics);
                    break;
                }

                Thread.Sleep(ReconnectDelayMilliseconds);
            }
        }

        /// <summary>Re-enters the connect loop when the client reports a network error.</summary>
        private static void MqttClient_OnNetworkError(object sender, EventArgs e)
        {
            Run();
        }

        /// <summary>Connection callback; intentionally does nothing.</summary>
        private static void MqttClient_OnClientConnected(MqttClient client)
        {
        }

        /// <summary>
        /// Dispatches a received message to the handler for its topic on a background
        /// task. Any exception thrown by a handler is written to the console together
        /// with the time, the topic and the payload text.
        /// </summary>
        private static void MqttClient_OnMqttMessageReceived(MqttClient client, string topic, byte[] payload)
        {
            Task.Run(() =>
            {
                try
                {
                    switch (topic)
                    {
                        case "/Machine/EQPData":
                        case "/Machine/GetEQPData":
                            AnalysisEQPData(payload);
                            break;
                        case "/Machine/CimState":
                            AnalysisCimState(payload);
                            break;
                        case "/MQTT/ClientState":
                            AnalysisClientState(payload);
                            break;
                        case "/Machine/Fault":
                        case "/Machine/GetFault":
                            AnalysisFault(payload);
                            break;
                        case "/Machine/State": // recognised but deliberately ignored
                        default:
                            break;
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(DateTime.Now);
                    Console.WriteLine(topic);
                    // Passing the byte[] itself would print "System.Byte[]"; decode it so
                    // the offending message is actually visible in the log.
                    Console.WriteLine(payload == null ? string.Empty : Encoding.UTF8.GetString(payload));
                    Console.WriteLine(ex);
                }
            });
        }

        /// <summary>
        /// Decodes a fault payload and passes the result to
        /// <c>FaultService.AddMachineFaults</c>.
        /// </summary>
        private static void AnalysisFault(byte[] payload)
        {
            string json = Encoding.UTF8.GetString(payload);
            // NOTE(review): generic type arguments are missing on the next line in this
            // copy of the file; restore them from version control.
            IEnumerable faults = json.ToObject>();
            using UnitOfWork unitOfWork = GetUnitOfWork();
            FaultService faultService = new FaultService(unitOfWork);
            faultService.AddMachineFaults(faults);
        }

        /// <summary>
        /// Decodes a CIM state payload and passes it to <c>CimService.CreateOrUpdate</c>.
        /// </summary>
        private static void AnalysisCimState(byte[] payload)
        {
            string json = Encoding.UTF8.GetString(payload);
            // NOTE(review): the type argument of ToObject appears to be missing here.
            var state = json.ToObject();
            using UnitOfWork unitOfWork = GetUnitOfWork();
            CimService cimService = new CimService(unitOfWork);
            cimService.CreateOrUpdate(state);
        }

        /// <summary>
        /// Decodes an equipment-data payload and passes it to
        /// <c>ExternalService.AddEQPData</c>. Used for both the "/Machine/EQPData" and
        /// "/Machine/GetEQPData" topics, which were previously served by two identical
        /// methods.
        /// </summary>
        private static void AnalysisEQPData(byte[] payload)
        {
            string json = Encoding.UTF8.GetString(payload);
            // NOTE(review): the type argument of ToObject appears to be missing here.
            EQPDataDto dto = json.ToObject();
            using UnitOfWork unitOfWork = GetUnitOfWork();
            ExternalService externalService = new ExternalService(unitOfWork);
            externalService.AddEQPData(dto);
        }

        /// <summary>
        /// Decodes a client-state payload and passes it to
        /// <c>CimService.UpdateMqttClientState</c>.
        /// </summary>
        private static void AnalysisClientState(byte[] payload)
        {
            string json = Encoding.UTF8.GetString(payload);
            // NOTE(review): the type argument of ToObject appears to be missing here.
            var state = json.ToObject();
            using UnitOfWork unitOfWork = GetUnitOfWork();
            CimService cimService = new CimService(unitOfWork);
            cimService.UpdateMqttClientState(state);
        }

        /// <summary>
        /// Builds a new <c>UnitOfWork</c> over a fresh <c>ProductionLineContext</c>
        /// configured for SQLite with <c>DBOptions.ConnectionString</c>. Callers dispose
        /// the returned instance.
        /// </summary>
        private static UnitOfWork GetUnitOfWork()
        {
            // NOTE(review): possibly DbContextOptionsBuilder<ProductionLineContext> in the
            // original; the type argument may have been stripped from this copy.
            var optionsBuilder = new DbContextOptionsBuilder();
            optionsBuilder.UseSqlite(DBOptions.ConnectionString);
            var options = optionsBuilder.Options;
            ProductionLineContext context = new ProductionLineContext(options);
            UnitOfWork unitOfWork = new UnitOfWork(context);
            return unitOfWork;
        }

        /// <summary>
        /// Publishes <paramref name="json"/>, UTF-8 encoded, to <paramref name="topic"/>
        /// with quality of service <c>AtLeastOnce</c>. Does nothing when the client has
        /// not been created yet or is not connected.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="json">Message body.</param>
        public static void Pub(string topic, string json)
        {
            // Read the static field once so the null check, the connection check and
            // the publish call all operate on the same instance.
            MqttClient client = _mqttClient;
            if (client == null || !client.IsConnected)
            {
                return;
            }

            client.PublishMessage(new MqttApplicationMessage()
            {
                Topic = topic,
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                Payload = Encoding.UTF8.GetBytes(json)
            });
        }
    }
}