diff --git a/MasstransferCommon/Model/Entity/MessageFailureRecord.cs b/MasstransferCommon/Model/Entity/MessageFailureRecord.cs new file mode 100644 index 0000000..7814860 --- /dev/null +++ b/MasstransferCommon/Model/Entity/MessageFailureRecord.cs @@ -0,0 +1,13 @@ +using System.ComponentModel; +using SQLite; + +namespace MasstransferCommon.Model.Entity; + +[Table("message_failure_record"), Description("消息发送失败记录")] +public class MessageFailureRecord : Entity +{ + [Column("topic"), Description("主题")] public string Topic { get; set; } + + [Column("payload"), Description("消息内容")] + public string Payload { get; set; } +} \ No newline at end of file diff --git a/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs b/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs index 3cdee38..e131d54 100644 --- a/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs +++ b/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs @@ -15,7 +15,11 @@ public class SqliteHelper private const string Password = "88888888"; private readonly SQLiteConnection _db; - public SqliteHelper() + private static SqliteHelper? _instance; + + private static readonly object Locker = new(); + + private SqliteHelper() { var profile = Environment.GetEnvironmentVariable("USERPROFILE"); var path = Path.Combine(profile, "masstransfer", "mass-transfer.db"); @@ -34,6 +38,17 @@ public class SqliteHelper _db.Execute($"PRAGMA key = '{Password}'"); } + public static SqliteHelper GetInstance() + { + lock (Locker) + { + // 如果类的实例不存在则创建,否则直接返回 + _instance ??= new SqliteHelper(); + } + + return _instance; + } + /// /// 插入数据 diff --git a/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs b/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs index fb07a17..4eea18b 100644 --- a/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs +++ b/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs @@ -1,5 +1,6 @@ -using System.Reflection; +using MasstransferCommon.Model.Entity; using MasstransferCommon.Utils; +using MasstransferInfrastructure.Database.Sqlite; using MasstransferInfrastructure.Mqtt.Model; using MQTTnet; using MQTTnet.Client; @@ -10,6 +11,8 @@ namespace MasstransferInfrastructure.Mqtt.Client; public class MessageQueueHelper { + private static readonly SqliteHelper Helper = SqliteHelper.GetInstance(); + private static readonly Dictionary> Subscribers = new(); private static readonly MqttClient Client = new(); @@ -63,7 +66,29 @@ public class MessageQueueHelper public static async Task Publish(string topic, object message, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce) { - return await Client.Publish(topic, message, qos); + try + { + var isSuccess = await Client.Publish(topic, message, qos); + if (!isSuccess) + { + throw new Exception("发送消息失败"); + } + + return true; + } + catch (Exception) + { + MessageFailureRecord record = new() + { + Payload = JsonUtil.ToJson(message), + Topic = topic + }; + + // 将当前消息记录到数据库 + Helper.Insert(record); + + return false; + } } /// @@ -87,7 +112,7 @@ public class MessageQueueHelper var methodInfo = subscriber.Method; var parameters = methodInfo.GetParameters(); if (parameters.Length != 2) continue; - var type = parameters[1].ParameterType; + var type = parameters[1].ParameterType; // 通知订阅者 subscriber.DynamicInvoke(topic, JsonUtil.FromJson(type, message)); } diff --git a/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs b/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs index fb915a3..728d3d3 100644 --- a/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs +++ b/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs @@ -108,12 +108,13 @@ class MqttClient var payload = message as string ?? JsonUtil.ToJson(message); - await _client.PublishAsync(new MqttApplicationMessageBuilder() + var result = await _client.PublishAsync(new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) .WithQualityOfServiceLevel(qos) .Build()); - return true; + + return result.IsSuccess; } ///