完善数据采集进程逻辑

This commit is contained in:
huangxianguo 2024-09-02 21:02:04 +08:00
parent 3fa0d33bd6
commit 55ed7729e1
13 changed files with 214 additions and 53 deletions

View File

@ -0,0 +1,10 @@
namespace MasstransferCommon.Events;
/// <summary>
/// 用于声明当前方法为事件处理器
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
public class EventAction(params EventType[] types) : Attribute
{
public EventType[] Types => types;
}

View File

@ -0,0 +1,65 @@
using Serilog;
namespace MasstransferCommon.Events;
/// <summary>
/// 事件总线
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class EventBus<T>
{
private static readonly Dictionary<EventType, List<Delegate>> Subscribers = new();
/// <summary>
/// 添加订阅
/// </summary>
/// <param name="type"></param>
/// <param name="action"></param>
public static void AddEventHandler(EventType type, Delegate action)
{
if (Subscribers.TryGetValue(type, out var subscribers))
{
var any = subscribers.Any(item => item.Equals(action));
if (!any) subscribers.Add(action);
}
else
{
Subscribers.Add(type, [action]);
}
}
/// <summary>
/// 移除订阅逻辑
/// </summary>
/// <param name="type">事件类型</param>
/// <param name="action">回调方法</param>
public static void RemoveEventHandler(EventType type, Delegate action)
{
if (!Subscribers.TryGetValue(type, out var handlers)) return;
handlers.Remove(action);
}
/// <summary>
/// 发布事件
/// </summary>
/// <param name="type"></param>
/// <param name="data"></param>
public static void Publish(EventType type, T data)
{
if (!Subscribers.TryGetValue(type, out var subscribers)) return;
// 创建一个副本,避免在回调中修改订阅列表导致迭代异常
var actions = subscribers.ToList();
foreach (var action in actions)
try
{
action.DynamicInvoke(type, data);
}
catch (Exception e)
{
Log.Error(e, e.Message);
}
}
}

View File

@ -0,0 +1,10 @@
namespace MasstransferCommon.Events;
/// <summary>
/// 用于声明当前类为事件监听
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class EventListener(string name = "") : Attribute
{
private string Name => name;
}

View File

@ -0,0 +1,15 @@
using System.ComponentModel;
namespace MasstransferCommon.Events;
/// <summary>
/// 通过事件发布订阅模式实现系统各组件的解耦
/// 这里定义的是事件的驱动类型
/// </summary>
public enum EventType
{
// 系统初始化事件
[Description("系统初始化事件")] SetupNotify,
[Description("系统已经启动事件")] StartUp,
}

View File

@ -4,6 +4,7 @@
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>

View File

@ -1,4 +1,5 @@
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Client;
using MasstransferExporter.DataExporter.Model;
@ -11,6 +12,11 @@ namespace MasstransferExporter.DataExporter;
/// </summary>
public class ConfigService
{
static ConfigService()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await ListenConfigIssuedEvent(); });
}
/// <summary>
/// 监听配置下发事件
/// </summary>

View File

@ -1,4 +1,5 @@
using System.Drawing;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Minio;
@ -18,6 +19,11 @@ public class ImageService
private const string BasePath = "masstransfer";
static ImageService()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await ListenImageQueryEvent(); });
}
/// <summary>
/// 监听图片查询事件
/// </summary>

View File

@ -1,7 +1,7 @@
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant;
using MasstransferCommunicate.Mqtt.Client;
using MasstransferCommunicate.Process.Service;
using MasstransferInfrastructure.Mqtt.Client;
namespace MasstransferExporter.License;
@ -10,6 +10,11 @@ namespace MasstransferExporter.License;
/// </summary>
public class LicenseService
{
static LicenseService()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await ListenLicenseUpdateEvent(); });
}
/// <summary>
/// 启动监听证书更新事件
/// </summary>
@ -17,7 +22,7 @@ public class LicenseService
{
await MessageQueueHelper.Subscribe(Topics.UpdateLicenseEvent, HandleUpdateLicenseEvent);
}
/// <summary>
/// 处理接收到的证书更新事件
/// </summary>

View File

@ -1,4 +1,5 @@
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Model.Entity;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Minio;
@ -17,6 +18,11 @@ public class LogFileExporter
private static readonly MinioHelper Minio = MinioHelper.GetInstance();
static LogFileExporter()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await ListenQueryLogEvent(); });
}
/// <summary>
/// 监听查询日志事件
/// </summary>

View File

@ -1,4 +1,5 @@
using System.Diagnostics;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Client;
@ -13,29 +14,32 @@ public class OTAService
{
private static readonly SqliteHelper Db = SqliteHelper.GetInstance();
private static OTAUpdateFileManager _otaUpdateFileManager;
static OTAService()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await StartOTAService(); });
}
/// <summary>
/// 启动OTA服务
/// </summary>
public static async Task StartOTAService()
{
_otaUpdateFileManager = GetOTAUpdateFileManager();
//订阅云端发包
await MessageQueueHelper.Subscribe(Topics.IssuedOTAPackage, HandleIssuedOTAPackage);
//订阅Masstransfer进程 请求OTA事件
ProcessCommunicator.Subscribe(ProcessTopics.OTAQueryEvent, HandleOTAQueryEvent);
//订阅Masstransfer进程 启动更新事件
ProcessCommunicator.Subscribe(ProcessTopics.OTAUpdateEvent, HandleOTAUpdateEvent);
//订阅订阅Masstransfer进程 下载安装包事件
ProcessCommunicator.Subscribe(ProcessTopics.DownloadUpdatePackageEvent, HandleDownloadUpdatePackageEvent);
}
/// <summary>
/// 从数据库读取文件操作地址、关键文件后缀
/// </summary>
@ -49,8 +53,8 @@ public class OTAService
{
await ProcessCommunicator.Send(ProcessTopics.OTAQueryEventFeedback, otaUpdateData);
}
/// <summary>
/// Masstransfer 请求OTA事件响应向云端发送请求信息
/// </summary>
@ -69,7 +73,7 @@ public class OTAService
try
{
await ApiClient.DownloadFileAsync(url, updatePackagePath);
if(!OTAClient.CheckMD5(checksum, updatePackagePath)) throw new Exception("MD5 check failed.");
if (!OTAClient.CheckMD5(checksum, updatePackagePath)) throw new Exception("MD5 check failed.");
result = true;
}
catch (Exception e)
@ -79,27 +83,27 @@ public class OTAService
}
await ProcessCommunicator.Send(ProcessTopics.DownloadUpdatePackageEventFeedback, result);
}
//Masstransfer 通知-启动更新
public static async Task HandleOTAUpdateEvent(string topic, OTAUpdateData otaUpdateData)
{
var version = "";
OTAResultData otaResultData = new OTAResultData();
//启动更新
otaResultData.Result = Install() ? (byte)1 : (byte)0;
//restart masstransfer
//更新反馈
otaResultData.OtaSoftwareVersion = version;
otaResultData.CurrentSoftwareVersion = version; //**需要修改为当前版本
otaResultData.CurrentSoftwareVersion = version; //**需要修改为当前版本
await MessageQueueHelper.Publish(Topics.OTAUpgradeFeedback, otaResultData);
}
/// <summary>
/// 安装压缩包
/// </summary>
@ -115,10 +119,10 @@ public class OTAService
{
//初始化
initDir(criticalBackupDir);
//备份重要文件
OTAClient.BackupEssentialFiles(appDir, criticalBackupDir, criticalFileExtension, criticalSourceLogPath);
//备份旧版本文件
OTAClient.CompressDirectory(appDir, previousBackupPath);
@ -126,21 +130,20 @@ public class OTAService
{
//删除旧版本原文件
OTAClient.DeleteDirectory(appDir);
//解压更新包
OTAClient.ExtractDirectory(updatePackagePath, appDir);
//恢复重要文件
OTAClient.RecoverCriticalFiles(criticalSourceLogPath);
//删除旧版本备份文件
OTAClient.DeleteFile(previousBackupPath);
//删除更新包
OTAClient.DeleteFile(updatePackagePath);
return true;
}
catch (Exception ex)
{
@ -152,9 +155,9 @@ public class OTAService
catch (Exception ex)
{
Console.WriteLine($"更新异常e: {ex.Message}");
//如果重要文件备份已经产生,则删除
if(Directory.Exists(criticalBackupDir))
if (Directory.Exists(criticalBackupDir))
{
OTAClient.DeleteDirectory(criticalBackupDir);
}
@ -162,7 +165,7 @@ public class OTAService
return false;
}
}
/// <summary>
/// 初始化重要文件备份目录
/// </summary>
@ -172,17 +175,18 @@ public class OTAService
{
Directory.Delete(criticalBackupDir, true);
}
Directory.CreateDirectory(criticalBackupDir);
}
/// <summary>
/// 回滚
/// </summary>
private static void RollBack(string appDir, string criticalBackupDir, string updatePackagePath, string previousBackupPath)
private static void RollBack(string appDir, string criticalBackupDir, string updatePackagePath,
string previousBackupPath)
{
//删除安装过程中创建的文件、目录
var directoryList = new string[] { appDir, criticalBackupDir};
var directoryList = new string[] { appDir, criticalBackupDir };
var filePath = new String[] { updatePackagePath };
foreach (var directory in directoryList)
@ -200,12 +204,11 @@ public class OTAService
OTAClient.DeleteFile(file);
}
}
//恢复旧版本文件
OTAClient.ExtractDirectory(previousBackupPath, appDir);
//删除旧版本备份
OTAClient.DeleteFile(previousBackupPath);
}
}

View File

@ -1,23 +1,35 @@
using MasstransferCommunicate.Mqtt.Client;
using MasstransferExporter.DataExporter;
using MasstransferExporter.OTA.Service;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Entity;
using MasstransferCommunicate.Mqtt.Client;
using MasstransferCommunicate.Process.Service;
using MasstransferInfrastructure.Database.Sqlite;
using MasstransferInfrastructure.Mqtt.Model;
namespace MasstransferExporter;
class Program
{
static async Task Main()
private static readonly SqliteHelper Db = SqliteHelper.GetInstance();
public static async Task Main()
{
var options = new MqttConnectOptions
var mqttParams = Db.Query<MqttParams>("select * from mqtt_params").FirstOrDefault();
// 启动mqtt连接
await MessageQueueHelper.InitConnect(new MqttConnectOptions()
{
ServerAddress = "cloud.haiju-tech.com",
Port = 8884,
EnableTls = false
};
ServerAddress = mqttParams.ServerAddress,
Port = mqttParams.Port,
UserName = mqttParams.UserName,
Password = mqttParams.Password
});
await MessageQueueHelper.InitConnect(options);
// 启动与主程序的通信
await ProcessCommunicator.Connect();
// 启动完成后,广播启动通知
EventBus<bool>.Publish(EventType.StartUp, true);
CoordinateService.CoordinateExporter();
await OTAService.StartOTAService();
Console.WriteLine("按任意键退出");
Console.ReadKey();
}

View File

@ -1,5 +1,8 @@
using MasstransferCommon.Model.Enum;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Model.Enum;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Client;
using MasstransferExporter.RemoteControl.Model;
using Microsoft.Win32;
@ -12,11 +15,30 @@ public class RemoteLockService
{
private const string KeyPath = @"Software\Masstransfer\Security";
static RemoteLockService()
{
EventBus<bool>.AddEventHandler(EventType.StartUp, async () => { await ListenRemoteLockEvent(); });
}
/// <summary>
/// 监听远程锁机事件
/// </summary>
public static async Task ListenRemoteLockEvent()
{
await MessageQueueHelper.Subscribe(Topics.RemoteControl, HandleLockCmd);
}
/// <summary>
/// 处理接收到锁机业务指令
/// </summary>
public static void HandleLockCmd(LockCmd cmd)
private static void HandleLockCmd(string topic, string payload)
{
var cmd = JsonUtil.FromJson<LockCmd>(payload);
if (cmd == null) return;
var action = cmd.Action;
if (action == 0)

View File

@ -19,7 +19,7 @@ public class ProcessCommunicator
/// </summary>
public static async Task Connect()
{
_helper = await ProcessHelper.CreateServer("Masstransfer");
_helper = await ProcessHelper.CreateClient("Masstransfer");
_helper.MessageReceived += HandleMessageReceived;
}