HostMqttClient.cs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. using Core.Dtos;
  2. using HslCommunication.MQTT;
  3. using Microsoft.AspNetCore.Builder;
  4. using Microsoft.EntityFrameworkCore;
  5. using ProductionLineMonitor.Application.Services.CimService;
  6. using ProductionLineMonitor.Core.Dtos;
  7. using ProductionLineMonitor.Core.Services;
  8. using ProductionLineMonitor.Core.Services.Fault;
  9. using ProductionLineMonitor.Core.Utils;
  10. using ProductionLineMonitor.EntityFramework;
  11. using ProductionLineMonitor.EntityFramework.Repositories;
  12. using System;
  13. using System.Collections.Generic;
  14. using System.Text;
  15. using System.Threading;
  16. using System.Threading.Tasks;
  17. namespace ProductionLineMonitor.Web.HostedServices
  18. {
  19. public static class HostMqttClient
  20. {
  21. private static MqttClient _mqttClient = null;
  22. private static readonly string[] _topics =
  23. new string[] {
  24. //"/Machine/EQPData",
  25. //"/Machine/State",
  26. //"/Machine/CimState",
  27. //"/MQTT/ClientState",
  28. //"/Machine/Fault",
  29. //"/Machine/GetEQPData",
  30. //"/Machine/GetFault"
  31. };
  32. public static IApplicationBuilder UseHostMqtt(this IApplicationBuilder builder,
  33. string ip, int port, string clientId)
  34. {
  35. Task.Run(() => Start(ip, port, clientId));
  36. return builder;
  37. }
  38. private static void Start(string ip, int port, string clientId)
  39. {
  40. var options = new MqttConnectionOptions()
  41. {
  42. IpAddress = ip,
  43. Port = port,
  44. ClientId = clientId
  45. };
  46. _mqttClient = new MqttClient(options);
  47. _mqttClient.OnClientConnected += MqttClient_OnClientConnected;
  48. _mqttClient.OnNetworkError += MqttClient_OnNetworkError;
  49. _mqttClient.OnMqttMessageReceived += MqttClient_OnMqttMessageReceived;
  50. Run();
  51. }
  52. private static void Run()
  53. {
  54. while (true)
  55. {
  56. var rev = _mqttClient.ConnectServer();
  57. if (rev.IsSuccess)
  58. {
  59. _mqttClient.SubscribeMessage(_topics);
  60. break;
  61. }
  62. Thread.Sleep(10 * 1000);
  63. }
  64. }
  65. private static void MqttClient_OnNetworkError(object sender, EventArgs e)
  66. {
  67. Run();
  68. }
  69. private static void MqttClient_OnClientConnected(MqttClient client)
  70. {
  71. }
  72. private static void MqttClient_OnMqttMessageReceived(MqttClient client, string topic, byte[] payload)
  73. {
  74. Task.Run(() =>
  75. {
  76. try
  77. {
  78. switch (topic)
  79. {
  80. case "/Machine/EQPData":
  81. AnalysisEQPData(payload);
  82. break;
  83. case "/Machine/State":
  84. break;
  85. case "/Machine/CimState":
  86. AnalysisCimState(payload);
  87. break;
  88. case "/MQTT/ClientState":
  89. AnalysisClientState(payload);
  90. break;
  91. case "/Machine/Fault":
  92. AnalysisFault(payload);
  93. break;
  94. case "/Machine/GetEQPData":
  95. AnalysisBachupEQPData(payload);
  96. break;
  97. case "/Machine/GetFault":
  98. AnalysisFault(payload);
  99. break;
  100. default:
  101. break;
  102. }
  103. }
  104. catch (Exception ex)
  105. {
  106. Console.WriteLine(DateTime.Now);
  107. Console.WriteLine(topic);
  108. Console.WriteLine(payload);
  109. Console.WriteLine(ex);
  110. }
  111. });
  112. }
  113. private static void AnalysisBachupEQPData(byte[] payload)
  114. {
  115. string json = Encoding.UTF8.GetString(payload);
  116. EQPDataDto dto = json.ToObject<EQPDataDto>();
  117. using UnitOfWork unitOfWork = GetUnitOfWork();
  118. ExternalService externalService = new ExternalService(unitOfWork);
  119. externalService.AddEQPData(dto);
  120. }
  121. private static void AnalysisFault(byte[] payload)
  122. {
  123. string json = Encoding.UTF8.GetString(payload);
  124. IEnumerable<EQPDataFaultRecordDto> faults = json.ToObject<IEnumerable<EQPDataFaultRecordDto>>();
  125. using UnitOfWork unitOfWork = GetUnitOfWork();
  126. FaultService faultService = new FaultService(unitOfWork);
  127. faultService.AddMachineFaults(faults);
  128. }
  129. private static void AnalysisCimState(byte[] payload)
  130. {
  131. string json = Encoding.UTF8.GetString(payload);
  132. var state = json.ToObject<CimDto>();
  133. using UnitOfWork unitOfWork = GetUnitOfWork();
  134. CimService cimService = new CimService(unitOfWork);
  135. cimService.CreateOrUpdate(state);
  136. }
  137. private static void AnalysisEQPData(byte[] payload)
  138. {
  139. string json = Encoding.UTF8.GetString(payload);
  140. EQPDataDto dto = json.ToObject<EQPDataDto>();
  141. using UnitOfWork unitOfWork = GetUnitOfWork();
  142. ExternalService externalService = new ExternalService(unitOfWork);
  143. externalService.AddEQPData(dto);
  144. }
  145. private static void AnalysisClientState(byte[] payload)
  146. {
  147. string json = Encoding.UTF8.GetString(payload);
  148. var state = json.ToObject<MqttClientStateDto>();
  149. using UnitOfWork unitOfWork = GetUnitOfWork();
  150. CimService cimService = new CimService(unitOfWork);
  151. cimService.UpdateMqttClientState(state);
  152. }
  153. private static UnitOfWork GetUnitOfWork()
  154. {
  155. var optionsBuilder = new DbContextOptionsBuilder<ProductionLineContext>();
  156. optionsBuilder.UseSqlite(DBOptions.ConnectionString);
  157. var options = optionsBuilder.Options;
  158. ProductionLineContext context = new ProductionLineContext(options);
  159. UnitOfWork unitOfWork = new UnitOfWork(context);
  160. return unitOfWork;
  161. }
  162. public static void Pub(string topic, string json)
  163. {
  164. if (_mqttClient == null || _mqttClient.IsConnected == false)
  165. {
  166. return;
  167. }
  168. _mqttClient.PublishMessage(new MqttApplicationMessage()
  169. {
  170. Topic = topic,
  171. QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
  172. Payload = Encoding.UTF8.GetBytes(json)
  173. });
  174. }
  175. }
  176. }