using MasstransferCommon.Utils;
using MasstransferInfrastructure.Mqtt.Model;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using Serilog;

namespace MasstransferInfrastructure.Mqtt.Client;

/// <summary>
/// Static facade over a shared <see cref="MqttClient"/>: connects to the broker, publishes
/// messages, and fans incoming messages out to the callbacks registered per topic.
/// </summary>
/// <remarks>
/// All state is static, and static members of a generic type exist once per closed type,
/// so every distinct <typeparamref name="T"/> gets its own client and subscriber table.
/// </remarks>
/// <typeparam name="T">Type that incoming JSON payloads are deserialized into.</typeparam>
// NOTE(review): the generic type arguments in this file (T, Task<bool>, Action<string, T>)
// were reconstructed from usage; the Task<bool> return type of Subscribe/Publish assumes the
// underlying MqttClient methods return Task<bool> - confirm against MqttClient.
public class MessageQueueHelper<T>
{
    // Topic -> callbacks registered for exactly that topic string.
    private static readonly Dictionary<string, List<Action<string, T>>> Subscribers = new();

    // Guards Subscribers and the lists stored in it: Subscribe mutates them while
    // HandleMessageReceived reads them from the client's message event.
    // ReSharper disable once StaticMemberInGenericType
    private static readonly object SubscribersLock = new();

    // ReSharper disable once StaticMemberInGenericType
    private static readonly MqttClient Client = new();

    /// <summary>
    /// Connects the shared client and starts listening for incoming messages.
    /// </summary>
    /// <param name="options">Connection options passed to the client.</param>
    /// <returns>
    /// <c>true</c> if the connection succeeded; <c>false</c> if it was refused or an
    /// exception was thrown (the exception is logged, not propagated).
    /// </returns>
    public static async Task<bool> InitConnect(MqttConnectOptions options)
    {
        try
        {
            if (!await Client.ConnectAsync(options))
            {
                return false;
            }

            // Start listening only after a successful connect. Detach first so that calling
            // InitConnect more than once never registers the handler twice, which would
            // deliver every message to every subscriber multiple times.
            Client.MessageReceived -= HandleMessageReceived;
            Client.MessageReceived += HandleMessageReceived;
            return true;
        }
        catch (Exception e)
        {
            Log.Error(e, "连接MQTT服务器失败");
            return false;
        }
    }

    /// <summary>
    /// Registers a callback for a topic and subscribes the client to that topic.
    /// </summary>
    /// <param name="topic">Topic to subscribe to; callbacks are matched on this exact string.</param>
    /// <param name="delegate">Callback invoked with the topic and the deserialized payload.</param>
    /// <param name="qos">Quality-of-service level requested for the subscription.</param>
    /// <returns>The result reported by the underlying client's subscribe call.</returns>
    public static async Task<bool> Subscribe(string topic, Action<string, T> @delegate,
        MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
    {
        lock (SubscribersLock)
        {
            // Single lookup instead of ContainsKey + Add + indexer.
            if (!Subscribers.TryGetValue(topic, out var handlers))
            {
                handlers = [];
                Subscribers.Add(topic, handlers);
            }

            handlers.Add(@delegate);
        }

        // The callback is registered before the broker subscription is requested, and it
        // stays registered even if that request reports failure.
        return await Client.Subscribe(topic, qos);
    }

    /// <summary>
    /// Publishes a message to a topic through the shared client.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Message object handed to the client for publishing.</param>
    /// <param name="qos">Quality-of-service level used for the publish.</param>
    /// <returns>The result reported by the underlying client's publish call.</returns>
    public static async Task<bool> Publish(string topic, object message,
        MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
    {
        return await Client.Publish(topic, message, qos);
    }

    /// <summary>
    /// Dispatches a received message to every callback registered for its topic.
    /// </summary>
    /// <param name="sender">Event source (unused).</param>
    /// <param name="e">Event data carrying the received application message.</param>
    private static void HandleMessageReceived(object? sender, MqttApplicationMessageReceivedEventArgs e)
    {
        var applicationMessage = e.ApplicationMessage;
        var topic = applicationMessage.Topic;
        var message = applicationMessage.ConvertPayloadToString();

        // Copy the callbacks under the lock and invoke them outside it: a concurrent
        // Subscribe can then neither invalidate the enumeration nor be blocked by a slow
        // callback.
        Action<string, T>[] snapshot;
        lock (SubscribersLock)
        {
            // Exact-match lookup on the received topic string.
            if (!Subscribers.TryGetValue(topic, out var subscribers))
            {
                return;
            }

            snapshot = [.. subscribers];
        }

        foreach (var subscriber in snapshot)
        {
            try
            {
                // Notify the subscriber. The payload is deserialized per subscriber, so each
                // one receives its own instance and one failure does not affect the others.
                subscriber(topic, JsonUtil.FromJson<T>(message));
            }
            catch (Exception exception)
            {
                Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic);
            }
        }
    }
}