2024-09-06 07:08:32 +00:00
|
|
|
|
using System.Reflection;
|
|
|
|
|
using MasstransferCommon.Model.Entity;
|
2024-06-19 11:28:48 +00:00
|
|
|
|
using MasstransferCommon.Utils;
|
2024-09-05 11:41:09 +00:00
|
|
|
|
using MasstransferCommunicate.Mqtt.Model;
|
2024-07-08 07:54:00 +00:00
|
|
|
|
using MasstransferInfrastructure.Database.Sqlite;
|
2024-06-03 08:35:49 +00:00
|
|
|
|
using MasstransferInfrastructure.Mqtt.Model;
|
|
|
|
|
using MQTTnet;
|
|
|
|
|
using MQTTnet.Client;
|
|
|
|
|
using MQTTnet.Protocol;
|
|
|
|
|
using Serilog;
|
2024-09-06 08:42:18 +00:00
|
|
|
|
using MqttClient = MasstransferCommunicate.Mqtt.Client.MqttClient;
|
2024-06-03 08:35:49 +00:00
|
|
|
|
|
2024-07-09 03:41:08 +00:00
|
|
|
|
namespace MasstransferCommunicate.Mqtt.Client;
|
2024-06-03 08:35:49 +00:00
|
|
|
|
|
2024-06-19 11:28:48 +00:00
|
|
|
|
/// <summary>
/// Static facade over one shared <see cref="MqttClient"/>. It connects to the broker,
/// keeps a topic-to-handler registry, publishes outgoing messages (recording failed ones
/// through <see cref="SqliteHelper"/>) and dispatches incoming messages to the handlers
/// registered for their topic.
/// </summary>
public class MessageQueueHelper
{
    private static readonly SqliteHelper Helper = SqliteHelper.GetInstance();

    private static readonly Dictionary<string, List<Delegate>> Subscribers = new();

    // Guards Subscribers and the lists stored in it: Subscribe mutates them while
    // HandleMessageReceived reads them from the client's message callback.
    private static readonly object SubscribersLock = new();

    private static readonly MqttClient Client = new();

    /// <summary>
    /// Connects the shared client and starts listening for incoming messages.
    /// </summary>
    /// <param name="options">Connection options passed to the client.</param>
    /// <returns>
    /// <c>true</c> when the client reports a successful connection; <c>false</c> when it
    /// reports failure or throws (the exception is logged, not rethrown).
    /// </returns>
    public static async Task<bool> InitConnect(MqttConnectOptions options)
    {
        try
        {
            if (!await Client.ConnectAsync(options)) return false;

            // Start listening once connected. Detach first so that calling InitConnect
            // again (e.g. on reconnect) never stacks duplicate handlers, which would
            // dispatch every message more than once.
            Client.MessageReceived -= HandleMessageReceived;
            Client.MessageReceived += HandleMessageReceived;
            return true;
        }
        catch (Exception e)
        {
            Log.Error(e, "连接MQTT服务器失败");
            return false;
        }
    }

    /// <summary>
    /// Registers a handler for a topic and subscribes the shared client to that topic.
    /// </summary>
    /// <param name="topic">Topic to subscribe to; also the key used for dispatch.</param>
    /// <param name="delegate">
    /// Handler to invoke. Dispatch only calls handlers that declare exactly two
    /// parameters: the topic and the value of the payload's <c>Data</c> property.
    /// </param>
    /// <param name="qos">Quality-of-service level passed to the client.</param>
    /// <returns>The result reported by the client's subscribe call.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="delegate"/> is null.</exception>
    public static async Task<bool> Subscribe(string topic, Delegate @delegate,
        MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
    {
        // A null handler would otherwise be stored and fail on every received message.
        ArgumentNullException.ThrowIfNull(@delegate);

        lock (SubscribersLock)
        {
            if (!Subscribers.TryGetValue(topic, out var handlers))
            {
                handlers = [];
                Subscribers.Add(topic, handlers);
            }

            handlers.Add(@delegate);
        }

        // The handler is registered before the client call, as in the original flow;
        // it stays registered even if the client reports failure.
        return await Client.Subscribe(topic, qos);
    }

    /// <summary>
    /// Publishes a message. When the client reports failure or throws, the message is
    /// recorded as a <see cref="MessageFailureRecord"/> and <c>false</c> is returned.
    /// </summary>
    /// <typeparam name="T">Type of the message object.</typeparam>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Message object handed to the client.</param>
    /// <param name="qos">Quality-of-service level passed to the client.</param>
    /// <returns><c>true</c> when the client reports success; otherwise <c>false</c>.</returns>
    public static async Task<bool> Publish<T>(string topic, T message,
        MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
    {
        try
        {
            if (await Client.Publish(topic, message, qos)) return true;

            Log.Warning("发送消息失败, 主题 {Topic}", topic);
        }
        catch (Exception e)
        {
            // Keep the cause visible instead of discarding it silently.
            Log.Error(e, "发送消息失败, 主题 {Topic}", topic);
        }

        RecordFailure(topic, message);
        return false;
    }

    /// <summary>
    /// Stores a message that could not be published. Never throws, so a serialization or
    /// database error cannot escape <see cref="Publish{T}"/>; such errors are logged.
    /// </summary>
    private static void RecordFailure<T>(string topic, T message)
    {
        try
        {
            MessageFailureRecord record = new()
            {
                Payload = JsonUtil.ToJson(message),
                Topic = topic
            };

            // Record the current message in the database.
            Helper.Insert(record);
        }
        catch (Exception e)
        {
            Log.Error(e, "保存发送失败的消息记录时出错, 主题 {Topic}", topic);
        }
    }

    /// <summary>
    /// Handles a received message: deserializes the payload for each handler registered
    /// on the message's topic and invokes that handler with the topic and the payload's
    /// <c>Data</c> value.
    /// </summary>
    /// <param name="sender">Event source (unused).</param>
    /// <param name="e">Event arguments carrying the received application message.</param>
    private static void HandleMessageReceived(object? sender, MqttApplicationMessageReceivedEventArgs e)
    {
        var applicationMessage = e.ApplicationMessage;
        var topic = applicationMessage.Topic;
        var message = applicationMessage.ConvertPayloadToString();

        // NOTE(review): console trace kept as-is; consider routing it through Serilog.
        Console.WriteLine($"收到消息:{topic} - {message}");

        // Dispatch over a snapshot so a concurrent Subscribe cannot invalidate the
        // enumeration, and so handlers run outside the lock.
        Delegate[] handlers;
        lock (SubscribersLock)
        {
            if (!Subscribers.TryGetValue(topic, out var registered)) return;
            handlers = registered.ToArray();
        }

        foreach (var handler in handlers)
        {
            try
            {
                var parameters = handler.Method.GetParameters();
                if (parameters.Length != 2) continue;

                // The second parameter's type selects the closed Payload<T> to deserialize into.
                var payloadType = typeof(Payload<>).MakeGenericType(parameters[1].ParameterType);
                var payload = JsonUtil.FromJson(payloadType, message);
                if (payload is null) continue;

                var data = payloadType.GetProperty("Data")?.GetValue(payload);

                // Notify the subscriber.
                handler.DynamicInvoke(topic, data);
            }
            catch (TargetInvocationException invocationException)
                when (invocationException.InnerException is not null)
            {
                // Reflection wraps exceptions thrown by the invoked member; log the real cause.
                Log.Error(invocationException.InnerException, "订阅主题 {Topic} 时发生错误", topic);
            }
            catch (Exception exception)
            {
                Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic);
            }
        }
    }
}
|