添加消息发送失败后记录异常记录

This commit is contained in:
huangxianguo 2024-07-08 15:54:00 +08:00
parent eb4e1b69de
commit 2bdf58a0c8
4 changed files with 60 additions and 6 deletions

View File

@ -0,0 +1,13 @@
using System.ComponentModel;
using SQLite;
namespace MasstransferCommon.Model.Entity;
[Table("message_failure_record"), Description("消息发送失败记录")]
public class MessageFailureRecord : Entity
{
[Column("topic"), Description("主题")] public string Topic { get; set; }
[Column("payload"), Description("消息内容")]
public string Payload { get; set; }
}

View File

@ -15,7 +15,11 @@ public class SqliteHelper
private const string Password = "88888888"; private const string Password = "88888888";
private readonly SQLiteConnection _db; private readonly SQLiteConnection _db;
public SqliteHelper() private static SqliteHelper? _instance;
private static readonly object Locker = new();
private SqliteHelper()
{ {
var profile = Environment.GetEnvironmentVariable("USERPROFILE"); var profile = Environment.GetEnvironmentVariable("USERPROFILE");
var path = Path.Combine(profile, "masstransfer", "mass-transfer.db"); var path = Path.Combine(profile, "masstransfer", "mass-transfer.db");
@ -34,6 +38,17 @@ public class SqliteHelper
_db.Execute($"PRAGMA key = '{Password}'"); _db.Execute($"PRAGMA key = '{Password}'");
} }
public static SqliteHelper GetInstance()
{
lock (Locker)
{
// 如果类的实例不存在则创建,否则直接返回
_instance ??= new SqliteHelper();
}
return _instance;
}
/// <summary> /// <summary>
/// 插入数据 /// 插入数据

View File

@ -1,5 +1,6 @@
using System.Reflection; using MasstransferCommon.Model.Entity;
using MasstransferCommon.Utils; using MasstransferCommon.Utils;
using MasstransferInfrastructure.Database.Sqlite;
using MasstransferInfrastructure.Mqtt.Model; using MasstransferInfrastructure.Mqtt.Model;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
@ -10,6 +11,8 @@ namespace MasstransferInfrastructure.Mqtt.Client;
public class MessageQueueHelper public class MessageQueueHelper
{ {
private static readonly SqliteHelper Helper = SqliteHelper.GetInstance();
private static readonly Dictionary<string, List<Delegate>> Subscribers = new(); private static readonly Dictionary<string, List<Delegate>> Subscribers = new();
private static readonly MqttClient Client = new(); private static readonly MqttClient Client = new();
@ -63,7 +66,29 @@ public class MessageQueueHelper
public static async Task<bool> Publish(string topic, object message, public static async Task<bool> Publish(string topic, object message,
MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce) MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
{ {
return await Client.Publish(topic, message, qos); try
{
var isSuccess = await Client.Publish(topic, message, qos);
if (!isSuccess)
{
throw new Exception("发送消息失败");
}
return true;
}
catch (Exception)
{
MessageFailureRecord record = new()
{
Payload = JsonUtil.ToJson(message),
Topic = topic
};
// 将当前消息记录到数据库
Helper.Insert(record);
return false;
}
} }
/// <summary> /// <summary>

View File

@ -108,12 +108,13 @@ class MqttClient
var payload = message as string ?? JsonUtil.ToJson(message); var payload = message as string ?? JsonUtil.ToJson(message);
await _client.PublishAsync(new MqttApplicationMessageBuilder() var result = await _client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic) .WithTopic(topic)
.WithPayload(payload) .WithPayload(payload)
.WithQualityOfServiceLevel(qos) .WithQualityOfServiceLevel(qos)
.Build()); .Build());
return true;
return result.IsSuccess;
} }
/// <summary> /// <summary>