using System.IO.Pipes;

namespace MasstransferCommunicate.Process.Client;

/// <summary>
/// Line-oriented, duplex messaging over a named pipe. One instance acts either as the
/// pipe server or as the pipe client (chosen by the factory method used to create it),
/// raises <see cref="MessageReceived"/> for every text line received, and transparently
/// reconnects when the pipe breaks.
/// </summary>
/// <remarks>
/// Sends are not serialized internally; callers should await one
/// <see cref="SendMessageAsync"/> call before issuing the next.
/// </remarks>
public class ProcessHelper
{
    /// <summary>
    /// Raised on the background listener for every line read from the pipe.
    /// An exception thrown by a subscriber ends the listener.
    /// </summary>
    public event Action<string> MessageReceived;

    private readonly string _pipeName;
    private readonly bool _isServer;

    // Cancelled exactly once, by Close(). It is never replaced, so every loop that
    // observes its token reliably learns that the helper has been shut down.
    private readonly CancellationTokenSource _cts;

    // Ensures only one caller (listener or sender) rebuilds the connection at a time.
    private readonly SemaphoreSlim _reconnectGate = new SemaphoreSlim(1, 1);

    // The live connection. Swapped as a whole so readers and writers never observe a
    // stream from one connection paired with a reader/writer from another.
    private volatile Connection _connection;

    private ProcessHelper(string pipeName, bool isServer)
    {
        _pipeName = pipeName;
        _isServer = isServer;
        _cts = new CancellationTokenSource();
    }

    /// <summary>
    /// Creates the server end of the pipe and waits until a client has connected.
    /// </summary>
    /// <param name="pipeName">Name of the pipe to create.</param>
    /// <returns>A connected helper that is already listening for messages.</returns>
    public static async Task<ProcessHelper> CreateServer(string pipeName)
    {
        var helper = new ProcessHelper(pipeName, true);
        await helper.ConnectAsync();
        return helper;
    }

    /// <summary>
    /// Creates the client end of the pipe and waits until it is connected to a server
    /// on the local machine.
    /// </summary>
    /// <param name="pipeName">Name of the pipe to connect to.</param>
    /// <returns>A connected helper that is already listening for messages.</returns>
    public static async Task<ProcessHelper> CreateClient(string pipeName)
    {
        var helper = new ProcessHelper(pipeName, false);
        await helper.ConnectAsync();
        return helper;
    }

    /// <summary>
    /// Establishes the initial connection and starts the single background listener.
    /// Reconnects go through <see cref="ReconnectAsync"/> and do not start another listener.
    /// </summary>
    private async Task ConnectAsync()
    {
        _connection = await OpenConnectionAsync(_cts.Token);
        StartListeningAsync();
    }

    /// <summary>
    /// Opens a new pipe in the configured role and waits for the peer.
    /// The pipe is disposed if the attempt fails or is cancelled.
    /// </summary>
    private async Task<Connection> OpenConnectionAsync(CancellationToken token)
    {
        PipeStream stream;
        if (_isServer)
        {
            stream = new NamedPipeServerStream(
                _pipeName,
                PipeDirection.InOut,
                1,
                PipeTransmissionMode.Byte,
                PipeOptions.Asynchronous);
        }
        else
        {
            stream = new NamedPipeClientStream(
                ".",
                _pipeName,
                PipeDirection.InOut,
                PipeOptions.Asynchronous);
        }

        try
        {
            if (stream is NamedPipeServerStream server)
            {
                await server.WaitForConnectionAsync(token);
            }
            else
            {
                await ((NamedPipeClientStream)stream).ConnectAsync(token);
            }

            return new Connection(stream);
        }
        catch
        {
            // Release the handle now; the server allows a single pipe instance, so a
            // leaked one could block the next attempt.
            stream.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Writes <paramref name="message"/> followed by a line terminator to the pipe.
    /// If the pipe is broken, the connection is rebuilt and the write is retried.
    /// </summary>
    /// <param name="message">Text to send as a single line.</param>
    /// <exception cref="ObjectDisposedException">The helper has been closed.</exception>
    /// <exception cref="OperationCanceledException">
    /// The helper was closed while a reconnect was in progress.
    /// </exception>
    public async Task SendMessageAsync(string message)
    {
        while (true)
        {
            ThrowIfClosed();
            var connection = _connection;
            try
            {
                await connection.Writer.WriteLineAsync(message);
                return;
            }
            catch (Exception ex) when (ex is IOException or ObjectDisposedException)
            {
                // ObjectDisposedException here means either Close() ran (reported by
                // ThrowIfClosed) or the listener is already rebuilding the connection
                // (ReconnectAsync then returns as soon as the new connection is in place).
                ThrowIfClosed();
                await ReconnectAsync(connection);
            }
        }
    }

    /// <summary>
    /// Starts the background loop that reads lines from the pipe and raises
    /// <see cref="MessageReceived"/>. Called once, after the initial connection.
    /// </summary>
    private void StartListeningAsync()
    {
        _ = Task.Run(() => ListenAsync());
    }

    /// <summary>
    /// Reads lines until the helper is closed, reconnecting whenever the pipe ends or fails.
    /// </summary>
    private async Task ListenAsync()
    {
        var token = _cts.Token;
        while (!token.IsCancellationRequested)
        {
            var connection = _connection;

            string message;
            try
            {
                message = await connection.Reader.ReadLineAsync();
            }
            catch (Exception ex) when (ex is IOException or ObjectDisposedException or OperationCanceledException)
            {
                // The stream failed or was disposed underneath the read; treat it the
                // same as end-of-stream.
                message = null;
            }

            if (message == null)
            {
                if (token.IsCancellationRequested)
                {
                    return;
                }

                try
                {
                    await ReconnectAsync(connection);
                }
                catch (OperationCanceledException)
                {
                    return; // Close() was called while reconnecting.
                }

                continue;
            }

            // Invoked outside the read's try/catch so that an I/O exception thrown by a
            // subscriber is not mistaken for a pipe failure.
            MessageReceived?.Invoke(message);
        }
    }

    /// <summary>
    /// Replaces <paramref name="failed"/> with a fresh connection, retrying once per
    /// second until it succeeds or the helper is closed.
    /// </summary>
    /// <param name="failed">The connection the caller observed failing.</param>
    /// <exception cref="OperationCanceledException">The helper was closed.</exception>
    private async Task ReconnectAsync(Connection failed)
    {
        var token = _cts.Token;
        await _reconnectGate.WaitAsync(token);
        try
        {
            if (!ReferenceEquals(_connection, failed))
            {
                // Another caller already rebuilt the connection while we were waiting.
                return;
            }

            failed.Dispose();

            while (true)
            {
                token.ThrowIfCancellationRequested();
                try
                {
                    Console.WriteLine("尝试重新连接...");
                    var fresh = await OpenConnectionAsync(token);
                    _connection = fresh;

                    if (token.IsCancellationRequested)
                    {
                        // Close() raced with the reconnect; do not leave a live pipe behind.
                        fresh.Dispose();
                        token.ThrowIfCancellationRequested();
                    }

                    Console.WriteLine("重新连接成功");
                    return;
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    Console.WriteLine("重新连接失败,等待 1 秒后重试...");
                    await Task.Delay(1000, token);
                }
            }
        }
        finally
        {
            _reconnectGate.Release();
        }
    }

    /// <summary>
    /// Permanently shuts the helper down: stops the listener, aborts any reconnect in
    /// progress and closes the pipe. Safe to call more than once.
    /// </summary>
    public void Close()
    {
        // Cancel first so that a reconnect finishing concurrently sees the cancellation
        // and disposes the connection it just created.
        _cts.Cancel();
        _connection?.Dispose();
    }

    private void ThrowIfClosed()
    {
        if (_cts.IsCancellationRequested)
        {
            throw new ObjectDisposedException(nameof(ProcessHelper));
        }
    }

    /// <summary>
    /// A connected pipe together with the text reader and writer layered on top of it.
    /// </summary>
    private sealed class Connection : IDisposable
    {
        public Connection(PipeStream stream)
        {
            Stream = stream;
            Reader = new StreamReader(stream);
            Writer = new StreamWriter(stream) { AutoFlush = true };
        }

        public PipeStream Stream { get; }

        public StreamReader Reader { get; }

        public StreamWriter Writer { get; }

        // Only the pipe is disposed: disposing the writer would try to flush into a
        // pipe that may already be broken.
        public void Dispose() => Stream.Dispose();
    }
}