// MasstransferExporter/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs
//
// 130 lines
// 3.8 KiB
// C#

using MasstransferCommon.Model.Entity;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Model;
using MasstransferInfrastructure.Database.Sqlite;
using MasstransferInfrastructure.Mqtt.Model;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using Serilog;
using MqttClient = MasstransferInfrastructure.Mqtt.Client.MqttClient;
namespace MasstransferCommunicate.Mqtt.Client;
/// <summary>
/// Static facade over one shared <see cref="MqttClient"/>: connects to the broker, keeps a
/// per-topic registry of subscriber delegates, publishes messages, and forwards every received
/// message to the delegates registered for that exact topic string.
/// </summary>
public class MessageQueueHelper
{
    private static readonly SqliteHelper Helper = SqliteHelper.GetInstance();

    // Topic -> delegates to notify when a message with that exact topic is received.
    private static readonly Dictionary<string, List<Delegate>> Subscribers = new();

    // Guards Subscribers: Subscribe mutates the registry while HandleMessageReceived reads it,
    // and a subscriber callback may itself call Subscribe in the middle of a dispatch.
    private static readonly object SubscribersGate = new();

    private static readonly MqttClient Client = new();

    /// <summary>
    /// Connects the shared client and starts listening for incoming messages.
    /// </summary>
    /// <param name="options">Connection options passed to the client's <c>ConnectAsync</c>.</param>
    /// <returns>
    /// <c>true</c> when the client reports a successful connection; <c>false</c> when it reports
    /// failure or throws (the exception is logged, not rethrown).
    /// </returns>
    public static async Task<bool> InitConnect(MqttConnectOptions options)
    {
        try
        {
            if (!await Client.ConnectAsync(options))
            {
                return false;
            }

            // Detach first so that calling InitConnect more than once (e.g. to reconnect)
            // never leaves the handler attached twice, which would dispatch each message twice.
            Client.MessageReceived -= HandleMessageReceived;
            Client.MessageReceived += HandleMessageReceived;
            return true;
        }
        catch (Exception e)
        {
            Log.Error(e, "连接MQTT服务器失败");
            return false;
        }
    }

    /// <summary>
    /// Registers a callback for a topic and subscribes the shared client to that topic.
    /// </summary>
    /// <param name="topic">Topic to subscribe to; received messages are matched by exact topic string.</param>
    /// <param name="delegate">
    /// Callback invoked with two arguments: the topic and the JSON text of the message's <c>Data</c>.
    /// Delegates whose method does not take exactly two parameters are skipped on dispatch.
    /// </param>
    /// <param name="qos">Quality-of-service level requested for the subscription.</param>
    /// <returns>The result reported by the client's <c>Subscribe</c> call.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="topic"/> or <paramref name="delegate"/> is <c>null</c>.
    /// </exception>
    public static async Task<bool> Subscribe(string topic, Delegate @delegate,
        MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
    {
        ArgumentNullException.ThrowIfNull(topic);
        // Reject a null callback here instead of failing later on every received message.
        ArgumentNullException.ThrowIfNull(@delegate);

        lock (SubscribersGate)
        {
            if (!Subscribers.TryGetValue(topic, out var handlers))
            {
                handlers = [];
                Subscribers.Add(topic, handlers);
            }

            handlers.Add(@delegate);
        }

        // Awaited outside the lock: a lock statement cannot span an await.
        return await Client.Subscribe(topic, qos);
    }

    /// <summary>
    /// Publishes a message; when publishing fails, the message is stored as a
    /// <see cref="MessageFailureRecord"/> in the database.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Message object handed to the client's <c>Publish</c>.</param>
    /// <param name="qos">Quality-of-service level used for publishing.</param>
    /// <returns>
    /// <c>true</c> when the client reports success; <c>false</c> when it reports failure or throws.
    /// This method does not throw for publish or persistence failures; both are logged.
    /// </returns>
    public static async Task<bool> Publish(string topic, object message,
        MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
    {
        try
        {
            if (await Client.Publish(topic, message, qos))
            {
                return true;
            }

            Log.Warning("发送消息到主题 {Topic} 失败", topic);
        }
        catch (Exception e)
        {
            Log.Error(e, "发送消息到主题 {Topic} 失败", topic);
        }

        SaveFailureRecord(topic, message);
        return false;
    }

    /// <summary>
    /// Stores a message that could not be published. Serialization or database errors are logged
    /// rather than propagated, so <see cref="Publish"/> keeps its bool-returning contract.
    /// </summary>
    private static void SaveFailureRecord(string topic, object message)
    {
        try
        {
            MessageFailureRecord record = new()
            {
                Payload = JsonUtil.ToJson(message),
                Topic = topic
            };
            Helper.Insert(record);
        }
        catch (Exception e)
        {
            Log.Error(e, "保存主题 {Topic} 的发送失败记录时发生错误", topic);
        }
    }

    /// <summary>
    /// Handles a message raised by the client: parses it as <c>Payload&lt;object&gt;</c> and passes
    /// the topic plus the JSON text of its <c>Data</c> to every delegate registered for the topic.
    /// </summary>
    /// <param name="sender">Event source (unused).</param>
    /// <param name="e">Event arguments carrying the received application message.</param>
    private static void HandleMessageReceived(object? sender, MqttApplicationMessageReceivedEventArgs e)
    {
        var applicationMessage = e.ApplicationMessage;
        var topic = applicationMessage.Topic;

        // Snapshot under the lock so dispatch can run without holding it and is not broken
        // by a Subscribe call that modifies the list while it is being enumerated.
        Delegate[] handlers;
        lock (SubscribersGate)
        {
            if (!Subscribers.TryGetValue(topic, out var registered))
            {
                return;
            }

            handlers = registered.ToArray();
        }

        try
        {
            // The payload is the same for every subscriber, so parse and re-serialize it once.
            var payload = JsonUtil.FromJson<Payload<object>>(applicationMessage.ConvertPayloadToString());
            if (payload is null)
            {
                return;
            }

            var data = JsonUtil.ToJson(payload.Data);

            foreach (var handler in handlers)
            {
                try
                {
                    // Only callbacks shaped like (topic, data) can be invoked.
                    if (handler.Method.GetParameters().Length != 2)
                    {
                        continue;
                    }

                    handler.DynamicInvoke(topic, data);
                }
                catch (Exception exception)
                {
                    // One failing subscriber must not prevent the others from being notified.
                    Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic);
                }
            }
        }
        catch (Exception exception)
        {
            Log.Error(exception, "解析主题 {Topic} 的消息时发生错误", topic);
        }
    }
}