123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- using Core.Dtos;
- using HslCommunication.LogNet;
- using HslCommunication.MQTT;
- using Microsoft.EntityFrameworkCore;
- using NPOI.SS.Formula.Functions;
- using ProductionLineMonitor.Application.Services.CimService;
- using ProductionLineMonitor.Core.Dtos;
- using ProductionLineMonitor.Core.Services;
- using ProductionLineMonitor.Core.Services.Fault;
- using ProductionLineMonitor.Core.Utils;
- using ProductionLineMonitor.EntityFramework;
- using ProductionLineMonitor.EntityFramework.Repositories;
- using System.Reflection;
- using System.Text;
- namespace ProductionLineMonitor.WebAPI
- {
- public static class HostMqttClient
- {
- public static IApplicationBuilder UseHostMqtt(this IApplicationBuilder builder,
- string ip, int port, string clientId, ILogNet log)
- {
- _log = log;
- Task.Run(() => Start(ip, port, clientId));
- return builder;
- }
- private static MqttClient? _mqttClient = null;
- private static readonly string[] _topics =
- new string[] {
- "/Machine/EQPData",
- "/Machine/State",
- "/Machine/CimState",
- "/MQTT/ClientState",
- "/Machine/Fault",
- "/Machine/GetEQPData",
- "/Machine/GetFault"
- };
- private static ILogNet? _log;
-
- private static void Start(string ip, int port, string clientId)
- {
- DoWork();
- var options = new MqttConnectionOptions()
- {
- IpAddress = ip,
- Port = port,
- ClientId = clientId
- };
- _mqttClient = new MqttClient(options);
- _mqttClient.OnClientConnected += MqttClient_OnClientConnected;
- _mqttClient.OnNetworkError += MqttClient_OnNetworkError;
- _mqttClient.OnMqttMessageReceived += MqttClient_OnMqttMessageReceived;
- MqttClientConnect();
- _log?.WriteInfo("HostMqttClient start.");
- }
- private static void MqttClient_OnMqttMessageReceived(MqttClient client, MqttApplicationMessage message)
- {
- if (message.Topic == "/Machine/CimState")
- {
- Task.Run(() =>
- {
- AnalysisCimState(message.Payload);
- });
- }
- else
- {
- _messages[_messageWriteIndex] = message;
- _messageWriteIndex++;
- if (_messageWriteIndex > _messageMaxCount)
- _messageWriteIndex = 0;
- }
- }
- private static int _messageWriteIndex = 0;
- private static int _messageReadIndex = 0;
- private static int _messageMaxCount = 10000;
- private static MqttApplicationMessage[] _messages
- = new MqttApplicationMessage[_messageMaxCount + 1];
- private static void DoWork()
- {
- Task.Run(() =>
- {
- while (true)
- {
- if (_messageReadIndex != _messageWriteIndex)
- {
- Work(_messages[_messageReadIndex]);
- _messageReadIndex++;
- }
- if (_messageReadIndex > _messageMaxCount)
- {
- _messageReadIndex = 0;
- }
- Thread.Sleep(1);
- }
- });
- }
- private static void Work(MqttApplicationMessage message)
- {
- try
- {
- switch (message.Topic)
- {
- case "/Machine/EQPData":
- AnalysisEQPData(message.Payload);
- break;
- case "/Machine/State":
- break;
- case "/Machine/CimState":
- Task.Run(() =>
- {
- AnalysisCimState(message.Payload);
- });
- break;
- case "/MQTT/ClientState":
- AnalysisClientState(message.Payload);
- break;
- case "/Machine/Fault":
- Task.Run(() =>
- {
- AnalysisFault(message.Payload);
- });
- break;
- case "/Machine/GetEQPData":
- AnalysisBachupEQPData(message.Payload);
- break;
- case "/Machine/GetFault":
- AnalysisFault(message.Payload);
- break;
- default:
- break;
- }
- }
- catch (Exception ex)
- {
- //Console.WriteLine(DateTime.Now);
- //Console.WriteLine(message.Topic);
- //Console.WriteLine(message.Payload);
- //Console.WriteLine(ex);
- _log?.WriteError(ex.Message);
- }
- }
- private static void MqttClientConnect()
- {
- while (true)
- {
- if (_mqttClient != null)
- {
- var rev = _mqttClient.ConnectServer();
- if (rev.IsSuccess)
- {
- _mqttClient.SubscribeMessage(_topics);
- _log?.WriteInfo("MqttClient connect success.");
- break;
- }
- }
- Thread.Sleep(10 * 1000);
- }
- }
- private static void MqttClient_OnNetworkError(object sender, EventArgs e)
- {
- MqttClientConnect();
- _log?.WriteInfo("MqttClient OnNetworkError.");
- }
- private static void MqttClient_OnClientConnected(MqttClient client)
- {
- }
- private static void AnalysisBachupEQPData(byte[] payload)
- {
- string json = Encoding.UTF8.GetString(payload);
- EQPDataDto dto = json.ToObject<EQPDataDto>();
- using UnitOfWork unitOfWork = GetUnitOfWork();
- ExternalService externalService = new ExternalService(unitOfWork);
- externalService.AddEQPData(dto);
- }
- private static void AnalysisFault(byte[] payload)
- {
- string json = Encoding.UTF8.GetString(payload);
- IEnumerable<EQPDataFaultRecordDto> faults = json.ToObject<IEnumerable<EQPDataFaultRecordDto>>();
- using UnitOfWork unitOfWork = GetUnitOfWork();
- FaultService faultService = new FaultService(unitOfWork);
- faultService.AddMachineFaults(faults);
- }
- private static void AnalysisCimState(byte[] payload)
- {
- string json = Encoding.UTF8.GetString(payload);
- var state = json.ToObject<CimDto>();
- using UnitOfWork unitOfWork = GetUnitOfWork();
- CimService cimService = new CimService(unitOfWork);
- cimService.CreateOrUpdate(state);
- }
- private static void AnalysisEQPData(byte[] payload)
- {
- string json = Encoding.UTF8.GetString(payload);
- EQPDataDto dto = json.ToObject<EQPDataDto>();
- using UnitOfWork unitOfWork = GetUnitOfWork();
- ExternalService externalService = new ExternalService(unitOfWork);
- externalService.AddEQPData(dto);
- }
- private static void AnalysisClientState(byte[] payload)
- {
- string json = Encoding.UTF8.GetString(payload);
- var state = json.ToObject<MqttClientStateDto>();
- using UnitOfWork unitOfWork = GetUnitOfWork();
- CimService cimService = new CimService(unitOfWork);
- cimService.UpdateMqttClientState(state);
- }
- private static UnitOfWork GetUnitOfWork()
- {
- var optionsBuilder = new DbContextOptionsBuilder<ProductionLineContext>();
- optionsBuilder.UseSqlite(DBOptions.ConnectionString);
- var options = optionsBuilder.Options;
- ProductionLineContext context = new ProductionLineContext(options);
- UnitOfWork unitOfWork = new UnitOfWork(context);
- return unitOfWork;
- }
- public static void Pub(string topic, string json)
- {
- if (_mqttClient == null || _mqttClient.IsConnected == false)
- {
- return;
- }
- _mqttClient.PublishMessage(new MqttApplicationMessage()
- {
- Topic = topic,
- QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
- Payload = Encoding.UTF8.GetBytes(json)
- });
- }
- }
- }
|