修复进程间通讯异常问题

This commit is contained in:
huangxianguo 2024-09-03 16:13:49 +08:00
parent 55ed7729e1
commit 8cfae665be
7 changed files with 181 additions and 220 deletions

View File

@ -2,6 +2,11 @@
public class ProcessTopics
{
/// <summary>
/// 测试topic
/// </summary>
public const string TestEvent = "TestEvent";
/// <summary>
/// 证书更新事件
/// </summary>

View File

@ -1,7 +1,7 @@
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant;
using MasstransferCommunicate.Mqtt.Client;
using MasstransferCommunicate.Process.Service;
using MasstransferCommunicate.Process.Client;
namespace MasstransferExporter.License;
@ -26,11 +26,11 @@ public class LicenseService
/// <summary>
/// 处理接收到的证书更新事件
/// </summary>
public static async Task HandleUpdateLicenseEvent(string topic, string license)
private static void HandleUpdateLicenseEvent(string topic, string license)
{
ProcessCommunicator.Subscribe(ProcessTopics.LicenseUpdateEventFeedback, HandleUpdateLicenseEventFeedback);
ProcessHelper.Subscribe(ProcessTopics.LicenseUpdateEventFeedback, HandleUpdateLicenseEventFeedback);
await ProcessCommunicator.Send(ProcessTopics.LicenseUpdateEvent, license);
ProcessHelper.Send(ProcessTopics.LicenseUpdateEvent, license);
}
/// <summary>
@ -38,7 +38,7 @@ public class LicenseService
/// </summary>
/// <param name="topic"></param>
/// <param name="result"></param>
public static async Task HandleUpdateLicenseEventFeedback(string topic, string result)
private static async Task HandleUpdateLicenseEventFeedback(string topic, string result)
{
await MessageQueueHelper.Publish(Topics.UpdateLicenseEventFeedback, result);
}

View File

@ -1,9 +1,8 @@
using System.Diagnostics;
using MasstransferCommon.Events;
using MasstransferCommon.Events;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Client;
using MasstransferCommunicate.Process.Service;
using MasstransferCommunicate.Process.Client;
using MasstransferExporter.OTA.Client;
using MasstransferExporter.OTA.Model;
using MasstransferInfrastructure.Database.Sqlite;
@ -23,7 +22,7 @@ public class OTAService
/// <summary>
/// 启动OTA服务
/// </summary>
public static async Task StartOTAService()
private static async Task StartOTAService()
{
_otaUpdateFileManager = GetOTAUpdateFileManager();
@ -31,13 +30,13 @@ public class OTAService
await MessageQueueHelper.Subscribe(Topics.IssuedOTAPackage, HandleIssuedOTAPackage);
//订阅Masstransfer进程 请求OTA事件
ProcessCommunicator.Subscribe(ProcessTopics.OTAQueryEvent, HandleOTAQueryEvent);
ProcessHelper.Subscribe(ProcessTopics.OTAQueryEvent, HandleOTAQueryEvent);
//订阅Masstransfer进程 启动更新事件
ProcessCommunicator.Subscribe(ProcessTopics.OTAUpdateEvent, HandleOTAUpdateEvent);
ProcessHelper.Subscribe(ProcessTopics.OTAUpdateEvent, HandleOTAUpdateEvent);
//订阅订阅Masstransfer进程 下载安装包事件
ProcessCommunicator.Subscribe(ProcessTopics.DownloadUpdatePackageEvent, HandleDownloadUpdatePackageEvent);
ProcessHelper.Subscribe(ProcessTopics.DownloadUpdatePackageEvent, HandleDownloadUpdatePackageEvent);
}
/// <summary>
@ -49,9 +48,9 @@ public class OTAService
return Db.Query<OTAUpdateFileManager>("").FirstOrDefault();
}
public static async Task HandleIssuedOTAPackage(string topic, OTAUpdateData otaUpdateData)
private static void HandleIssuedOTAPackage(string topic, OTAUpdateData otaUpdateData)
{
await ProcessCommunicator.Send(ProcessTopics.OTAQueryEventFeedback, otaUpdateData);
ProcessHelper.Send(ProcessTopics.OTAQueryEventFeedback, otaUpdateData);
}
@ -82,24 +81,23 @@ public class OTAService
throw;
}
await ProcessCommunicator.Send(ProcessTopics.DownloadUpdatePackageEventFeedback, result);
ProcessHelper.Send(ProcessTopics.DownloadUpdatePackageEventFeedback, result);
}
//Masstransfer 通知-启动更新
public static async Task HandleOTAUpdateEvent(string topic, OTAUpdateData otaUpdateData)
private static async Task HandleOTAUpdateEvent(string topic, OTAUpdateData otaUpdateData)
{
var version = "";
OTAResultData otaResultData = new OTAResultData();
//启动更新
otaResultData.Result = Install() ? (byte)1 : (byte)0;
//restart masstransfer
//更新反馈
otaResultData.OtaSoftwareVersion = version;
otaResultData.CurrentSoftwareVersion = version; //**需要修改为当前版本
var otaResultData = new OTAResultData
{
//启动更新
Result = Install() ? (byte)1 : (byte)0,
//restart masstransfer
//更新反馈
OtaSoftwareVersion = version,
CurrentSoftwareVersion = version //**需要修改为当前版本
};
await MessageQueueHelper.Publish(Topics.OTAUpgradeFeedback, otaResultData);
}
@ -107,7 +105,7 @@ public class OTAService
/// <summary>
/// 安装压缩包
/// </summary>
public static bool Install()
private static bool Install()
{
var appDir = _otaUpdateFileManager.AppDir;
var criticalBackupDir = _otaUpdateFileManager.CriticalBackupDir;

View File

@ -1,9 +1,6 @@
using MasstransferCommon.Events;
using MasstransferCommon.Model.Entity;
using MasstransferCommunicate.Mqtt.Client;
using MasstransferCommunicate.Process.Service;
using MasstransferCommon.Model.Constant;
using MasstransferCommunicate.Process.Client;
using MasstransferInfrastructure.Database.Sqlite;
using MasstransferInfrastructure.Mqtt.Model;
namespace MasstransferExporter;
@ -13,24 +10,31 @@ class Program
public static async Task Main()
{
var mqttParams = Db.Query<MqttParams>("select * from mqtt_params").FirstOrDefault();
// var mqttParams = Db.Query<MqttParams>("select * from mqtt_params").FirstOrDefault();
//
// // 启动mqtt连接
// await MessageQueueHelper.InitConnect(new MqttConnectOptions()
// {
// ServerAddress = mqttParams.ServerAddress,
// Port = mqttParams.Port,
// UserName = mqttParams.UserName,
// Password = mqttParams.Password
// });
//
// // 启动与主程序的通信
// await ProcessCommunicator.Connect();
//
// // 启动完成后,广播启动通知
// EventBus<bool>.Publish(EventType.StartUp, true);rewrwe
// 启动mqtt连接
await MessageQueueHelper.InitConnect(new MqttConnectOptions()
ProcessHelper.Init();
while (Console.ReadLine() is { } input)
{
ServerAddress = mqttParams.ServerAddress,
Port = mqttParams.Port,
UserName = mqttParams.UserName,
Password = mqttParams.Password
});
ProcessHelper.Send(ProcessTopics.TestEvent, input);
}
// 启动与主程序的通信
await ProcessCommunicator.Connect();
// 启动完成后,广播启动通知
EventBus<bool>.Publish(EventType.StartUp, true);
Console.WriteLine("按任意键退出");
Console.ReadKey();
// Console.WriteLine("按任意键退出");
// Console.ReadKey();
}
}

View File

@ -0,0 +1,69 @@
using System.IO.Pipes;
namespace MasstransferCommunicate.Process.Client;
internal class PipeClient(string pipeName)
{
private NamedPipeClientStream? _pipeClient;
public event Action<string> MessageReceived;
/// <summary>
/// 启动连接
/// </summary>
public void Start()
{
Task.Run(ConnectToServer);
}
/// <summary>
/// 等待连接并监听信息
/// </summary>
private void ConnectToServer()
{
while (true)
{
try
{
_pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
Console.WriteLine("正在连接服务器");
_pipeClient.Connect(5000);
Console.WriteLine("已连接到服务器");
using var reader = new StreamReader(_pipeClient);
while (_pipeClient.IsConnected)
{
var message = reader.ReadLine();
if (message != null)
{
Console.WriteLine($"Received message: {message}");
MessageReceived?.Invoke(message);
}
}
}
catch (Exception e)
{
Console.WriteLine($"Error: {e.Message}");
}
finally
{
_pipeClient?.Dispose();
}
Thread.Sleep(1000); // Retry connection after delay
}
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="message"></param>
public void SendMessage(string message)
{
if (_pipeClient is { IsConnected: true })
{
var writer = new StreamWriter(_pipeClient) { AutoFlush = true };
writer.WriteLine(message);
}
}
}

View File

@ -1,116 +1,87 @@
using System.IO.Pipes;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Process.Model;
using Serilog;
namespace MasstransferCommunicate.Process.Client;
/// <summary>
/// 进程通讯工具类
/// </summary>
public class ProcessHelper
{
public event Action<string> MessageReceived;
private static readonly Dictionary<string, List<Delegate>> Subscribers = new();
private readonly string _pipeName;
private readonly bool _isServer;
private PipeStream _pipeStream;
private StreamReader _reader;
private StreamWriter _writer;
private CancellationTokenSource _cts;
private static PipeClient? _pipe;
private ProcessHelper(string pipeName, bool isServer)
/// <summary>
/// 初始化通讯管道
/// </summary>
public static void Init()
{
_pipeName = pipeName;
_isServer = isServer;
_cts = new CancellationTokenSource();
if (_pipe != null) return;
_pipe = new PipeClient("MasstransferPipe");
_pipe.Start();
}
public static async Task<ProcessHelper> CreateServer(string pipeName)
/// <summary>
/// 订阅消息
/// </summary>
/// <param name="topic"></param>
/// <param name="delegate"></param>
public static void Subscribe(string topic, Delegate @delegate)
{
var helper = new ProcessHelper(pipeName, true);
await helper.ConnectAsync();
return helper;
}
public static async Task<ProcessHelper> CreateClient(string pipeName)
{
var helper = new ProcessHelper(pipeName, false);
await helper.ConnectAsync();
return helper;
}
private async Task ConnectAsync()
{
if (_isServer)
if (!Subscribers.ContainsKey(topic))
{
_pipeStream = new NamedPipeServerStream(_pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
await ((NamedPipeServerStream)_pipeStream).WaitForConnectionAsync(_cts.Token);
}
else
{
_pipeStream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
await ((NamedPipeClientStream)_pipeStream).ConnectAsync(_cts.Token);
Subscribers.Add(topic, []);
}
_reader = new StreamReader(_pipeStream);
_writer = new StreamWriter(_pipeStream) { AutoFlush = true };
StartListeningAsync();
Subscribers[topic].Add(@delegate);
}
public async Task SendMessageAsync(string message)
/// <summary>
/// 发送消息
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <typeparam name="T"></typeparam>
public static void Send<T>(string topic, T message)
{
try
{
await _writer.WriteLineAsync(message);
}
catch (IOException)
{
await ReconnectAsync();
await SendMessageAsync(message);
}
var payload = new Message<T>(topic, message);
_pipe?.SendMessage(JsonUtil.ToJson(payload));
}
private void StartListeningAsync()
/// <summary>
/// 处理接收到的消息
/// </summary>
/// <returns></returns>
private static void HandleMessageReceived(string? message)
{
Task.Run(async () =>
{
while (!_cts.Token.IsCancellationRequested)
{
try
{
var message = await _reader.ReadLineAsync();
if (message == null) throw new IOException("Pipe disconnected.");
MessageReceived?.Invoke(message);
}
catch (IOException)
{
// Handle disconnection and retries
await ReconnectAsync();
}
}
});
}
if (message == null) return;
private async Task ReconnectAsync()
{
Close();
while (!_pipeStream.IsConnected)
var dictionary = JsonUtil.ToDictionary(message);
if (dictionary == null) return;
var topic = dictionary["Topic"] as string;
var data = dictionary["Data"];
if (!Subscribers.TryGetValue(topic, out var subscribers)) return;
foreach (var subscriber in subscribers)
{
try
{
Console.WriteLine("尝试重新连接...");
await ConnectAsync();
Console.WriteLine("重新连接成功");
break;
var methodInfo = subscriber.Method;
var parameters = methodInfo.GetParameters();
if (parameters.Length != 2) continue;
var type = parameters[1].ParameterType;
// 通知订阅者
subscriber.DynamicInvoke(topic, JsonUtil.FromJson(type, message));
}
catch (Exception)
catch (Exception exception)
{
Console.WriteLine("重新连接失败,等待 1 秒后重试...");
await Task.Delay(1000);
Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic);
}
}
}
public void Close()
{
_cts.Cancel();
_pipeStream?.Close();
_cts = new CancellationTokenSource();
}
}

View File

@ -1,86 +0,0 @@
using MasstransferCommon.Utils;
using MasstransferCommunicate.Process.Client;
using MasstransferCommunicate.Process.Model;
using Serilog;
namespace MasstransferCommunicate.Process.Service;
/// <summary>
/// 进程间通讯器
/// </summary>
public class ProcessCommunicator
{
private static readonly Dictionary<string, List<Delegate>> Subscribers = new();
private static ProcessHelper? _helper;
/// <summary>
/// 启动连接
/// </summary>
public static async Task Connect()
{
_helper = await ProcessHelper.CreateClient("Masstransfer");
_helper.MessageReceived += HandleMessageReceived;
}
/// <summary>
/// 订阅某个主题
/// </summary>
/// <param name="topic"></param>
/// <param name="delegate"></param>
public static void Subscribe(string topic, Delegate @delegate)
{
if (!Subscribers.ContainsKey(topic))
{
Subscribers.Add(topic, []);
}
Subscribers[topic].Add(@delegate);
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
public static async Task<bool> Send<T>(string topic, T message)
{
await _helper?.SendMessageAsync(JsonUtil.ToJson(new Message<T>(topic, message)))!;
return true;
}
/// <summary>
/// 处理接收到的消息
/// </summary>
/// <returns></returns>
private static void HandleMessageReceived(string? message)
{
if (message == null) return;
var dictionary = JsonUtil.ToDictionary(message);
if (dictionary == null) return;
var topic = dictionary["Topic"] as string;
var data = dictionary["Data"];
if (!Subscribers.TryGetValue(topic, out var subscribers)) return;
foreach (var subscriber in subscribers)
{
try
{
var methodInfo = subscriber.Method;
var parameters = methodInfo.GetParameters();
if (parameters.Length != 2) continue;
var type = parameters[1].ParameterType;
// 通知订阅者
subscriber.DynamicInvoke(topic, JsonUtil.FromJson(type, message));
}
catch (Exception exception)
{
Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic);
}
}
}
}