diff --git a/MasstransferCommon/Model/Constant/ProcessTopics.cs b/MasstransferCommon/Model/Constant/ProcessTopics.cs index e6708f6..5769b9f 100644 --- a/MasstransferCommon/Model/Constant/ProcessTopics.cs +++ b/MasstransferCommon/Model/Constant/ProcessTopics.cs @@ -2,6 +2,11 @@ public class ProcessTopics { + /// + /// 测试topic + /// + public const string TestEvent = "TestEvent"; + /// /// 证书更新事件 /// diff --git a/MasstransferExporter/License/LicenseService.cs b/MasstransferExporter/License/LicenseService.cs index 04b57f1..9d5ccf6 100644 --- a/MasstransferExporter/License/LicenseService.cs +++ b/MasstransferExporter/License/LicenseService.cs @@ -1,7 +1,7 @@ using MasstransferCommon.Events; using MasstransferCommon.Model.Constant; using MasstransferCommunicate.Mqtt.Client; -using MasstransferCommunicate.Process.Service; +using MasstransferCommunicate.Process.Client; namespace MasstransferExporter.License; @@ -26,11 +26,11 @@ public class LicenseService /// /// 处理接收到的证书更新事件 /// - public static async Task HandleUpdateLicenseEvent(string topic, string license) + private static void HandleUpdateLicenseEvent(string topic, string license) { - ProcessCommunicator.Subscribe(ProcessTopics.LicenseUpdateEventFeedback, HandleUpdateLicenseEventFeedback); + ProcessHelper.Subscribe(ProcessTopics.LicenseUpdateEventFeedback, HandleUpdateLicenseEventFeedback); - await ProcessCommunicator.Send(ProcessTopics.LicenseUpdateEvent, license); + ProcessHelper.Send(ProcessTopics.LicenseUpdateEvent, license); } /// @@ -38,7 +38,7 @@ public class LicenseService /// /// /// - public static async Task HandleUpdateLicenseEventFeedback(string topic, string result) + private static async Task HandleUpdateLicenseEventFeedback(string topic, string result) { await MessageQueueHelper.Publish(Topics.UpdateLicenseEventFeedback, result); } diff --git a/MasstransferExporter/OTA/Service/OTAService.cs b/MasstransferExporter/OTA/Service/OTAService.cs index a2df4c3..1deb1b9 100644 --- a/MasstransferExporter/OTA/Service/OTAService.cs +++ b/MasstransferExporter/OTA/Service/OTAService.cs @@ -1,9 +1,8 @@ -using System.Diagnostics; -using MasstransferCommon.Events; +using MasstransferCommon.Events; using MasstransferCommon.Model.Constant; using MasstransferCommon.Utils; using MasstransferCommunicate.Mqtt.Client; -using MasstransferCommunicate.Process.Service; +using MasstransferCommunicate.Process.Client; using MasstransferExporter.OTA.Client; using MasstransferExporter.OTA.Model; using MasstransferInfrastructure.Database.Sqlite; @@ -23,7 +22,7 @@ public class OTAService /// /// 启动OTA服务 /// - public static async Task StartOTAService() + private static async Task StartOTAService() { _otaUpdateFileManager = GetOTAUpdateFileManager(); @@ -31,13 +30,13 @@ public class OTAService await MessageQueueHelper.Subscribe(Topics.IssuedOTAPackage, HandleIssuedOTAPackage); //订阅Masstransfer进程 请求OTA事件 - ProcessCommunicator.Subscribe(ProcessTopics.OTAQueryEvent, HandleOTAQueryEvent); + ProcessHelper.Subscribe(ProcessTopics.OTAQueryEvent, HandleOTAQueryEvent); //订阅Masstransfer进程 启动更新事件 - ProcessCommunicator.Subscribe(ProcessTopics.OTAUpdateEvent, HandleOTAUpdateEvent); + ProcessHelper.Subscribe(ProcessTopics.OTAUpdateEvent, HandleOTAUpdateEvent); //订阅订阅Masstransfer进程 下载安装包事件 - ProcessCommunicator.Subscribe(ProcessTopics.DownloadUpdatePackageEvent, HandleDownloadUpdatePackageEvent); + ProcessHelper.Subscribe(ProcessTopics.DownloadUpdatePackageEvent, HandleDownloadUpdatePackageEvent); } /// @@ -49,9 +48,9 @@ public class OTAService return Db.Query("").FirstOrDefault(); } - public static async Task HandleIssuedOTAPackage(string topic, OTAUpdateData otaUpdateData) + private static void HandleIssuedOTAPackage(string topic, OTAUpdateData otaUpdateData) { - await ProcessCommunicator.Send(ProcessTopics.OTAQueryEventFeedback, otaUpdateData); + ProcessHelper.Send(ProcessTopics.OTAQueryEventFeedback, otaUpdateData); } @@ -82,24 +81,23 @@ public class OTAService throw; } - await ProcessCommunicator.Send(ProcessTopics.DownloadUpdatePackageEventFeedback, result); + ProcessHelper.Send(ProcessTopics.DownloadUpdatePackageEventFeedback, result); } //Masstransfer 通知-启动更新 - public static async Task HandleOTAUpdateEvent(string topic, OTAUpdateData otaUpdateData) + private static async Task HandleOTAUpdateEvent(string topic, OTAUpdateData otaUpdateData) { var version = ""; - OTAResultData otaResultData = new OTAResultData(); - - //启动更新 - otaResultData.Result = Install() ? (byte)1 : (byte)0; - - //restart masstransfer - - //更新反馈 - otaResultData.OtaSoftwareVersion = version; - otaResultData.CurrentSoftwareVersion = version; //**需要修改为当前版本 + var otaResultData = new OTAResultData + { + //启动更新 + Result = Install() ? (byte)1 : (byte)0, + //restart masstransfer + //更新反馈 + OtaSoftwareVersion = version, + CurrentSoftwareVersion = version //**需要修改为当前版本 + }; await MessageQueueHelper.Publish(Topics.OTAUpgradeFeedback, otaResultData); } @@ -107,7 +105,7 @@ public class OTAService /// /// 安装压缩包 /// - public static bool Install() + private static bool Install() { var appDir = _otaUpdateFileManager.AppDir; var criticalBackupDir = _otaUpdateFileManager.CriticalBackupDir; diff --git a/MasstransferExporter/Program.cs b/MasstransferExporter/Program.cs index 82acfb4..718d141 100644 --- a/MasstransferExporter/Program.cs +++ b/MasstransferExporter/Program.cs @@ -1,9 +1,6 @@ -using MasstransferCommon.Events; -using MasstransferCommon.Model.Entity; -using MasstransferCommunicate.Mqtt.Client; -using MasstransferCommunicate.Process.Service; +using MasstransferCommon.Model.Constant; +using MasstransferCommunicate.Process.Client; using MasstransferInfrastructure.Database.Sqlite; -using MasstransferInfrastructure.Mqtt.Model; namespace MasstransferExporter; @@ -13,24 +10,31 @@ class Program public static async Task Main() { - var mqttParams = Db.Query("select * from mqtt_params").FirstOrDefault(); + // var mqttParams = Db.Query("select * from mqtt_params").FirstOrDefault(); + // + // // 启动mqtt连接 + // await MessageQueueHelper.InitConnect(new MqttConnectOptions() + // { + // ServerAddress = mqttParams.ServerAddress, + // Port = mqttParams.Port, + // UserName = mqttParams.UserName, + // Password = mqttParams.Password + // }); + // + // // 启动与主程序的通信 + // await ProcessCommunicator.Connect(); + // + // // 启动完成后,广播启动通知 + // EventBus.Publish(EventType.StartUp, true);rewrwe - // 启动mqtt连接 - await MessageQueueHelper.InitConnect(new MqttConnectOptions() + ProcessHelper.Init(); + + while (Console.ReadLine() is { } input) { - ServerAddress = mqttParams.ServerAddress, - Port = mqttParams.Port, - UserName = mqttParams.UserName, - Password = mqttParams.Password - }); + ProcessHelper.Send(ProcessTopics.TestEvent, input); + } - // 启动与主程序的通信 - await ProcessCommunicator.Connect(); - - // 启动完成后,广播启动通知 - EventBus.Publish(EventType.StartUp, true); - - Console.WriteLine("按任意键退出"); - Console.ReadKey(); + // Console.WriteLine("按任意键退出"); + // Console.ReadKey(); } } \ No newline at end of file diff --git a/MasstransferInfrastructure/Process/Client/PipeClient.cs b/MasstransferInfrastructure/Process/Client/PipeClient.cs new file mode 100644 index 0000000..bafaf25 --- /dev/null +++ b/MasstransferInfrastructure/Process/Client/PipeClient.cs @@ -0,0 +1,69 @@ +using System.IO.Pipes; + +namespace MasstransferCommunicate.Process.Client; + +internal class PipeClient(string pipeName) +{ + private NamedPipeClientStream? _pipeClient; + + public event Action MessageReceived; + + /// + /// 启动连接 + /// + public void Start() + { + Task.Run(ConnectToServer); + } + + /// + /// 等待连接并监听信息 + /// + private void ConnectToServer() + { + while (true) + { + try + { + _pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous); + Console.WriteLine("正在连接服务器"); + _pipeClient.Connect(5000); + + Console.WriteLine("已连接到服务器"); + using var reader = new StreamReader(_pipeClient); + while (_pipeClient.IsConnected) + { + var message = reader.ReadLine(); + if (message != null) + { + Console.WriteLine($"Received message: {message}"); + MessageReceived?.Invoke(message); + } + } + } + catch (Exception e) + { + Console.WriteLine($"Error: {e.Message}"); + } + finally + { + _pipeClient?.Dispose(); + } + + Thread.Sleep(1000); // Retry connection after delay + } + } + + /// + /// 发送消息 + /// + /// + public void SendMessage(string message) + { + if (_pipeClient is { IsConnected: true }) + { + var writer = new StreamWriter(_pipeClient) { AutoFlush = true }; + writer.WriteLine(message); + } + } +} \ No newline at end of file diff --git a/MasstransferInfrastructure/Process/Client/ProcessHelper.cs b/MasstransferInfrastructure/Process/Client/ProcessHelper.cs index d6015b2..c3eba24 100644 --- a/MasstransferInfrastructure/Process/Client/ProcessHelper.cs +++ b/MasstransferInfrastructure/Process/Client/ProcessHelper.cs @@ -1,116 +1,87 @@ -using System.IO.Pipes; +using MasstransferCommon.Utils; +using MasstransferCommunicate.Process.Model; +using Serilog; namespace MasstransferCommunicate.Process.Client; +/// +/// 进程通讯工具类 +/// public class ProcessHelper { - public event Action MessageReceived; + private static readonly Dictionary> Subscribers = new(); - private readonly string _pipeName; - private readonly bool _isServer; - private PipeStream _pipeStream; - private StreamReader _reader; - private StreamWriter _writer; - private CancellationTokenSource _cts; + private static PipeClient? _pipe; - private ProcessHelper(string pipeName, bool isServer) + + /// + /// 初始化通讯管道 + /// + public static void Init() { - _pipeName = pipeName; - _isServer = isServer; - _cts = new CancellationTokenSource(); + if (_pipe != null) return; + _pipe = new PipeClient("MasstransferPipe"); + _pipe.Start(); } - public static async Task CreateServer(string pipeName) + /// + /// 订阅消息 + /// + /// + /// + public static void Subscribe(string topic, Delegate @delegate) { - var helper = new ProcessHelper(pipeName, true); - await helper.ConnectAsync(); - return helper; - } - - public static async Task CreateClient(string pipeName) - { - var helper = new ProcessHelper(pipeName, false); - await helper.ConnectAsync(); - return helper; - } - - private async Task ConnectAsync() - { - if (_isServer) + if (!Subscribers.ContainsKey(topic)) { - _pipeStream = new NamedPipeServerStream(_pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, - PipeOptions.Asynchronous); - await ((NamedPipeServerStream)_pipeStream).WaitForConnectionAsync(_cts.Token); - } - else - { - _pipeStream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, PipeOptions.Asynchronous); - await ((NamedPipeClientStream)_pipeStream).ConnectAsync(_cts.Token); + Subscribers.Add(topic, []); } - _reader = new StreamReader(_pipeStream); - _writer = new StreamWriter(_pipeStream) { AutoFlush = true }; - StartListeningAsync(); + Subscribers[topic].Add(@delegate); } - public async Task SendMessageAsync(string message) + /// + /// 发送消息 + /// + /// + /// + /// + public static void Send(string topic, T message) { - try - { - await _writer.WriteLineAsync(message); - } - catch (IOException) - { - await ReconnectAsync(); - await SendMessageAsync(message); - } + var payload = new Message(topic, message); + _pipe?.SendMessage(JsonUtil.ToJson(payload)); } - private void StartListeningAsync() + /// + /// 处理接收到的消息 + /// + /// + private static void HandleMessageReceived(string? message) { - Task.Run(async () => - { - while (!_cts.Token.IsCancellationRequested) - { - try - { - var message = await _reader.ReadLineAsync(); - if (message == null) throw new IOException("Pipe disconnected."); - MessageReceived?.Invoke(message); - } - catch (IOException) - { - // Handle disconnection and retries - await ReconnectAsync(); - } - } - }); - } + if (message == null) return; - private async Task ReconnectAsync() - { - Close(); - while (!_pipeStream.IsConnected) + var dictionary = JsonUtil.ToDictionary(message); + if (dictionary == null) return; + + var topic = dictionary["Topic"] as string; + var data = dictionary["Data"]; + + if (!Subscribers.TryGetValue(topic, out var subscribers)) return; + + foreach (var subscriber in subscribers) { try { - Console.WriteLine("尝试重新连接..."); - await ConnectAsync(); - Console.WriteLine("重新连接成功"); - break; + var methodInfo = subscriber.Method; + var parameters = methodInfo.GetParameters(); + if (parameters.Length != 2) continue; + var type = parameters[1].ParameterType; + // 通知订阅者 + subscriber.DynamicInvoke(topic, JsonUtil.FromJson(type, message)); } - catch (Exception) + catch (Exception exception) { - Console.WriteLine("重新连接失败,等待 1 秒后重试..."); - await Task.Delay(1000); + Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic); } } } - - public void Close() - { - _cts.Cancel(); - _pipeStream?.Close(); - _cts = new CancellationTokenSource(); - } } \ No newline at end of file diff --git a/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs b/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs deleted file mode 100644 index d2afbd7..0000000 --- a/MasstransferInfrastructure/Process/Service/ProcessCommunicator.cs +++ /dev/null @@ -1,86 +0,0 @@ -using MasstransferCommon.Utils; -using MasstransferCommunicate.Process.Client; -using MasstransferCommunicate.Process.Model; -using Serilog; - -namespace MasstransferCommunicate.Process.Service; - -/// -/// 进程间通讯器 -/// -public class ProcessCommunicator -{ - private static readonly Dictionary> Subscribers = new(); - - private static ProcessHelper? _helper; - - /// - /// 启动连接 - /// - public static async Task Connect() - { - _helper = await ProcessHelper.CreateClient("Masstransfer"); - _helper.MessageReceived += HandleMessageReceived; - } - - - /// - /// 订阅某个主题 - /// - /// - /// - public static void Subscribe(string topic, Delegate @delegate) - { - if (!Subscribers.ContainsKey(topic)) - { - Subscribers.Add(topic, []); - } - - Subscribers[topic].Add(@delegate); - } - - /// - /// 发送消息 - /// - /// - /// - public static async Task Send(string topic, T message) - { - await _helper?.SendMessageAsync(JsonUtil.ToJson(new Message(topic, message)))!; - return true; - } - - /// - /// 处理接收到的消息 - /// - /// - private static void HandleMessageReceived(string? message) - { - if (message == null) return; - - var dictionary = JsonUtil.ToDictionary(message); - if (dictionary == null) return; - - var topic = dictionary["Topic"] as string; - var data = dictionary["Data"]; - - if (!Subscribers.TryGetValue(topic, out var subscribers)) return; - - foreach (var subscriber in subscribers) - { - try - { - var methodInfo = subscriber.Method; - var parameters = methodInfo.GetParameters(); - if (parameters.Length != 2) continue; - var type = parameters[1].ParameterType; - // 通知订阅者 - subscriber.DynamicInvoke(topic, JsonUtil.FromJson(type, message)); - } - catch (Exception exception) - { - Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic); - } - } - } -} \ No newline at end of file