diff --git a/MasstransferCommon/Events/EventAction.cs b/MasstransferCommon/Events/EventAction.cs
new file mode 100644
index 0000000..380f8f1
--- /dev/null
+++ b/MasstransferCommon/Events/EventAction.cs
@@ -0,0 +1,10 @@
+namespace MasstransferCommon.Events;
+
+///
+/// 用于声明当前方法为事件处理器
+///
+[AttributeUsage(AttributeTargets.Method)]
+public class EventAction(params EventType[] types) : Attribute
+{
+ public EventType[] Types => types;
+}
\ No newline at end of file
diff --git a/MasstransferCommon/Events/EventBus.cs b/MasstransferCommon/Events/EventBus.cs
new file mode 100644
index 0000000..e158594
--- /dev/null
+++ b/MasstransferCommon/Events/EventBus.cs
@@ -0,0 +1,65 @@
+using Serilog;
+
+namespace MasstransferCommon.Events;
+
+///
+/// 事件总线
+///
+///
+public abstract class EventBus
+{
+ private static readonly Dictionary> Subscribers = new();
+
+ ///
+ /// 添加订阅
+ ///
+ ///
+ ///
+ public static void AddEventHandler(EventType type, Delegate action)
+ {
+ if (Subscribers.TryGetValue(type, out var subscribers))
+ {
+ var any = subscribers.Any(item => item.Equals(action));
+ if (!any) subscribers.Add(action);
+ }
+ else
+ {
+ Subscribers.Add(type, [action]);
+ }
+ }
+
+ ///
+ /// 移除订阅逻辑
+ ///
+ /// 事件类型
+ /// 回调方法
+ public static void RemoveEventHandler(EventType type, Delegate action)
+ {
+ if (!Subscribers.TryGetValue(type, out var handlers)) return;
+
+ handlers.Remove(action);
+ }
+
+ ///
+ /// 发布事件
+ ///
+ ///
+ ///
+ public static void Publish(EventType type, T data)
+ {
+ if (!Subscribers.TryGetValue(type, out var subscribers)) return;
+
+ // 创建一个副本,避免在回调中修改订阅列表导致迭代异常
+ var actions = subscribers.ToList();
+
+ foreach (var action in actions)
+ try
+ {
+ action.DynamicInvoke(type, data);
+ }
+ catch (Exception e)
+ {
+ Log.Error(e, e.Message);
+ }
+ }
+}
\ No newline at end of file
diff --git a/MasstransferCommon/Events/EventListener.cs b/MasstransferCommon/Events/EventListener.cs
new file mode 100644
index 0000000..dc0c28b
--- /dev/null
+++ b/MasstransferCommon/Events/EventListener.cs
@@ -0,0 +1,10 @@
+namespace MasstransferCommon.Events;
+
+///
+/// 用于声明当前类为事件监听
+///
+[AttributeUsage(AttributeTargets.Class)]
+public class EventListener(string name = "") : Attribute
+{
+ private string Name => name;
+}
\ No newline at end of file
diff --git a/MasstransferCommon/Events/EventType.cs b/MasstransferCommon/Events/EventType.cs
new file mode 100644
index 0000000..e1df75b
--- /dev/null
+++ b/MasstransferCommon/Events/EventType.cs
@@ -0,0 +1,15 @@
+using System.ComponentModel;
+
+namespace MasstransferCommon.Events;
+
+///
+/// 通过事件发布订阅模式实现系统各组件的解耦
+/// 这里定义的是事件的驱动类型
+///
+public enum EventType
+{
+ // 系统初始化事件
+ [Description("系统初始化事件")] SetupNotify,
+
+ [Description("系统已经启动事件")] StartUp,
+}
\ No newline at end of file
diff --git a/MasstransferCommon/MasstransferCommon.csproj b/MasstransferCommon/MasstransferCommon.csproj
index 188e4da..6d2c11b 100644
--- a/MasstransferCommon/MasstransferCommon.csproj
+++ b/MasstransferCommon/MasstransferCommon.csproj
@@ -4,6 +4,7 @@
net7.0
enable
enable
+ preview
diff --git a/MasstransferExporter/DataExporter/ConfigService.cs b/MasstransferExporter/DataExporter/ConfigService.cs
index 6615651..17239b8 100644
--- a/MasstransferExporter/DataExporter/ConfigService.cs
+++ b/MasstransferExporter/DataExporter/ConfigService.cs
@@ -1,4 +1,5 @@
-using MasstransferCommon.Model.Constant;
+using MasstransferCommon.Events;
+using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Client;
using MasstransferExporter.DataExporter.Model;
@@ -11,6 +12,11 @@ namespace MasstransferExporter.DataExporter;
///
public class ConfigService
{
+ static ConfigService()
+ {
+ EventBus.AddEventHandler(EventType.StartUp, async () => { await ListenConfigIssuedEvent(); });
+ }
+
///
/// 监听配置下发事件
///
diff --git a/MasstransferExporter/ImageExporter/ImageService.cs b/MasstransferExporter/ImageExporter/ImageService.cs
index ac4ebfc..92cceb7 100644
--- a/MasstransferExporter/ImageExporter/ImageService.cs
+++ b/MasstransferExporter/ImageExporter/ImageService.cs
@@ -1,4 +1,5 @@
using System.Drawing;
+using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Minio;
@@ -18,6 +19,11 @@ public class ImageService
private const string BasePath = "masstransfer";
+ static ImageService()
+ {
+ EventBus.AddEventHandler(EventType.StartUp, async () => { await ListenImageQueryEvent(); });
+ }
+
///
/// 监听图片查询事件
///
diff --git a/MasstransferExporter/License/LicenseService.cs b/MasstransferExporter/License/LicenseService.cs
index 5bbb75e..04b57f1 100644
--- a/MasstransferExporter/License/LicenseService.cs
+++ b/MasstransferExporter/License/LicenseService.cs
@@ -1,7 +1,7 @@
-using MasstransferCommon.Model.Constant;
+using MasstransferCommon.Events;
+using MasstransferCommon.Model.Constant;
using MasstransferCommunicate.Mqtt.Client;
using MasstransferCommunicate.Process.Service;
-using MasstransferInfrastructure.Mqtt.Client;
namespace MasstransferExporter.License;
@@ -10,6 +10,11 @@ namespace MasstransferExporter.License;
///
public class LicenseService
{
+ static LicenseService()
+ {
+ EventBus.AddEventHandler(EventType.StartUp, async () => { await ListenLicenseUpdateEvent(); });
+ }
+
///
/// 启动监听证书更新事件
///
@@ -17,7 +22,7 @@ public class LicenseService
{
await MessageQueueHelper.Subscribe(Topics.UpdateLicenseEvent, HandleUpdateLicenseEvent);
}
-
+
///
/// 处理接收到的证书更新事件
///
diff --git a/MasstransferExporter/LogExporter/LogFileExporter.cs b/MasstransferExporter/LogExporter/LogFileExporter.cs
index 2fa48a4..6da2a45 100644
--- a/MasstransferExporter/LogExporter/LogFileExporter.cs
+++ b/MasstransferExporter/LogExporter/LogFileExporter.cs
@@ -1,4 +1,5 @@
-using MasstransferCommon.Model.Constant;
+using MasstransferCommon.Events;
+using MasstransferCommon.Model.Constant;
using MasstransferCommon.Model.Entity;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Minio;
@@ -17,6 +18,11 @@ public class LogFileExporter
private static readonly MinioHelper Minio = MinioHelper.GetInstance();
+ static LogFileExporter()
+ {
+ EventBus.AddEventHandler(EventType.StartUp, async () => { await ListenQueryLogEvent(); });
+ }
+
///
/// 监听查询日志事件
///
diff --git a/MasstransferExporter/OTA/Service/OTAService.cs b/MasstransferExporter/OTA/Service/OTAService.cs
index 893a5ed..a2df4c3 100644
--- a/MasstransferExporter/OTA/Service/OTAService.cs
+++ b/MasstransferExporter/OTA/Service/OTAService.cs
@@ -1,4 +1,5 @@
using System.Diagnostics;
+using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Client;
@@ -13,29 +14,32 @@ public class OTAService
{
private static readonly SqliteHelper Db = SqliteHelper.GetInstance();
private static OTAUpdateFileManager _otaUpdateFileManager;
-
-
+
+ static OTAService()
+ {
+ EventBus.AddEventHandler(EventType.StartUp, async () => { await StartOTAService(); });
+ }
+
///
/// 启动OTA服务
///
public static async Task StartOTAService()
{
-
_otaUpdateFileManager = GetOTAUpdateFileManager();
-
+
//订阅云端发包
await MessageQueueHelper.Subscribe(Topics.IssuedOTAPackage, HandleIssuedOTAPackage);
-
+
//订阅Masstransfer进程 请求OTA事件
ProcessCommunicator.Subscribe(ProcessTopics.OTAQueryEvent, HandleOTAQueryEvent);
-
+
//订阅Masstransfer进程 启动更新事件
ProcessCommunicator.Subscribe(ProcessTopics.OTAUpdateEvent, HandleOTAUpdateEvent);
-
+
//订阅订阅Masstransfer进程 下载安装包事件
ProcessCommunicator.Subscribe(ProcessTopics.DownloadUpdatePackageEvent, HandleDownloadUpdatePackageEvent);
}
-
+
///
/// 从数据库读取文件操作地址、关键文件后缀
///
@@ -49,8 +53,8 @@ public class OTAService
{
await ProcessCommunicator.Send(ProcessTopics.OTAQueryEventFeedback, otaUpdateData);
}
-
-
+
+
///
/// Masstransfer 请求OTA事件响应,向云端发送请求信息
///
@@ -69,7 +73,7 @@ public class OTAService
try
{
await ApiClient.DownloadFileAsync(url, updatePackagePath);
- if(!OTAClient.CheckMD5(checksum, updatePackagePath)) throw new Exception("MD5 check failed.");
+ if (!OTAClient.CheckMD5(checksum, updatePackagePath)) throw new Exception("MD5 check failed.");
result = true;
}
catch (Exception e)
@@ -79,27 +83,27 @@ public class OTAService
}
await ProcessCommunicator.Send(ProcessTopics.DownloadUpdatePackageEventFeedback, result);
-
}
+
//Masstransfer 通知-启动更新
public static async Task HandleOTAUpdateEvent(string topic, OTAUpdateData otaUpdateData)
{
var version = "";
-
+
OTAResultData otaResultData = new OTAResultData();
-
+
//启动更新
otaResultData.Result = Install() ? (byte)1 : (byte)0;
//restart masstransfer
-
+
//更新反馈
otaResultData.OtaSoftwareVersion = version;
- otaResultData.CurrentSoftwareVersion = version; //**需要修改为当前版本
+ otaResultData.CurrentSoftwareVersion = version; //**需要修改为当前版本
await MessageQueueHelper.Publish(Topics.OTAUpgradeFeedback, otaResultData);
}
-
+
///
/// 安装压缩包
///
@@ -115,10 +119,10 @@ public class OTAService
{
//初始化
initDir(criticalBackupDir);
-
+
//备份重要文件
OTAClient.BackupEssentialFiles(appDir, criticalBackupDir, criticalFileExtension, criticalSourceLogPath);
-
+
//备份旧版本文件
OTAClient.CompressDirectory(appDir, previousBackupPath);
@@ -126,21 +130,20 @@ public class OTAService
{
//删除旧版本原文件
OTAClient.DeleteDirectory(appDir);
-
+
//解压更新包
OTAClient.ExtractDirectory(updatePackagePath, appDir);
-
+
//恢复重要文件
OTAClient.RecoverCriticalFiles(criticalSourceLogPath);
-
+
//删除旧版本备份文件
OTAClient.DeleteFile(previousBackupPath);
-
+
//删除更新包
OTAClient.DeleteFile(updatePackagePath);
return true;
-
}
catch (Exception ex)
{
@@ -152,9 +155,9 @@ public class OTAService
catch (Exception ex)
{
Console.WriteLine($"更新异常e: {ex.Message}");
-
+
//如果重要文件备份已经产生,则删除
- if(Directory.Exists(criticalBackupDir))
+ if (Directory.Exists(criticalBackupDir))
{
OTAClient.DeleteDirectory(criticalBackupDir);
}
@@ -162,7 +165,7 @@ public class OTAService
return false;
}
}
-
+
///
/// 初始化重要文件备份目录
///
@@ -172,17 +175,18 @@ public class OTAService
{
Directory.Delete(criticalBackupDir, true);
}
+
Directory.CreateDirectory(criticalBackupDir);
-
}
///
/// 回滚
///
- private static void RollBack(string appDir, string criticalBackupDir, string updatePackagePath, string previousBackupPath)
+ private static void RollBack(string appDir, string criticalBackupDir, string updatePackagePath,
+ string previousBackupPath)
{
//删除安装过程中创建的文件、目录
- var directoryList = new string[] { appDir, criticalBackupDir};
+ var directoryList = new string[] { appDir, criticalBackupDir };
var filePath = new String[] { updatePackagePath };
foreach (var directory in directoryList)
@@ -200,12 +204,11 @@ public class OTAService
OTAClient.DeleteFile(file);
}
}
-
+
//恢复旧版本文件
OTAClient.ExtractDirectory(previousBackupPath, appDir);
-
+
//删除旧版本备份
OTAClient.DeleteFile(previousBackupPath);
-
}
}
\ No newline at end of file
diff --git a/MasstransferExporter/Program.cs b/MasstransferExporter/Program.cs
index 47bdb29..82acfb4 100644
--- a/MasstransferExporter/Program.cs
+++ b/MasstransferExporter/Program.cs
@@ -1,23 +1,35 @@
-using MasstransferCommunicate.Mqtt.Client;
-using MasstransferExporter.DataExporter;
-using MasstransferExporter.OTA.Service;
+using MasstransferCommon.Events;
+using MasstransferCommon.Model.Entity;
+using MasstransferCommunicate.Mqtt.Client;
+using MasstransferCommunicate.Process.Service;
+using MasstransferInfrastructure.Database.Sqlite;
using MasstransferInfrastructure.Mqtt.Model;
+namespace MasstransferExporter;
+
class Program
{
- static async Task Main()
+ private static readonly SqliteHelper Db = SqliteHelper.GetInstance();
+
+ public static async Task Main()
{
- var options = new MqttConnectOptions
+ var mqttParams = Db.Query("select * from mqtt_params").FirstOrDefault();
+
+ // 启动mqtt连接
+ await MessageQueueHelper.InitConnect(new MqttConnectOptions()
{
- ServerAddress = "cloud.haiju-tech.com",
- Port = 8884,
- EnableTls = false
- };
+ ServerAddress = mqttParams.ServerAddress,
+ Port = mqttParams.Port,
+ UserName = mqttParams.UserName,
+ Password = mqttParams.Password
+ });
- await MessageQueueHelper.InitConnect(options);
+ // 启动与主程序的通信
+ await ProcessCommunicator.Connect();
+
+ // 启动完成后,广播启动通知
+ EventBus.Publish(EventType.StartUp, true);
- CoordinateService.CoordinateExporter();
- await OTAService.StartOTAService();
Console.WriteLine("按任意键退出");
Console.ReadKey();
}
diff --git a/MasstransferExporter/RemoteControl/RemoteLockService.cs b/MasstransferExporter/RemoteControl/RemoteLockService.cs
index 6973ca6..048f3e9 100644
--- a/MasstransferExporter/RemoteControl/RemoteLockService.cs
+++ b/MasstransferExporter/RemoteControl/RemoteLockService.cs
@@ -1,5 +1,8 @@
-using MasstransferCommon.Model.Enum;
+using MasstransferCommon.Events;
+using MasstransferCommon.Model.Constant;
+using MasstransferCommon.Model.Enum;
using MasstransferCommon.Utils;
+using MasstransferCommunicate.Mqtt.Client;
using MasstransferExporter.RemoteControl.Model;
using Microsoft.Win32;
@@ -12,11 +15,30 @@ public class RemoteLockService
{
private const string KeyPath = @"Software\Masstransfer\Security";
+
+ static RemoteLockService()
+ {
+ EventBus.AddEventHandler(EventType.StartUp, async () => { await ListenRemoteLockEvent(); });
+ }
+
+ ///
+ /// 监听远程锁机事件
+ ///
+ public static async Task ListenRemoteLockEvent()
+ {
+ await MessageQueueHelper.Subscribe(Topics.RemoteControl, HandleLockCmd);
+ }
+
+
///
/// 处理接收到锁机业务指令
///
- public static void HandleLockCmd(LockCmd cmd)
+ private static void HandleLockCmd(string topic, string payload)
{
+ var cmd = JsonUtil.FromJson(payload);
+
+ if (cmd == null) return;
+
var action = cmd.Action;
if (action == 0)
diff --git a/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs b/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs
index 5bb369b..d2afbd7 100644
--- a/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs
+++ b/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs
@@ -19,7 +19,7 @@ public class ProcessCommunicator
///
public static async Task Connect()
{
- _helper = await ProcessHelper.CreateServer("Masstransfer");
+ _helper = await ProcessHelper.CreateClient("Masstransfer");
_helper.MessageReceived += HandleMessageReceived;
}