添加进程间通讯工具类封装

This commit is contained in:
huangxianguo 2024-06-19 15:53:34 +08:00
parent e7ad6629f0
commit e820a3af53
11 changed files with 358 additions and 9 deletions

6
.gitignore vendored
View File

@ -1,5 +1,5 @@
.idea .idea/
**/bin **/bin/
**/obj **/obj/
global.json global.json

View File

@ -17,4 +17,22 @@
<Folder Include="Model\" /> <Folder Include="Model\" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Content Include="bin\Debug\net7.0\MasstransferCommon.deps.json" />
<Content Include="bin\Debug\net7.0\MasstransferCommon.dll" />
<Content Include="bin\Debug\net7.0\MasstransferCommon.pdb" />
</ItemGroup>
<ItemGroup>
<Compile Remove="bin\**" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Remove="bin\**" />
</ItemGroup>
<ItemGroup>
<None Remove="bin\**" />
</ItemGroup>
</Project> </Project>

View File

@ -0,0 +1,22 @@
namespace MasstransferCommon.Scheduler;
/// <summary>
/// 延时定时任务
/// </summary>
public class DelayScheduler
{
private Timer _timer;
private Action _action;
public void Schedule(Action action, TimeSpan delay)
{
_action = action;
_timer = new Timer(TimerCallback, null, delay, Timeout.InfiniteTimeSpan);
}
private void TimerCallback(object? state)
{
_timer?.Dispose();
_action?.Invoke();
}
}

View File

@ -16,6 +16,18 @@ public class JsonUtil
} }
} }
public static Dictionary<string, object>? ToDictionary(string json)
{
try
{
return JsonConvert.DeserializeObject<Dictionary<string, object>>(json);
}
catch (Exception e)
{
throw new ArgumentException($" 无效的json 对象 {json} ");
}
}
public static T FromJson<T>(string json) public static T FromJson<T>(string json)
{ {
try try

View File

@ -0,0 +1,75 @@
using System.Net.Http.Json;
namespace MasstransferCommon.Utils;
public class ApiClient : IDisposable
{
private static readonly HttpClient Client;
static ApiClient()
{
Client = new HttpClient();
Client.DefaultRequestHeaders.Accept.Clear();
Client.DefaultRequestHeaders.Add("Accept", "application/json");
Client.DefaultRequestHeaders.Add("Content-Type", "application/json");
}
/// <summary>
/// 异步Get请求
/// </summary>
/// <param name="url"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static async Task<T?> GetAsync<T>(string url)
{
using var response = await Client.GetAsync(url);
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<T>();
}
/// <summary>
/// 异步 Post请求
/// </summary>
/// <param name="url"></param>
/// <param name="data"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static async Task<T?> PostAsync<T>(string url, object? data)
{
using var response = await Client.PostAsJsonAsync(url, data);
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<T>();
}
/// <summary>
/// 异步 Put请求
/// </summary>
/// <param name="url"></param>
/// <param name="data"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static async Task<T?> PutAsync<T>(string url, object? data)
{
using var response = await Client.PutAsJsonAsync(url, data);
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<T>();
}
/// <summary>
/// 异步 Delete请求
/// </summary>
/// <param name="url"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static async Task<T?> DeleteAsync<T>(string url)
{
using var response = await Client.DeleteAsync(url);
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<T>();
}
public void Dispose()
{
Client.Dispose();
}
}

View File

@ -8,7 +8,6 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Folder Include="Http\" />
<Folder Include="Mqtt\Service\" /> <Folder Include="Mqtt\Service\" />
</ItemGroup> </ItemGroup>

View File

@ -0,0 +1,116 @@
using System.IO.Pipes;
namespace MasstransferCommunicate.Process.Client;
public class ProcessHelper
{
public event Action<string> MessageReceived;
private readonly string _pipeName;
private readonly bool _isServer;
private PipeStream _pipeStream;
private StreamReader _reader;
private StreamWriter _writer;
private CancellationTokenSource _cts;
private ProcessHelper(string pipeName, bool isServer)
{
_pipeName = pipeName;
_isServer = isServer;
_cts = new CancellationTokenSource();
}
public static async Task<ProcessHelper> CreateServer(string pipeName)
{
var helper = new ProcessHelper(pipeName, true);
await helper.ConnectAsync();
return helper;
}
public static async Task<ProcessHelper> CreateClient(string pipeName)
{
var helper = new ProcessHelper(pipeName, false);
await helper.ConnectAsync();
return helper;
}
private async Task ConnectAsync()
{
if (_isServer)
{
_pipeStream = new NamedPipeServerStream(_pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
await ((NamedPipeServerStream)_pipeStream).WaitForConnectionAsync(_cts.Token);
}
else
{
_pipeStream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
await ((NamedPipeClientStream)_pipeStream).ConnectAsync(_cts.Token);
}
_reader = new StreamReader(_pipeStream);
_writer = new StreamWriter(_pipeStream) { AutoFlush = true };
StartListeningAsync();
}
public async Task SendMessageAsync(string message)
{
try
{
await _writer.WriteLineAsync(message);
}
catch (IOException)
{
await ReconnectAsync();
await SendMessageAsync(message);
}
}
private void StartListeningAsync()
{
Task.Run(async () =>
{
while (!_cts.Token.IsCancellationRequested)
{
try
{
var message = await _reader.ReadLineAsync();
if (message == null) throw new IOException("Pipe disconnected.");
MessageReceived?.Invoke(message);
}
catch (IOException)
{
// Handle disconnection and retries
await ReconnectAsync();
}
}
});
}
private async Task ReconnectAsync()
{
Close();
while (!_pipeStream.IsConnected)
{
try
{
Console.WriteLine("尝试重新连接...");
await ConnectAsync();
Console.WriteLine("重新连接成功");
break;
}
catch (Exception)
{
Console.WriteLine("重新连接失败,等待 1 秒后重试...");
await Task.Delay(1000);
}
}
}
public void Close()
{
_cts.Cancel();
_pipeStream?.Close();
_cts = new CancellationTokenSource();
}
}

View File

@ -0,0 +1,14 @@
namespace MasstransferCommunicate.Process.Model;
public class Message<T>
{
public string Topic { get; set; }
public T Data { get; set; }
public Message(string topic, T data)
{
Topic = topic;
Data = data;
}
}

View File

@ -0,0 +1,86 @@
using MasstransferCommon.Utils;
using MasstransferCommunicate.Process.Client;
using MasstransferCommunicate.Process.Model;
using Serilog;
namespace MasstransferCommunicate.Process.Service;
/// <summary>
/// 进程间通讯器
/// </summary>
public class ProcessCommunicator
{
private static readonly Dictionary<string, List<Action<string>>> Subscribers = new();
private static ProcessHelper? _helper;
/// <summary>
/// 连接到服务端
/// </summary>
public static async Task Connect()
{
_helper = await ProcessHelper.CreateClient("Masstransfer");
_helper.MessageReceived += HandleMessageReceived;
}
/// <summary>
/// 订阅某个主题
/// </summary>
/// <param name="topic"></param>
/// <param name="delegate"></param>
public static Task<bool> Subscribe<T>(string topic, Action<string> @delegate)
{
if (!Subscribers.ContainsKey(topic))
{
Subscribers.Add(topic, []);
}
Subscribers[topic].Add(@delegate);
return Task.FromResult(true);
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
public static async Task<bool> Send<T>(string topic, T message)
{
await _helper?.SendMessageAsync(JsonUtil.ToJson(new Message<T>(topic, message)))!;
return true;
}
/// <summary>
/// 处理接收到的消息
/// </summary>
/// <returns></returns>
private static void HandleMessageReceived(string? message)
{
if (message == null) return;
Console.WriteLine($"收到来自服务端的消息: {message}");
var dictionary = JsonUtil.ToDictionary(message);
if (dictionary == null) return;
var topic = dictionary["Topic"] as string;
var data = dictionary["Data"];
if (!Subscribers.TryGetValue(topic, out var subscribers)) return;
foreach (var subscriber in subscribers)
{
try
{
// 通知订阅者
subscriber(JsonUtil.ToJson(data));
}
catch (Exception exception)
{
Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic);
}
}
}
}

View File

@ -16,6 +16,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\MasstransferCommon\MasstransferCommon.csproj" /> <ProjectReference Include="..\MasstransferCommon\MasstransferCommon.csproj" />
<ProjectReference Include="..\MasstransferCommunicate\MasstransferCommunicate.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,13 +1,19 @@
using MasstransferCommon.Scheduler; using MasstransferCommunicate.Process.Client;
using MasstransferExporter.Stat; using MasstransferCommunicate.Process.Service;
class Program class Program
{ {
static void Main() static async Task Main()
{ {
JobScheduler.AddTask("Test", SystemStatExporter.Collect, 3000); await ProcessCommunicator.Connect();
Console.WriteLine("输入要发送的消息 (输入 'exit' 退出):");
Console.ReadLine(); while (true)
{
string messageToSend = Console.ReadLine();
if (messageToSend == "exit") break;
await ProcessCommunicator.Send("ClientMessage", messageToSend);
}
} }
} }