HostMqttClient.cs 8.6 KB


  1. using Core.Dtos;
  2. using HslCommunication.LogNet;
  3. using HslCommunication.MQTT;
  4. using Microsoft.EntityFrameworkCore;
  5. using NPOI.SS.Formula.Functions;
  6. using ProductionLineMonitor.Application.Services.CimService;
  7. using ProductionLineMonitor.Core.Dtos;
  8. using ProductionLineMonitor.Core.Services;
  9. using ProductionLineMonitor.Core.Services.Fault;
  10. using ProductionLineMonitor.Core.Utils;
  11. using ProductionLineMonitor.EntityFramework;
  12. using ProductionLineMonitor.EntityFramework.Repositories;
  13. using System.Reflection;
  14. using System.Text;
  15. namespace ProductionLineMonitor.WebAPI
  16. {
  17. public static class HostMqttClient
  18. {
  19. public static IApplicationBuilder UseHostMqtt(this IApplicationBuilder builder,
  20. string ip, int port, string clientId, ILogNet log)
  21. {
  22. _log = log;
  23. Task.Run(() => Start(ip, port, clientId));
  24. return builder;
  25. }
  26. private static MqttClient? _mqttClient = null;
  27. private static readonly string[] _topics =
  28. new string[] {
  29. "/Machine/EQPData",
  30. "/Machine/State",
  31. "/Machine/CimState",
  32. "/MQTT/ClientState",
  33. "/Machine/Fault",
  34. "/Machine/GetEQPData",
  35. "/Machine/GetFault"
  36. };
  37. private static ILogNet? _log;
  38. private static void Start(string ip, int port, string clientId)
  39. {
  40. DoWork();
  41. var options = new MqttConnectionOptions()
  42. {
  43. IpAddress = ip,
  44. Port = port,
  45. ClientId = clientId
  46. };
  47. _mqttClient = new MqttClient(options);
  48. _mqttClient.OnClientConnected += MqttClient_OnClientConnected;
  49. _mqttClient.OnNetworkError += MqttClient_OnNetworkError;
  50. _mqttClient.OnMqttMessageReceived += MqttClient_OnMqttMessageReceived;
  51. MqttClientConnect();
  52. _log?.WriteInfo("HostMqttClient start.");
  53. }
  54. private static void MqttClient_OnMqttMessageReceived(MqttClient client, MqttApplicationMessage message)
  55. {
  56. if (message.Topic == "/Machine/CimState")
  57. {
  58. Task.Run(() =>
  59. {
  60. AnalysisCimState(message.Payload);
  61. });
  62. }
  63. else
  64. {
  65. _messages[_messageWriteIndex] = message;
  66. _messageWriteIndex++;
  67. if (_messageWriteIndex > _messageMaxCount)
  68. _messageWriteIndex = 0;
  69. }
  70. }
  71. private static int _messageWriteIndex = 0;
  72. private static int _messageReadIndex = 0;
  73. private static int _messageMaxCount = 10000;
  74. private static MqttApplicationMessage[] _messages
  75. = new MqttApplicationMessage[_messageMaxCount + 1];
  76. private static void DoWork()
  77. {
  78. Task.Run(() =>
  79. {
  80. while (true)
  81. {
  82. if (_messageReadIndex != _messageWriteIndex)
  83. {
  84. Work(_messages[_messageReadIndex]);
  85. _messageReadIndex++;
  86. }
  87. if (_messageReadIndex > _messageMaxCount)
  88. {
  89. _messageReadIndex = 0;
  90. }
  91. Thread.Sleep(1);
  92. }
  93. });
  94. }
  95. private static void Work(MqttApplicationMessage message)
  96. {
  97. try
  98. {
  99. switch (message.Topic)
  100. {
  101. case "/Machine/EQPData":
  102. AnalysisEQPData(message.Payload);
  103. break;
  104. case "/Machine/State":
  105. break;
  106. case "/Machine/CimState":
  107. Task.Run(() =>
  108. {
  109. AnalysisCimState(message.Payload);
  110. });
  111. break;
  112. case "/MQTT/ClientState":
  113. AnalysisClientState(message.Payload);
  114. break;
  115. case "/Machine/Fault":
  116. Task.Run(() =>
  117. {
  118. AnalysisFault(message.Payload);
  119. });
  120. break;
  121. case "/Machine/GetEQPData":
  122. AnalysisBachupEQPData(message.Payload);
  123. break;
  124. case "/Machine/GetFault":
  125. AnalysisFault(message.Payload);
  126. break;
  127. default:
  128. break;
  129. }
  130. }
  131. catch (Exception ex)
  132. {
  133. //Console.WriteLine(DateTime.Now);
  134. //Console.WriteLine(message.Topic);
  135. //Console.WriteLine(message.Payload);
  136. //Console.WriteLine(ex);
  137. _log?.WriteError(ex.Message);
  138. }
  139. }
  140. private static void MqttClientConnect()
  141. {
  142. while (true)
  143. {
  144. if (_mqttClient != null)
  145. {
  146. var rev = _mqttClient.ConnectServer();
  147. if (rev.IsSuccess)
  148. {
  149. _mqttClient.SubscribeMessage(_topics);
  150. _log?.WriteInfo("MqttClient connect success.");
  151. break;
  152. }
  153. }
  154. Thread.Sleep(10 * 1000);
  155. }
  156. }
  157. private static void MqttClient_OnNetworkError(object sender, EventArgs e)
  158. {
  159. MqttClientConnect();
  160. _log?.WriteInfo("MqttClient OnNetworkError.");
  161. }
  162. private static void MqttClient_OnClientConnected(MqttClient client)
  163. {
  164. }
  165. private static void AnalysisBachupEQPData(byte[] payload)
  166. {
  167. string json = Encoding.UTF8.GetString(payload);
  168. EQPDataDto dto = json.ToObject<EQPDataDto>();
  169. using UnitOfWork unitOfWork = GetUnitOfWork();
  170. ExternalService externalService = new ExternalService(unitOfWork);
  171. externalService.AddEQPData(dto);
  172. }
  173. private static void AnalysisFault(byte[] payload)
  174. {
  175. string json = Encoding.UTF8.GetString(payload);
  176. IEnumerable<EQPDataFaultRecordDto> faults = json.ToObject<IEnumerable<EQPDataFaultRecordDto>>();
  177. using UnitOfWork unitOfWork = GetUnitOfWork();
  178. FaultService faultService = new FaultService(unitOfWork);
  179. faultService.AddMachineFaults(faults);
  180. }
  181. private static void AnalysisCimState(byte[] payload)
  182. {
  183. string json = Encoding.UTF8.GetString(payload);
  184. var state = json.ToObject<CimDto>();
  185. using UnitOfWork unitOfWork = GetUnitOfWork();
  186. CimService cimService = new CimService(unitOfWork);
  187. cimService.CreateOrUpdate(state);
  188. }
  189. private static void AnalysisEQPData(byte[] payload)
  190. {
  191. string json = Encoding.UTF8.GetString(payload);
  192. EQPDataDto dto = json.ToObject<EQPDataDto>();
  193. using UnitOfWork unitOfWork = GetUnitOfWork();
  194. ExternalService externalService = new ExternalService(unitOfWork);
  195. externalService.AddEQPData(dto);
  196. }
  197. private static void AnalysisClientState(byte[] payload)
  198. {
  199. string json = Encoding.UTF8.GetString(payload);
  200. var state = json.ToObject<MqttClientStateDto>();
  201. using UnitOfWork unitOfWork = GetUnitOfWork();
  202. CimService cimService = new CimService(unitOfWork);
  203. cimService.UpdateMqttClientState(state);
  204. }
  205. private static UnitOfWork GetUnitOfWork()
  206. {
  207. var optionsBuilder = new DbContextOptionsBuilder<ProductionLineContext>();
  208. optionsBuilder.UseSqlite(DBOptions.ConnectionString);
  209. var options = optionsBuilder.Options;
  210. ProductionLineContext context = new ProductionLineContext(options);
  211. UnitOfWork unitOfWork = new UnitOfWork(context);
  212. return unitOfWork;
  213. }
  214. public static void Pub(string topic, string json)
  215. {
  216. if (_mqttClient == null || _mqttClient.IsConnected == false)
  217. {
  218. return;
  219. }
  220. _mqttClient.PublishMessage(new MqttApplicationMessage()
  221. {
  222. Topic = topic,
  223. QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
  224. Payload = Encoding.UTF8.GetBytes(json)
  225. });
  226. }
  227. }
  228. }