HostMqttClient.cs 8.6 KB


  1. using Core.Dtos;
  2. using HslCommunication.LogNet;
  3. using HslCommunication.MQTT;
  4. using Microsoft.EntityFrameworkCore;
  5. using NPOI.SS.Formula.Functions;
  6. using OfficeOpenXml.FormulaParsing.Excel.Functions.Text;
  7. using ProductionLineMonitor.Application.Services.CimService;
  8. using ProductionLineMonitor.Core.Dtos;
  9. using ProductionLineMonitor.Core.Services;
  10. using ProductionLineMonitor.Core.Services.Fault;
  11. using ProductionLineMonitor.Core.Utils;
  12. using ProductionLineMonitor.EntityFramework;
  13. using ProductionLineMonitor.EntityFramework.Repositories;
  14. using System.Reflection;
  15. using System.Text;
  16. namespace ProductionLineMonitor.WebAPI
  17. {
  18. public static class HostMqttClient
  19. {
  20. public static IApplicationBuilder UseHostMqtt(this IApplicationBuilder builder,
  21. string ip, int port, string clientId, ILogNet log)
  22. {
  23. _log = log;
  24. Task.Run(() => Start(ip, port, clientId));
  25. return builder;
  26. }
  27. private static MqttClient? _mqttClient = null;
  28. private static readonly string[] _topics =
  29. new string[] {
  30. "/Machine/EQPData",
  31. "/Machine/State",
  32. "/Machine/CimState",
  33. "/MQTT/ClientState",
  34. "/Machine/Fault",
  35. "/Machine/GetEQPData",
  36. "/Machine/GetFault"
  37. };
  38. private static ILogNet? _log;
  39. private static void Start(string ip, int port, string clientId)
  40. {
  41. DoWork();
  42. var options = new MqttConnectionOptions()
  43. {
  44. IpAddress = ip,
  45. Port = port,
  46. ClientId = clientId
  47. };
  48. _mqttClient = new MqttClient(options);
  49. _mqttClient.OnClientConnected += MqttClient_OnClientConnected;
  50. _mqttClient.OnNetworkError += MqttClient_OnNetworkError;
  51. _mqttClient.OnMqttMessageReceived += MqttClient_OnMqttMessageReceived;
  52. MqttClientConnect();
  53. _log?.WriteInfo("HostMqttClient start.");
  54. }
  55. private static void MqttClient_OnMqttMessageReceived(MqttClient client, MqttApplicationMessage message)
  56. {
  57. if (message.Topic == "/Machine/CimState")
  58. {
  59. Task.Run(() =>
  60. {
  61. AnalysisCimState(message.Payload);
  62. });
  63. }
  64. else
  65. {
  66. _messages[_messageWriteIndex] = message;
  67. _messageWriteIndex++;
  68. if (_messageWriteIndex > _messageMaxCount)
  69. _messageWriteIndex = 0;
  70. }
  71. }
  72. private static int _messageWriteIndex = 0;
  73. private static int _messageReadIndex = 0;
  74. private static int _messageMaxCount = 10000;
  75. private static MqttApplicationMessage[] _messages
  76. = new MqttApplicationMessage[_messageMaxCount + 1];
  77. private static void DoWork()
  78. {
  79. Task.Run(() =>
  80. {
  81. while (true)
  82. {
  83. if (_messageReadIndex != _messageWriteIndex)
  84. {
  85. Work(_messages[_messageReadIndex]);
  86. _messageReadIndex++;
  87. }
  88. if (_messageReadIndex > _messageMaxCount)
  89. {
  90. _messageReadIndex = 0;
  91. }
  92. Thread.Sleep(1);
  93. }
  94. });
  95. }
  96. private static void Work(MqttApplicationMessage message)
  97. {
  98. try
  99. {
  100. switch (message.Topic)
  101. {
  102. case "/Machine/EQPData":
  103. AnalysisEQPData(message.Payload);
  104. break;
  105. case "/Machine/State":
  106. break;
  107. case "/Machine/CimState":
  108. Task.Run(() =>
  109. {
  110. AnalysisCimState(message.Payload);
  111. });
  112. break;
  113. case "/MQTT/ClientState":
  114. AnalysisClientState(message.Payload);
  115. break;
  116. case "/Machine/Fault":
  117. Task.Run(() =>
  118. {
  119. AnalysisFault(message.Payload);
  120. });
  121. break;
  122. case "/Machine/GetEQPData":
  123. AnalysisBachupEQPData(message.Payload);
  124. break;
  125. case "/Machine/GetFault":
  126. AnalysisFault(message.Payload);
  127. break;
  128. default:
  129. break;
  130. }
  131. }
  132. catch (Exception ex)
  133. {
  134. //Console.WriteLine(DateTime.Now);
  135. //Console.WriteLine(message.Topic);
  136. //Console.WriteLine(message.Payload);
  137. //Console.WriteLine(ex);
  138. _log?.WriteError(ex.Message);
  139. }
  140. }
  141. private static void MqttClientConnect()
  142. {
  143. while (true)
  144. {
  145. if (_mqttClient != null)
  146. {
  147. var rev = _mqttClient.ConnectServer();
  148. if (rev.IsSuccess)
  149. {
  150. _mqttClient.SubscribeMessage(_topics);
  151. _log?.WriteInfo("MqttClient connect success.");
  152. break;
  153. }
  154. }
  155. Thread.Sleep(10 * 1000);
  156. }
  157. }
  158. private static void MqttClient_OnNetworkError(object sender, EventArgs e)
  159. {
  160. MqttClientConnect();
  161. _log?.WriteInfo("MqttClient OnNetworkError.");
  162. }
  163. private static void MqttClient_OnClientConnected(MqttClient client)
  164. {
  165. }
  166. private static void AnalysisBachupEQPData(byte[] payload)
  167. {
  168. string json = Encoding.UTF8.GetString(payload);
  169. EQPDataDto dto = json.ToObject<EQPDataDto>();
  170. using UnitOfWork unitOfWork = GetUnitOfWork();
  171. ExternalService externalService = new ExternalService(unitOfWork);
  172. externalService.AddEQPData(dto);
  173. }
  174. private static void AnalysisFault(byte[] payload)
  175. {
  176. string json = Encoding.UTF8.GetString(payload);
  177. IEnumerable<EQPDataFaultRecordDto> faults = json.ToObject<IEnumerable<EQPDataFaultRecordDto>>();
  178. using UnitOfWork unitOfWork = GetUnitOfWork();
  179. FaultService faultService = new FaultService(unitOfWork);
  180. faultService.AddMachineFaults(faults);
  181. }
  182. private static void AnalysisCimState(byte[] payload)
  183. {
  184. string json = Encoding.UTF8.GetString(payload);
  185. var state = json.ToObject<CimDto>();
  186. using UnitOfWork unitOfWork = GetUnitOfWork();
  187. CimService cimService = new CimService(unitOfWork);
  188. cimService.CreateOrUpdate(state);
  189. }
  190. private static void AnalysisEQPData(byte[] payload)
  191. {
  192. string json = Encoding.UTF8.GetString(payload);
  193. EQPDataDto dto = json.ToObject<EQPDataDto>();
  194. using UnitOfWork unitOfWork = GetUnitOfWork();
  195. ExternalService externalService = new ExternalService(unitOfWork);
  196. externalService.AddEQPData(dto);
  197. }
  198. private static void AnalysisClientState(byte[] payload)
  199. {
  200. string json = Encoding.UTF8.GetString(payload);
  201. var state = json.ToObject<MqttClientStateDto>();
  202. using UnitOfWork unitOfWork = GetUnitOfWork();
  203. CimService cimService = new CimService(unitOfWork);
  204. cimService.UpdateMqttClientState(state);
  205. }
  206. private static UnitOfWork GetUnitOfWork()
  207. {
  208. var optionsBuilder = new DbContextOptionsBuilder<ProductionLineContext>();
  209. optionsBuilder.UseSqlite(DBOptions.ConnectionString);
  210. var options = optionsBuilder.Options;
  211. ProductionLineContext context = new ProductionLineContext(options);
  212. UnitOfWork unitOfWork = new UnitOfWork(context);
  213. return unitOfWork;
  214. }
  215. public static void Pub(string topic, string json)
  216. {
  217. if (_mqttClient == null || _mqttClient.IsConnected == false)
  218. {
  219. return;
  220. }
  221. _mqttClient.PublishMessage(new MqttApplicationMessage()
  222. {
  223. Topic = topic,
  224. QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
  225. Payload = Encoding.UTF8.GetBytes(json)
  226. });
  227. }
  228. }
  229. }