From 55ed7729e1e7a6d95fb1bfedc2bdeb4eef4f5466 Mon Sep 17 00:00:00 2001 From: huangxianguo Date: Mon, 2 Sep 2024 21:02:04 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=95=B0=E6=8D=AE=E9=87=87?= =?UTF-8?q?=E9=9B=86=E8=BF=9B=E7=A8=8B=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- MasstransferCommon/Events/EventAction.cs | 10 +++ MasstransferCommon/Events/EventBus.cs | 65 +++++++++++++++++ MasstransferCommon/Events/EventListener.cs | 10 +++ MasstransferCommon/Events/EventType.cs | 15 ++++ MasstransferCommon/MasstransferCommon.csproj | 1 + .../DataExporter/ConfigService.cs | 8 ++- .../ImageExporter/ImageService.cs | 6 ++ .../License/LicenseService.cs | 11 ++- .../LogExporter/LogFileExporter.cs | 8 ++- .../OTA/Service/OTAService.cs | 69 ++++++++++--------- MasstransferExporter/Program.cs | 36 ++++++---- .../RemoteControl/RemoteLockService.cs | 26 ++++++- .../Process/Service/ProcessCommunicator.cs | 2 +- 13 files changed, 214 insertions(+), 53 deletions(-) create mode 100644 MasstransferCommon/Events/EventAction.cs create mode 100644 MasstransferCommon/Events/EventBus.cs create mode 100644 MasstransferCommon/Events/EventListener.cs create mode 100644 MasstransferCommon/Events/EventType.cs diff --git a/MasstransferCommon/Events/EventAction.cs b/MasstransferCommon/Events/EventAction.cs new file mode 100644 index 0000000..380f8f1 --- /dev/null +++ b/MasstransferCommon/Events/EventAction.cs @@ -0,0 +1,10 @@ +namespace MasstransferCommon.Events; + +/// +/// 用于声明当前方法为事件处理器 +/// +[AttributeUsage(AttributeTargets.Method)] +public class EventAction(params EventType[] types) : Attribute +{ + public EventType[] Types => types; +} \ No newline at end of file diff --git a/MasstransferCommon/Events/EventBus.cs b/MasstransferCommon/Events/EventBus.cs new file mode 100644 index 0000000..e158594 --- /dev/null +++ b/MasstransferCommon/Events/EventBus.cs @@ -0,0 +1,65 @@ +using Serilog; + +namespace MasstransferCommon.Events; + +/// +/// 事件总线 +/// +/// +public abstract class EventBus +{ + private static readonly Dictionary> Subscribers = new(); + + /// + /// 添加订阅 + /// + /// + /// + public static void AddEventHandler(EventType type, Delegate action) + { + if (Subscribers.TryGetValue(type, out var subscribers)) + { + var any = subscribers.Any(item => item.Equals(action)); + if (!any) subscribers.Add(action); + } + else + { + Subscribers.Add(type, [action]); + } + } + + /// + /// 移除订阅逻辑 + /// + /// 事件类型 + /// 回调方法 + public static void RemoveEventHandler(EventType type, Delegate action) + { + if (!Subscribers.TryGetValue(type, out var handlers)) return; + + handlers.Remove(action); + } + + /// + /// 发布事件 + /// + /// + /// + public static void Publish(EventType type, T data) + { + if (!Subscribers.TryGetValue(type, out var subscribers)) return; + + // 创建一个副本,避免在回调中修改订阅列表导致迭代异常 + var actions = subscribers.ToList(); + + foreach (var action in actions) + try + { + action.DynamicInvoke(type, data); + } + catch (Exception e) + { + Log.Error(e, e.Message); + } + } +} \ No newline at end of file diff --git a/MasstransferCommon/Events/EventListener.cs b/MasstransferCommon/Events/EventListener.cs new file mode 100644 index 0000000..dc0c28b --- /dev/null +++ b/MasstransferCommon/Events/EventListener.cs @@ -0,0 +1,10 @@ +namespace MasstransferCommon.Events; + +/// +/// 用于声明当前类为事件监听 +/// +[AttributeUsage(AttributeTargets.Class)] +public class EventListener(string name = "") : Attribute +{ + private string Name => name; +} \ No newline at end of file diff --git a/MasstransferCommon/Events/EventType.cs b/MasstransferCommon/Events/EventType.cs new file mode 100644 index 0000000..e1df75b --- /dev/null +++ b/MasstransferCommon/Events/EventType.cs @@ -0,0 +1,15 @@ +using System.ComponentModel; + +namespace MasstransferCommon.Events; + +/// +/// 通过事件发布订阅模式实现系统各组件的解耦 +/// 这里定义的是事件的驱动类型 +/// +public enum EventType +{ + // 系统初始化事件 + [Description("系统初始化事件")] SetupNotify, + + [Description("系统已经启动事件")] StartUp, +} \ No newline at end of file diff --git a/MasstransferCommon/MasstransferCommon.csproj b/MasstransferCommon/MasstransferCommon.csproj index 188e4da..6d2c11b 100644 --- a/MasstransferCommon/MasstransferCommon.csproj +++ b/MasstransferCommon/MasstransferCommon.csproj @@ -4,6 +4,7 @@ net7.0 enable enable + preview diff --git a/MasstransferExporter/DataExporter/ConfigService.cs b/MasstransferExporter/DataExporter/ConfigService.cs index 6615651..17239b8 100644 --- a/MasstransferExporter/DataExporter/ConfigService.cs +++ b/MasstransferExporter/DataExporter/ConfigService.cs @@ -1,4 +1,5 @@ -using MasstransferCommon.Model.Constant; +using MasstransferCommon.Events; +using MasstransferCommon.Model.Constant; using MasstransferCommon.Utils; using MasstransferCommunicate.Mqtt.Client; using MasstransferExporter.DataExporter.Model; @@ -11,6 +12,11 @@ namespace MasstransferExporter.DataExporter; /// public class ConfigService { + static ConfigService() + { + EventBus.AddEventHandler(EventType.StartUp, async () => { await ListenConfigIssuedEvent(); }); + } + /// /// 监听配置下发事件 /// diff --git a/MasstransferExporter/ImageExporter/ImageService.cs b/MasstransferExporter/ImageExporter/ImageService.cs index ac4ebfc..92cceb7 100644 --- a/MasstransferExporter/ImageExporter/ImageService.cs +++ b/MasstransferExporter/ImageExporter/ImageService.cs @@ -1,4 +1,5 @@ using System.Drawing; +using MasstransferCommon.Events; using MasstransferCommon.Model.Constant; using MasstransferCommon.Utils; using MasstransferCommunicate.Minio; @@ -18,6 +19,11 @@ public class ImageService private const string BasePath = "masstransfer"; + static ImageService() + { + EventBus.AddEventHandler(EventType.StartUp, async () => { await ListenImageQueryEvent(); }); + } + /// /// 监听图片查询事件 /// diff --git a/MasstransferExporter/License/LicenseService.cs b/MasstransferExporter/License/LicenseService.cs index 5bbb75e..04b57f1 100644 --- a/MasstransferExporter/License/LicenseService.cs +++ b/MasstransferExporter/License/LicenseService.cs @@ -1,7 +1,7 @@ -using MasstransferCommon.Model.Constant; +using MasstransferCommon.Events; +using MasstransferCommon.Model.Constant; using MasstransferCommunicate.Mqtt.Client; using MasstransferCommunicate.Process.Service; -using MasstransferInfrastructure.Mqtt.Client; namespace MasstransferExporter.License; @@ -10,6 +10,11 @@ namespace MasstransferExporter.License; /// public class LicenseService { + static LicenseService() + { + EventBus.AddEventHandler(EventType.StartUp, async () => { await ListenLicenseUpdateEvent(); }); + } + /// /// 启动监听证书更新事件 /// @@ -17,7 +22,7 @@ public class LicenseService { await MessageQueueHelper.Subscribe(Topics.UpdateLicenseEvent, HandleUpdateLicenseEvent); } - + /// /// 处理接收到的证书更新事件 /// diff --git a/MasstransferExporter/LogExporter/LogFileExporter.cs b/MasstransferExporter/LogExporter/LogFileExporter.cs index 2fa48a4..6da2a45 100644 --- a/MasstransferExporter/LogExporter/LogFileExporter.cs +++ b/MasstransferExporter/LogExporter/LogFileExporter.cs @@ -1,4 +1,5 @@ -using MasstransferCommon.Model.Constant; +using MasstransferCommon.Events; +using MasstransferCommon.Model.Constant; using MasstransferCommon.Model.Entity; using MasstransferCommon.Utils; using MasstransferCommunicate.Minio; @@ -17,6 +18,11 @@ public class LogFileExporter private static readonly MinioHelper Minio = MinioHelper.GetInstance(); + static LogFileExporter() + { + EventBus.AddEventHandler(EventType.StartUp, async () => { await ListenQueryLogEvent(); }); + } + /// /// 监听查询日志事件 /// diff --git a/MasstransferExporter/OTA/Service/OTAService.cs b/MasstransferExporter/OTA/Service/OTAService.cs index 893a5ed..a2df4c3 100644 --- a/MasstransferExporter/OTA/Service/OTAService.cs +++ b/MasstransferExporter/OTA/Service/OTAService.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using MasstransferCommon.Events; using MasstransferCommon.Model.Constant; using MasstransferCommon.Utils; using MasstransferCommunicate.Mqtt.Client; @@ -13,29 +14,32 @@ public class OTAService { private static readonly SqliteHelper Db = SqliteHelper.GetInstance(); private static OTAUpdateFileManager _otaUpdateFileManager; - - + + static OTAService() + { + EventBus.AddEventHandler(EventType.StartUp, async () => { await StartOTAService(); }); + } + /// /// 启动OTA服务 /// public static async Task StartOTAService() { - _otaUpdateFileManager = GetOTAUpdateFileManager(); - + //订阅云端发包 await MessageQueueHelper.Subscribe(Topics.IssuedOTAPackage, HandleIssuedOTAPackage); - + //订阅Masstransfer进程 请求OTA事件 ProcessCommunicator.Subscribe(ProcessTopics.OTAQueryEvent, HandleOTAQueryEvent); - + //订阅Masstransfer进程 启动更新事件 ProcessCommunicator.Subscribe(ProcessTopics.OTAUpdateEvent, HandleOTAUpdateEvent); - + //订阅订阅Masstransfer进程 下载安装包事件 ProcessCommunicator.Subscribe(ProcessTopics.DownloadUpdatePackageEvent, HandleDownloadUpdatePackageEvent); } - + /// /// 从数据库读取文件操作地址、关键文件后缀 /// @@ -49,8 +53,8 @@ public class OTAService { await ProcessCommunicator.Send(ProcessTopics.OTAQueryEventFeedback, otaUpdateData); } - - + + /// /// Masstransfer 请求OTA事件响应,向云端发送请求信息 /// @@ -69,7 +73,7 @@ public class OTAService try { await ApiClient.DownloadFileAsync(url, updatePackagePath); - if(!OTAClient.CheckMD5(checksum, updatePackagePath)) throw new Exception("MD5 check failed."); + if (!OTAClient.CheckMD5(checksum, updatePackagePath)) throw new Exception("MD5 check failed."); result = true; } catch (Exception e) @@ -79,27 +83,27 @@ public class OTAService } await ProcessCommunicator.Send(ProcessTopics.DownloadUpdatePackageEventFeedback, result); - } + //Masstransfer 通知-启动更新 public static async Task HandleOTAUpdateEvent(string topic, OTAUpdateData otaUpdateData) { var version = ""; - + OTAResultData otaResultData = new OTAResultData(); - + //启动更新 otaResultData.Result = Install() ? (byte)1 : (byte)0; //restart masstransfer - + //更新反馈 otaResultData.OtaSoftwareVersion = version; - otaResultData.CurrentSoftwareVersion = version; //**需要修改为当前版本 + otaResultData.CurrentSoftwareVersion = version; //**需要修改为当前版本 await MessageQueueHelper.Publish(Topics.OTAUpgradeFeedback, otaResultData); } - + /// /// 安装压缩包 /// @@ -115,10 +119,10 @@ public class OTAService { //初始化 initDir(criticalBackupDir); - + //备份重要文件 OTAClient.BackupEssentialFiles(appDir, criticalBackupDir, criticalFileExtension, criticalSourceLogPath); - + //备份旧版本文件 OTAClient.CompressDirectory(appDir, previousBackupPath); @@ -126,21 +130,20 @@ public class OTAService { //删除旧版本原文件 OTAClient.DeleteDirectory(appDir); - + //解压更新包 OTAClient.ExtractDirectory(updatePackagePath, appDir); - + //恢复重要文件 OTAClient.RecoverCriticalFiles(criticalSourceLogPath); - + //删除旧版本备份文件 OTAClient.DeleteFile(previousBackupPath); - + //删除更新包 OTAClient.DeleteFile(updatePackagePath); return true; - } catch (Exception ex) { @@ -152,9 +155,9 @@ public class OTAService catch (Exception ex) { Console.WriteLine($"更新异常e: {ex.Message}"); - + //如果重要文件备份已经产生,则删除 - if(Directory.Exists(criticalBackupDir)) + if (Directory.Exists(criticalBackupDir)) { OTAClient.DeleteDirectory(criticalBackupDir); } @@ -162,7 +165,7 @@ public class OTAService return false; } } - + /// /// 初始化重要文件备份目录 /// @@ -172,17 +175,18 @@ public class OTAService { Directory.Delete(criticalBackupDir, true); } + Directory.CreateDirectory(criticalBackupDir); - } /// /// 回滚 /// - private static void RollBack(string appDir, string criticalBackupDir, string updatePackagePath, string previousBackupPath) + private static void RollBack(string appDir, string criticalBackupDir, string updatePackagePath, + string previousBackupPath) { //删除安装过程中创建的文件、目录 - var directoryList = new string[] { appDir, criticalBackupDir}; + var directoryList = new string[] { appDir, criticalBackupDir }; var filePath = new String[] { updatePackagePath }; foreach (var directory in directoryList) @@ -200,12 +204,11 @@ public class OTAService OTAClient.DeleteFile(file); } } - + //恢复旧版本文件 OTAClient.ExtractDirectory(previousBackupPath, appDir); - + //删除旧版本备份 OTAClient.DeleteFile(previousBackupPath); - } } \ No newline at end of file diff --git a/MasstransferExporter/Program.cs b/MasstransferExporter/Program.cs index 47bdb29..82acfb4 100644 --- a/MasstransferExporter/Program.cs +++ b/MasstransferExporter/Program.cs @@ -1,23 +1,35 @@ -using MasstransferCommunicate.Mqtt.Client; -using MasstransferExporter.DataExporter; -using MasstransferExporter.OTA.Service; +using MasstransferCommon.Events; +using MasstransferCommon.Model.Entity; +using MasstransferCommunicate.Mqtt.Client; +using MasstransferCommunicate.Process.Service; +using MasstransferInfrastructure.Database.Sqlite; using MasstransferInfrastructure.Mqtt.Model; +namespace MasstransferExporter; + class Program { - static async Task Main() + private static readonly SqliteHelper Db = SqliteHelper.GetInstance(); + + public static async Task Main() { - var options = new MqttConnectOptions + var mqttParams = Db.Query("select * from mqtt_params").FirstOrDefault(); + + // 启动mqtt连接 + await MessageQueueHelper.InitConnect(new MqttConnectOptions() { - ServerAddress = "cloud.haiju-tech.com", - Port = 8884, - EnableTls = false - }; + ServerAddress = mqttParams.ServerAddress, + Port = mqttParams.Port, + UserName = mqttParams.UserName, + Password = mqttParams.Password + }); - await MessageQueueHelper.InitConnect(options); + // 启动与主程序的通信 + await ProcessCommunicator.Connect(); + + // 启动完成后,广播启动通知 + EventBus.Publish(EventType.StartUp, true); - CoordinateService.CoordinateExporter(); - await OTAService.StartOTAService(); Console.WriteLine("按任意键退出"); Console.ReadKey(); } diff --git a/MasstransferExporter/RemoteControl/RemoteLockService.cs b/MasstransferExporter/RemoteControl/RemoteLockService.cs index 6973ca6..048f3e9 100644 --- a/MasstransferExporter/RemoteControl/RemoteLockService.cs +++ b/MasstransferExporter/RemoteControl/RemoteLockService.cs @@ -1,5 +1,8 @@ -using MasstransferCommon.Model.Enum; +using MasstransferCommon.Events; +using MasstransferCommon.Model.Constant; +using MasstransferCommon.Model.Enum; using MasstransferCommon.Utils; +using MasstransferCommunicate.Mqtt.Client; using MasstransferExporter.RemoteControl.Model; using Microsoft.Win32; @@ -12,11 +15,30 @@ public class RemoteLockService { private const string KeyPath = @"Software\Masstransfer\Security"; + + static RemoteLockService() + { + EventBus.AddEventHandler(EventType.StartUp, async () => { await ListenRemoteLockEvent(); }); + } + + /// + /// 监听远程锁机事件 + /// + public static async Task ListenRemoteLockEvent() + { + await MessageQueueHelper.Subscribe(Topics.RemoteControl, HandleLockCmd); + } + + /// /// 处理接收到锁机业务指令 /// - public static void HandleLockCmd(LockCmd cmd) + private static void HandleLockCmd(string topic, string payload) { + var cmd = JsonUtil.FromJson(payload); + + if (cmd == null) return; + var action = cmd.Action; if (action == 0) diff --git a/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs b/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs index 5bb369b..d2afbd7 100644 --- a/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs +++ b/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs @@ -19,7 +19,7 @@ public class ProcessCommunicator /// public static async Task Connect() { - _helper = await ProcessHelper.CreateServer("Masstransfer"); + _helper = await ProcessHelper.CreateClient("Masstransfer"); _helper.MessageReceived += HandleMessageReceived; }