修复mqtt消费消息反序列化异常问题

This commit is contained in:
huangxianguo 2024-09-05 19:41:09 +08:00
parent b9a7b1350b
commit c5b12fa559
16 changed files with 144 additions and 64 deletions

View File

@ -9,7 +9,7 @@
public static class Topics public static class Topics
{ {
// ReSharper disable once InconsistentNaming // ReSharper disable once InconsistentNaming
private const string SN = "5506771257"; private const string SN = "G5506771257";
private const string Version = "1.0.0"; private const string Version = "1.0.0";

View File

@ -0,0 +1,9 @@
namespace MasstransferCommon.Atrributes;
public interface Instant
{
/// <summary>
/// 初始化对象后进行回调
/// </summary>
void Initialized();
}

View File

@ -1,4 +1,5 @@
using Newtonsoft.Json; using System.Text.Json;
using Newtonsoft.Json;
namespace MasstransferCommon.Utils; namespace MasstransferCommon.Utils;
@ -28,11 +29,15 @@ public class JsonUtil
} }
} }
public static T FromJson<T>(string json) public static T? FromJson<T>(string json)
{ {
try try
{ {
return JsonConvert.DeserializeObject<T>(json); var settings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
};
return JsonConvert.DeserializeObject<T>(json, settings);
} }
catch (Exception e) catch (Exception e)
{ {
@ -52,7 +57,7 @@ public class JsonUtil
} }
} }
public static T FromJsonOrDefault<T>(string json) public static T? FromJsonOrDefault<T>(string json)
{ {
try try
{ {

View File

@ -1,4 +1,5 @@
using MasstransferCommon.Events; using MasstransferCommon.Atrributes;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant; using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils; using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Client; using MasstransferCommunicate.Mqtt.Client;
@ -10,17 +11,12 @@ namespace MasstransferExporter.DataExporter;
/// <summary> /// <summary>
/// 配置服务 /// 配置服务
/// </summary> /// </summary>
public class ConfigService public class ConfigService : Instant
{ {
static ConfigService()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await ListenConfigIssuedEvent(); });
}
/// <summary> /// <summary>
/// 监听配置下发事件 /// 监听配置下发事件
/// </summary> /// </summary>
public static async Task ListenConfigIssuedEvent() private static async Task ListenConfigIssuedEvent(EventType type, bool start)
{ {
await MessageQueueHelper.Subscribe(Topics.DownloadConfigData, HandleConfigIssuedEvent); await MessageQueueHelper.Subscribe(Topics.DownloadConfigData, HandleConfigIssuedEvent);
} }
@ -54,4 +50,9 @@ public class ConfigService
} }
} }
} }
public void Initialized()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, ListenConfigIssuedEvent);
}
} }

View File

@ -1,4 +1,5 @@
using MasstransferCommon.Model.Constant; using MasstransferCommon.Atrributes;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils; using MasstransferCommon.Utils;
using MasstransferCommunicate.Minio; using MasstransferCommunicate.Minio;
using MasstransferCommunicate.Mqtt.Client; using MasstransferCommunicate.Mqtt.Client;
@ -10,7 +11,7 @@ namespace MasstransferExporter.DataExporter;
/// <summary> /// <summary>
/// 坐标信息业务类 /// 坐标信息业务类
/// </summary> /// </summary>
public class CoordinateService public class CoordinateService : Instant
{ {
private static readonly SqliteHelper Db = SqliteHelper.GetInstance(); private static readonly SqliteHelper Db = SqliteHelper.GetInstance();
@ -111,4 +112,8 @@ public class CoordinateService
}; };
await MessageQueueHelper.Publish(Topics.CoordinateUpload, data); await MessageQueueHelper.Publish(Topics.CoordinateUpload, data);
} }
public void Initialized()
{
}
} }

View File

@ -1,4 +1,5 @@
using System.Drawing; using System.Drawing;
using MasstransferCommon.Atrributes;
using MasstransferCommon.Events; using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant; using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils; using MasstransferCommon.Utils;
@ -10,7 +11,7 @@ using MasstransferInfrastructure.Database.Sqlite;
namespace MasstransferExporter.ImageExporter; namespace MasstransferExporter.ImageExporter;
public class ImageService public class ImageService : Instant
{ {
private static readonly MinioHelper Minio = MinioHelper.GetInstance(); private static readonly MinioHelper Minio = MinioHelper.GetInstance();
@ -19,15 +20,10 @@ public class ImageService
private const string BasePath = "masstransfer"; private const string BasePath = "masstransfer";
static ImageService()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await ListenImageQueryEvent(); });
}
/// <summary> /// <summary>
/// 监听图片查询事件 /// 监听图片查询事件
/// </summary> /// </summary>
public static async Task ListenImageQueryEvent() private static async Task ListenImageQueryEvent(EventType type, bool start)
{ {
await MessageQueueHelper.Subscribe(Topics.UpdateLicenseEvent, HandleImageQueryEvent); await MessageQueueHelper.Subscribe(Topics.UpdateLicenseEvent, HandleImageQueryEvent);
} }
@ -265,4 +261,9 @@ public class ImageService
Console.WriteLine($"图片导出失败,{e}"); Console.WriteLine($"图片导出失败,{e}");
} }
} }
public void Initialized()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, ListenImageQueryEvent);
}
} }

View File

@ -0,0 +1,33 @@
using System.Reflection;
using MasstransferCommon.Atrributes;
namespace MasstransferExporter.Init;
public class InstantUtil
{
/// <summary>
/// 启动是对象的初始化
/// </summary>
public static void Init()
{
// 获取当前程序集
var currentAssembly = Assembly.GetExecutingAssembly();
// 获取所有类型
var types = currentAssembly.GetTypes();
// 找到实现了 Instant 接口的类型
var instantImplementations =
types.Where(t => typeof(Instant).IsAssignableFrom(t) && t is { IsInterface: false, IsAbstract: false });
foreach (var type in instantImplementations)
{
// 创建实例
if (Activator.CreateInstance(type) is Instant instance)
{
// 调用 Initialized 方法
instance.Initialized();
}
}
}
}

View File

@ -1,4 +1,5 @@
using MasstransferCommon.Events; using MasstransferCommon.Atrributes;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant; using MasstransferCommon.Model.Constant;
using MasstransferCommunicate.Mqtt.Client; using MasstransferCommunicate.Mqtt.Client;
using MasstransferCommunicate.Process.Client; using MasstransferCommunicate.Process.Client;
@ -8,17 +9,12 @@ namespace MasstransferExporter.License;
/// <summary> /// <summary>
/// 证书业务 /// 证书业务
/// </summary> /// </summary>
public class LicenseService public class LicenseService : Instant
{ {
static LicenseService()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await ListenLicenseUpdateEvent(); });
}
/// <summary> /// <summary>
/// 启动监听证书更新事件 /// 启动监听证书更新事件
/// </summary> /// </summary>
public static async Task ListenLicenseUpdateEvent() private static async Task ListenLicenseUpdateEvent(EventType type, bool start)
{ {
await MessageQueueHelper.Subscribe(Topics.UpdateLicenseEvent, HandleUpdateLicenseEvent); await MessageQueueHelper.Subscribe(Topics.UpdateLicenseEvent, HandleUpdateLicenseEvent);
} }
@ -42,4 +38,9 @@ public class LicenseService
{ {
await MessageQueueHelper.Publish(Topics.UpdateLicenseEventFeedback, result); await MessageQueueHelper.Publish(Topics.UpdateLicenseEventFeedback, result);
} }
public void Initialized()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, ListenLicenseUpdateEvent);
}
} }

View File

@ -1,4 +1,5 @@
using MasstransferCommon.Events; using MasstransferCommon.Atrributes;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant; using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils; using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Client; using MasstransferCommunicate.Mqtt.Client;
@ -9,20 +10,15 @@ using MasstransferInfrastructure.Database.Sqlite;
namespace MasstransferExporter.OTA.Service; namespace MasstransferExporter.OTA.Service;
public class OTAService public class OTAService : Instant
{ {
private static readonly SqliteHelper Db = SqliteHelper.GetInstance(); private static readonly SqliteHelper Db = SqliteHelper.GetInstance();
private static OTAUpdateFileManager _otaUpdateFileManager; private static OTAUpdateFileManager _otaUpdateFileManager;
static OTAService()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await StartOTAService(); });
}
/// <summary> /// <summary>
/// 启动OTA服务 /// 启动OTA服务
/// </summary> /// </summary>
private static async Task StartOTAService() private static async Task StartOTAService(EventType type, bool start)
{ {
_otaUpdateFileManager = GetOTAUpdateFileManager(); _otaUpdateFileManager = GetOTAUpdateFileManager();
@ -116,7 +112,7 @@ public class OTAService
try try
{ {
//初始化 //初始化
initDir(criticalBackupDir); InitDir(criticalBackupDir);
//备份重要文件 //备份重要文件
OTAClient.BackupEssentialFiles(appDir, criticalBackupDir, criticalFileExtension, criticalSourceLogPath); OTAClient.BackupEssentialFiles(appDir, criticalBackupDir, criticalFileExtension, criticalSourceLogPath);
@ -167,7 +163,7 @@ public class OTAService
/// <summary> /// <summary>
/// 初始化重要文件备份目录 /// 初始化重要文件备份目录
/// </summary> /// </summary>
private static void initDir(string criticalBackupDir) private static void InitDir(string criticalBackupDir)
{ {
if (Directory.Exists(criticalBackupDir)) if (Directory.Exists(criticalBackupDir))
{ {
@ -185,7 +181,7 @@ public class OTAService
{ {
//删除安装过程中创建的文件、目录 //删除安装过程中创建的文件、目录
var directoryList = new string[] { appDir, criticalBackupDir }; var directoryList = new string[] { appDir, criticalBackupDir };
var filePath = new String[] { updatePackagePath }; var filePath = new string[] { updatePackagePath };
foreach (var directory in directoryList) foreach (var directory in directoryList)
{ {
@ -209,4 +205,9 @@ public class OTAService
//删除旧版本备份 //删除旧版本备份
OTAClient.DeleteFile(previousBackupPath); OTAClient.DeleteFile(previousBackupPath);
} }
public void Initialized()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, StartOTAService);
}
} }

View File

@ -1,7 +1,11 @@
using MasstransferCommon.Events; using MasstransferCommon.Events;
using MasstransferCommon.Model.Entity; using MasstransferCommon.Model.Entity;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Client; using MasstransferCommunicate.Mqtt.Client;
using MasstransferCommunicate.Mqtt.Model;
using MasstransferCommunicate.Process.Client; using MasstransferCommunicate.Process.Client;
using MasstransferExporter.Init;
using MasstransferExporter.RemoteControl.Model;
using MasstransferInfrastructure.Database.Sqlite; using MasstransferInfrastructure.Database.Sqlite;
using MasstransferInfrastructure.Mqtt.Model; using MasstransferInfrastructure.Mqtt.Model;
@ -13,6 +17,9 @@ class Program
public static async Task Main() public static async Task Main()
{ {
// 进行初始化调用
InstantUtil.Init();
var mqttParams = Db.Query<MqttParams>("select * from mqtt_params").FirstOrDefault(); var mqttParams = Db.Query<MqttParams>("select * from mqtt_params").FirstOrDefault();
// 启动mqtt连接 // 启动mqtt连接
@ -24,14 +31,17 @@ class Program
Password = mqttParams.Password Password = mqttParams.Password
}); });
// 启动与主程序的通信 Thread.Sleep(3000);
ProcessHelper.Init();
// 启动完成后,广播启动通知 // 启动完成后,广播启动通知
EventBus<bool>.Publish(EventType.StartUp, true); EventBus<bool>.Publish(EventType.StartUp, true);
// 启动与主程序的通信
// ProcessHelper.Init();
Console.WriteLine("按任意键退出"); Console.WriteLine("按任意键退出");
Console.ReadKey(); Console.ReadKey();
} }
} }

View File

@ -1,4 +1,6 @@
namespace MasstransferExporter.RemoteControl.Model; using Newtonsoft.Json;
namespace MasstransferExporter.RemoteControl.Model;
/// <summary> /// <summary>
/// 锁机指令 /// 锁机指令
@ -8,16 +10,19 @@ public class LockCmd
/// <summary> /// <summary>
/// 0 锁机 1 解锁 /// 0 锁机 1 解锁
/// </summary> /// </summary>
[JsonProperty("action")]
public int Action { get; set; } public int Action { get; set; }
/// <summary> /// <summary>
/// 0 立即执行 /// 0 立即执行
/// 1 到期执行 /// 1 到期执行
/// </summary> /// </summary>
[JsonProperty("lockType")]
public int LockType { get; set; } public int LockType { get; set; }
/// <summary> /// <summary>
/// 到期时间 /// 到期时间
/// </summary> /// </summary>
[JsonProperty("expiryTime")]
public long ExpiryTime { get; set; } public long ExpiryTime { get; set; }
} }

View File

@ -1,4 +1,5 @@
using MasstransferCommon.Events; using MasstransferCommon.Atrributes;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant; using MasstransferCommon.Model.Constant;
using MasstransferCommon.Model.Enum; using MasstransferCommon.Model.Enum;
using MasstransferCommon.Utils; using MasstransferCommon.Utils;
@ -11,20 +12,14 @@ namespace MasstransferExporter.RemoteControl;
/// <summary> /// <summary>
/// 远程锁定服务 /// 远程锁定服务
/// </summary> /// </summary>
public class RemoteLockService public class RemoteLockService : Instant
{ {
private const string KeyPath = @"Software\Masstransfer\Security"; private const string KeyPath = @"Software\Masstransfer\Security";
static RemoteLockService()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await ListenRemoteLockEvent(); });
}
/// <summary> /// <summary>
/// 监听远程锁机事件 /// 监听远程锁机事件
/// </summary> /// </summary>
public static async Task ListenRemoteLockEvent() private static async Task ListenRemoteLockEvent(EventType type, bool start)
{ {
await MessageQueueHelper.Subscribe(Topics.RemoteControl, HandleLockCmd); await MessageQueueHelper.Subscribe(Topics.RemoteControl, HandleLockCmd);
} }
@ -75,4 +70,9 @@ public class RemoteLockService
} }
} }
} }
public void Initialized()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, ListenRemoteLockEvent);
}
} }

View File

@ -1,5 +1,6 @@
using MasstransferCommon.Model.Entity; using MasstransferCommon.Model.Entity;
using MasstransferCommon.Utils; using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Model;
using MasstransferInfrastructure.Database.Sqlite; using MasstransferInfrastructure.Database.Sqlite;
using MasstransferInfrastructure.Mqtt.Model; using MasstransferInfrastructure.Mqtt.Model;
using MQTTnet; using MQTTnet;
@ -113,9 +114,12 @@ public class MessageQueueHelper
var methodInfo = subscriber.Method; var methodInfo = subscriber.Method;
var parameters = methodInfo.GetParameters(); var parameters = methodInfo.GetParameters();
if (parameters.Length != 2) continue; if (parameters.Length != 2) continue;
var type = parameters[1].ParameterType; var payload = JsonUtil.FromJson<Payload<object>>(message);
if (payload == null) continue;
// 通知订阅者 // 通知订阅者
subscriber.DynamicInvoke(topic, JsonUtil.FromJson(type, message)); subscriber.DynamicInvoke(topic, JsonUtil.ToJson(payload.Data));
} }
catch (Exception exception) catch (Exception exception)
{ {

View File

@ -1,4 +1,5 @@
using System.Security.Cryptography.X509Certificates; using System.Security.Cryptography.X509Certificates;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils; using MasstransferCommon.Utils;
using MasstransferInfrastructure.Mqtt.Model; using MasstransferInfrastructure.Mqtt.Model;
using MasstransferSecurity.Utils; using MasstransferSecurity.Utils;
@ -33,7 +34,7 @@ class MqttClient
return new MqttClientOptionsBuilder() return new MqttClientOptionsBuilder()
.WithTcpServer(options.ServerAddress, options.Port) .WithTcpServer(options.ServerAddress, options.Port)
// .WithCredentials(options.UserName, options.Password) // .WithCredentials(options.UserName, options.Password)
.WithClientId(clientId) .WithClientId(Constants.SN)
.WithCleanSession() .WithCleanSession()
.WithTlsOptions( .WithTlsOptions(
o => o =>

View File

@ -1,8 +1,10 @@
namespace MasstransferInfrastructure.Mqtt.Model; using MasstransferCommunicate.Mqtt.Model;
public class Message namespace MasstransferInfrastructure.Mqtt.Model;
public class Message<T>
{ {
public string Topic { get; set; } public string Topic { get; set; }
public Payload Payload { get; set; } public Payload<T> Payload { get; set; }
} }

View File

@ -1,10 +1,12 @@
namespace MasstransferInfrastructure.Mqtt.Model; using Newtonsoft.Json;
public class Payload namespace MasstransferCommunicate.Mqtt.Model;
public class Payload<T>
{ {
public string MsgId { get; set; } [JsonProperty("msgId")] public string MsgId { get; set; }
public DateTime ConsumeTime { get; set; } [JsonProperty("consumeTime")] public string ConsumeTime { get; set; }
public object Data { get; set; } [JsonProperty("data")] public T Data { get; set; }
} }