123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- using Core.Dtos;
- using HslCommunication.MQTT;
- using Microsoft.AspNetCore.Builder;
- using Microsoft.EntityFrameworkCore;
- using ProductionLineMonitor.Application.Services.CimService;
- using ProductionLineMonitor.Core.Dtos;
- using ProductionLineMonitor.Core.Services;
- using ProductionLineMonitor.Core.Services.Fault;
- using ProductionLineMonitor.Core.Utils;
- using ProductionLineMonitor.EntityFramework;
- using ProductionLineMonitor.EntityFramework.Repositories;
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace ProductionLineMonitor.Web.HostedServices
- {
- public static class HostMqttClient
- {
- private static MqttClient _mqttClient = null;
- private static readonly string[] _topics =
- new string[] {
- //"/Machine/EQPData",
- //"/Machine/State",
- //"/Machine/CimState",
- //"/MQTT/ClientState",
- //"/Machine/Fault",
- //"/Machine/GetEQPData",
- //"/Machine/GetFault"
- };
- public static IApplicationBuilder UseHostMqtt(this IApplicationBuilder builder,
- string ip, int port, string clientId)
- {
- Task.Run(() => Start(ip, port, clientId));
- return builder;
- }
- private static void Start(string ip, int port, string clientId)
- {
- var options = new MqttConnectionOptions()
- {
- IpAddress = ip,
- Port = port,
- ClientId = clientId
- };
- _mqttClient = new MqttClient(options);
- _mqttClient.OnClientConnected += MqttClient_OnClientConnected;
- _mqttClient.OnNetworkError += MqttClient_OnNetworkError;
- _mqttClient.OnMqttMessageReceived += MqttClient_OnMqttMessageReceived;
-
- Run();
- }
- private static void Run()
- {
- while (true)
- {
- var rev = _mqttClient.ConnectServer();
- if (rev.IsSuccess)
- {
- _mqttClient.SubscribeMessage(_topics);
- break;
- }
- Thread.Sleep(10 * 1000);
- }
- }
- private static void MqttClient_OnNetworkError(object sender, EventArgs e)
- {
- Run();
- }
- private static void MqttClient_OnClientConnected(MqttClient client)
- {
- }
- private static void MqttClient_OnMqttMessageReceived(MqttClient client, string topic, byte[] payload)
- {
- Task.Run(() =>
- {
- try
- {
- switch (topic)
- {
- case "/Machine/EQPData":
- AnalysisEQPData(payload);
- break;
- case "/Machine/State":
- break;
- case "/Machine/CimState":
- AnalysisCimState(payload);
- break;
- case "/MQTT/ClientState":
- AnalysisClientState(payload);
- break;
- case "/Machine/Fault":
- AnalysisFault(payload);
- break;
- case "/Machine/GetEQPData":
- AnalysisBachupEQPData(payload);
- break;
- case "/Machine/GetFault":
- AnalysisFault(payload);
- break;
- default:
- break;
- }
- }
- catch (Exception ex)
- {
- Console.WriteLine(DateTime.Now);
- Console.WriteLine(topic);
- Console.WriteLine(payload);
- Console.WriteLine(ex);
- }
- });
- }
- private static void AnalysisBachupEQPData(byte[] payload)
- {
- string json = Encoding.UTF8.GetString(payload);
- EQPDataDto dto = json.ToObject<EQPDataDto>();
- using UnitOfWork unitOfWork = GetUnitOfWork();
- ExternalService externalService = new ExternalService(unitOfWork);
- externalService.AddEQPData(dto);
- }
- private static void AnalysisFault(byte[] payload)
- {
- string json = Encoding.UTF8.GetString(payload);
- IEnumerable<EQPDataFaultRecordDto> faults = json.ToObject<IEnumerable<EQPDataFaultRecordDto>>();
- using UnitOfWork unitOfWork = GetUnitOfWork();
- FaultService faultService = new FaultService(unitOfWork);
- faultService.AddMachineFaults(faults);
- }
- private static void AnalysisCimState(byte[] payload)
- {
- string json = Encoding.UTF8.GetString(payload);
- var state = json.ToObject<CimDto>();
- using UnitOfWork unitOfWork = GetUnitOfWork();
- CimService cimService = new CimService(unitOfWork);
- cimService.CreateOrUpdate(state);
- }
- private static void AnalysisEQPData(byte[] payload)
- {
- string json = Encoding.UTF8.GetString(payload);
- EQPDataDto dto = json.ToObject<EQPDataDto>();
- using UnitOfWork unitOfWork = GetUnitOfWork();
- ExternalService externalService = new ExternalService(unitOfWork);
- externalService.AddEQPData(dto);
- }
- private static void AnalysisClientState(byte[] payload)
- {
- string json = Encoding.UTF8.GetString(payload);
- var state = json.ToObject<MqttClientStateDto>();
- using UnitOfWork unitOfWork = GetUnitOfWork();
- CimService cimService = new CimService(unitOfWork);
- cimService.UpdateMqttClientState(state);
- }
- private static UnitOfWork GetUnitOfWork()
- {
- var optionsBuilder = new DbContextOptionsBuilder<ProductionLineContext>();
- optionsBuilder.UseSqlite(DBOptions.ConnectionString);
- var options = optionsBuilder.Options;
- ProductionLineContext context = new ProductionLineContext(options);
- UnitOfWork unitOfWork = new UnitOfWork(context);
- return unitOfWork;
- }
- public static void Pub(string topic, string json)
- {
- if (_mqttClient == null || _mqttClient.IsConnected == false)
- {
- return;
- }
- _mqttClient.PublishMessage(new MqttApplicationMessage()
- {
- Topic = topic,
- QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
- Payload = Encoding.UTF8.GetBytes(json)
- });
- }
- }
- }
|