MasstransferExporter/MasstransferInfrastructure/Process/Client/ProcessHelper.cs

116 lines
3.2 KiB
C#
Raw Normal View History

2024-06-19 07:53:34 +00:00
using System.IO.Pipes;
namespace MasstransferCommunicate.Process.Client;
/// <summary>
/// Line-oriented, bidirectional text messaging over a named pipe, usable as either the
/// server or the client end. Each message is one line of text. A background listener
/// raises <see cref="MessageReceived"/> for every line read, and a lost connection is
/// re-established automatically until <see cref="Close"/> is called.
/// </summary>
/// <remarks>
/// Concurrent calls to <see cref="SendMessageAsync"/> are not synchronised with each other;
/// callers that send from several threads must serialise their sends.
/// </remarks>
public class ProcessHelper
{
    /// <summary>Delay between two failed reconnection attempts.</summary>
    private const int ReconnectDelayMilliseconds = 1000;

    /// <summary>
    /// Raised on a thread-pool thread for every line received from the peer.
    /// The listener starts as soon as the factory method completes, so subscribe
    /// right after <see cref="CreateServer"/> / <see cref="CreateClient"/> returns.
    /// </summary>
    public event Action<string> MessageReceived;

    private readonly string _pipeName;
    private readonly bool _isServer;

    // Lifetime token: cancelled exactly once by Close() and never replaced, so every
    // loop that observes it is guaranteed to see the shutdown request.
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    // Serialises reconnection between the listener and SendMessageAsync.
    private readonly SemaphoreSlim _reconnectGate = new SemaphoreSlim(1, 1);

    // Current pipe plus its reader/writer, swapped as one unit so that no caller can
    // ever observe a reader and a writer that belong to different pipes.
    private volatile Connection _connection;

    private ProcessHelper(string pipeName, bool isServer)
    {
        _pipeName = pipeName;
        _isServer = isServer;
    }

    /// <summary>
    /// Creates the server end of the pipe and waits until a client has connected.
    /// </summary>
    /// <param name="pipeName">Name of the pipe to create.</param>
    /// <returns>A connected helper whose listener is already running.</returns>
    public static Task<ProcessHelper> CreateServer(string pipeName)
    {
        return CreateAsync(pipeName, true);
    }

    /// <summary>
    /// Creates the client end of the pipe and waits until it is connected to a server
    /// on the local machine.
    /// </summary>
    /// <param name="pipeName">Name of the pipe to connect to.</param>
    /// <returns>A connected helper whose listener is already running.</returns>
    public static Task<ProcessHelper> CreateClient(string pipeName)
    {
        return CreateAsync(pipeName, false);
    }

    /// <summary>
    /// Sends one message (terminated by a newline) to the peer. If the pipe is broken
    /// the connection is re-established and the send is retried until it succeeds.
    /// </summary>
    /// <param name="message">Text to send; it must not contain line breaks.</param>
    /// <exception cref="ObjectDisposedException">The helper has been closed.</exception>
    /// <exception cref="OperationCanceledException">
    /// <see cref="Close"/> was called while the connection was being re-established.
    /// </exception>
    public async Task SendMessageAsync(string message)
    {
        // A loop instead of recursion: the call chain does not grow with every retry.
        while (true)
        {
            if (_cts.IsCancellationRequested)
            {
                throw new ObjectDisposedException(nameof(ProcessHelper));
            }

            var connection = _connection;
            try
            {
                await connection.Writer.WriteLineAsync(message);
                return;
            }
            catch (Exception ex) when (ex is IOException or ObjectDisposedException)
            {
                // ObjectDisposedException shows up when the listener has already torn
                // this connection down; ReconnectAsync then returns immediately and the
                // next iteration picks up the replacement connection.
                await ReconnectAsync(connection);
            }
        }
    }

    /// <summary>
    /// Permanently shuts the helper down: stops the listener, aborts any reconnection
    /// in progress and closes the pipe. Calling it more than once is harmless.
    /// </summary>
    public void Close()
    {
        _cts.Cancel();
        _connection?.Pipe.Dispose();
    }

    /// <summary>Shared implementation of the two public factory methods.</summary>
    private static async Task<ProcessHelper> CreateAsync(string pipeName, bool isServer)
    {
        var helper = new ProcessHelper(pipeName, isServer);
        await helper.ConnectAsync();

        // Started exactly once per helper. Reconnects reuse this loop rather than
        // spawning another reader on the same stream.
        _ = Task.Run(helper.ListenAsync);
        return helper;
    }

    /// <summary>
    /// Opens a fresh pipe for the configured role, waits for the peer and publishes the
    /// new connection. The pipe is disposed if the attempt fails or the helper is closed.
    /// </summary>
    private async Task ConnectAsync()
    {
        var token = _cts.Token;
        PipeStream pipe = _isServer
            ? new NamedPipeServerStream(_pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte,
                PipeOptions.Asynchronous)
            : new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);

        try
        {
            if (pipe is NamedPipeServerStream server)
            {
                await server.WaitForConnectionAsync(token);
            }
            else
            {
                await ((NamedPipeClientStream)pipe).ConnectAsync(token);
            }
        }
        catch
        {
            // Never leak a half-open pipe: with a single allowed server instance a
            // leaked one would make every later attempt fail.
            pipe.Dispose();
            throw;
        }

        _connection = new Connection(pipe);

        // Close() may have run between the successful connect and the assignment above,
        // in which case it disposed the previous connection instead of this one.
        if (token.IsCancellationRequested)
        {
            pipe.Dispose();
            throw new OperationCanceledException(token);
        }
    }

    /// <summary>
    /// Background loop: reads lines until the helper is closed, reconnecting whenever
    /// the pipe is lost.
    /// </summary>
    private async Task ListenAsync()
    {
        var token = _cts.Token;
        while (!token.IsCancellationRequested)
        {
            var connection = _connection;
            var connectionLost = false;
            try
            {
                var message = await connection.Reader.ReadLineAsync();
                if (message == null)
                {
                    // End of stream: the peer closed its end of the pipe.
                    connectionLost = true;
                }
                else
                {
                    RaiseMessageReceived(message);
                }
            }
            catch (Exception ex) when (ex is IOException or ObjectDisposedException or OperationCanceledException)
            {
                // A pipe disposed locally (by Close or by a reconnect triggered from
                // SendMessageAsync) can surface as any of these, depending on platform.
                connectionLost = true;
            }

            if (!connectionLost)
            {
                continue;
            }

            try
            {
                await ReconnectAsync(connection);
            }
            catch (OperationCanceledException)
            {
                // Close() was called; stop listening.
                return;
            }
        }
    }

    /// <summary>
    /// Invokes the <see cref="MessageReceived"/> subscribers, shielding the listener
    /// from exceptions they throw.
    /// </summary>
    private void RaiseMessageReceived(string message)
    {
        try
        {
            MessageReceived?.Invoke(message);
        }
        catch (Exception ex)
        {
            // A faulty subscriber must not silently terminate the background listener.
            Console.Error.WriteLine(ex);
        }
    }

    /// <summary>
    /// Replaces <paramref name="failed"/> with a new connection, retrying once per
    /// second until it succeeds or the helper is closed.
    /// </summary>
    /// <param name="failed">The connection the caller saw fail.</param>
    /// <exception cref="OperationCanceledException">The helper was closed.</exception>
    private async Task ReconnectAsync(Connection failed)
    {
        var token = _cts.Token;
        await _reconnectGate.WaitAsync(token);
        try
        {
            // Somebody else already replaced the broken connection while we waited.
            if (!ReferenceEquals(_connection, failed))
            {
                return;
            }

            // Only the pipe is disposed: disposing the writer would try to flush into
            // the broken pipe and could throw.
            failed.Pipe.Dispose();

            while (true)
            {
                token.ThrowIfCancellationRequested();
                try
                {
                    Console.WriteLine("尝试重新连接...");
                    await ConnectAsync();
                    Console.WriteLine("重新连接成功");
                    return;
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    Console.WriteLine("重新连接失败,等待 1 秒后重试...");
                    await Task.Delay(ReconnectDelayMilliseconds, token);
                }
            }
        }
        finally
        {
            _reconnectGate.Release();
        }
    }

    /// <summary>A connected pipe together with the text reader and writer layered on it.</summary>
    private sealed class Connection
    {
        public Connection(PipeStream pipe)
        {
            Pipe = pipe;
            Reader = new StreamReader(pipe);
            Writer = new StreamWriter(pipe) { AutoFlush = true };
        }

        public PipeStream Pipe { get; }

        public StreamReader Reader { get; }

        public StreamWriter Writer { get; }
    }
}