diff --git a/.gitignore b/.gitignore index 3acf3f3..dbe9499 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ -.idea -**/bin -**/obj +.idea/ +**/bin/ +**/obj/ global.json diff --git a/MasstransferCommon/MasstransferCommon.csproj b/MasstransferCommon/MasstransferCommon.csproj index 59a383e..e9a73fb 100644 --- a/MasstransferCommon/MasstransferCommon.csproj +++ b/MasstransferCommon/MasstransferCommon.csproj @@ -17,4 +17,22 @@ + + + + + + + + + + + + + + + + + + diff --git a/MasstransferCommon/Scheduler/DelayScheduler.cs b/MasstransferCommon/Scheduler/DelayScheduler.cs new file mode 100644 index 0000000..5a20278 --- /dev/null +++ b/MasstransferCommon/Scheduler/DelayScheduler.cs @@ -0,0 +1,22 @@ +namespace MasstransferCommon.Scheduler; + +/// +/// 延时定时任务 +/// +public class DelayScheduler +{ + private Timer _timer; + private Action _action; + + public void Schedule(Action action, TimeSpan delay) + { + _action = action; + _timer = new Timer(TimerCallback, null, delay, Timeout.InfiniteTimeSpan); + } + + private void TimerCallback(object? state) + { + _timer?.Dispose(); + _action?.Invoke(); + } +} \ No newline at end of file diff --git a/MasstransferCommon/Utils/JsonUtil.cs b/MasstransferCommon/Utils/JsonUtil.cs index 19dfb49..37b453c 100644 --- a/MasstransferCommon/Utils/JsonUtil.cs +++ b/MasstransferCommon/Utils/JsonUtil.cs @@ -16,6 +16,18 @@ public class JsonUtil } } + public static Dictionary? ToDictionary(string json) + { + try + { + return JsonConvert.DeserializeObject>(json); + } + catch (Exception e) + { + throw new ArgumentException($" 无效的json 对象 {json} "); + } + } + public static T FromJson(string json) { try diff --git a/MasstransferCommunicate/Http/ApiClient.cs b/MasstransferCommunicate/Http/ApiClient.cs new file mode 100644 index 0000000..201e15b --- /dev/null +++ b/MasstransferCommunicate/Http/ApiClient.cs @@ -0,0 +1,75 @@ +using System.Net.Http.Json; + +namespace MasstransferCommon.Utils; + +public class ApiClient : IDisposable +{ + private static readonly HttpClient Client; + + static ApiClient() + { + Client = new HttpClient(); + Client.DefaultRequestHeaders.Accept.Clear(); + Client.DefaultRequestHeaders.Add("Accept", "application/json"); + Client.DefaultRequestHeaders.Add("Content-Type", "application/json"); + } + + /// + /// 异步Get请求 + /// + /// + /// + /// + public static async Task GetAsync(string url) + { + using var response = await Client.GetAsync(url); + response.EnsureSuccessStatusCode(); + return await response.Content.ReadFromJsonAsync(); + } + + /// + /// 异步 Post请求 + /// + /// + /// + /// + /// + public static async Task PostAsync(string url, object? data) + { + using var response = await Client.PostAsJsonAsync(url, data); + response.EnsureSuccessStatusCode(); + return await response.Content.ReadFromJsonAsync(); + } + + /// + /// 异步 Put请求 + /// + /// + /// + /// + /// + public static async Task PutAsync(string url, object? data) + { + using var response = await Client.PutAsJsonAsync(url, data); + response.EnsureSuccessStatusCode(); + return await response.Content.ReadFromJsonAsync(); + } + + /// + /// 异步 Delete请求 + /// + /// + /// + /// + public static async Task DeleteAsync(string url) + { + using var response = await Client.DeleteAsync(url); + response.EnsureSuccessStatusCode(); + return await response.Content.ReadFromJsonAsync(); + } + + public void Dispose() + { + Client.Dispose(); + } +} \ No newline at end of file diff --git a/MasstransferCommunicate/MasstransferCommunicate.csproj b/MasstransferCommunicate/MasstransferCommunicate.csproj index d274234..949171e 100644 --- a/MasstransferCommunicate/MasstransferCommunicate.csproj +++ b/MasstransferCommunicate/MasstransferCommunicate.csproj @@ -8,7 +8,6 @@ - diff --git a/MasstransferCommunicate/Process/Client/ProcessHelper.cs b/MasstransferCommunicate/Process/Client/ProcessHelper.cs new file mode 100644 index 0000000..d6015b2 --- /dev/null +++ b/MasstransferCommunicate/Process/Client/ProcessHelper.cs @@ -0,0 +1,116 @@ +using System.IO.Pipes; + +namespace MasstransferCommunicate.Process.Client; + +public class ProcessHelper +{ + public event Action MessageReceived; + + private readonly string _pipeName; + private readonly bool _isServer; + private PipeStream _pipeStream; + private StreamReader _reader; + private StreamWriter _writer; + private CancellationTokenSource _cts; + + private ProcessHelper(string pipeName, bool isServer) + { + _pipeName = pipeName; + _isServer = isServer; + _cts = new CancellationTokenSource(); + } + + public static async Task CreateServer(string pipeName) + { + var helper = new ProcessHelper(pipeName, true); + await helper.ConnectAsync(); + return helper; + } + + public static async Task CreateClient(string pipeName) + { + var helper = new ProcessHelper(pipeName, false); + await helper.ConnectAsync(); + return helper; + } + + private async Task ConnectAsync() + { + if (_isServer) + { + _pipeStream = new NamedPipeServerStream(_pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, + PipeOptions.Asynchronous); + await ((NamedPipeServerStream)_pipeStream).WaitForConnectionAsync(_cts.Token); + } + else + { + _pipeStream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, PipeOptions.Asynchronous); + await ((NamedPipeClientStream)_pipeStream).ConnectAsync(_cts.Token); + } + + _reader = new StreamReader(_pipeStream); + _writer = new StreamWriter(_pipeStream) { AutoFlush = true }; + StartListeningAsync(); + } + + public async Task SendMessageAsync(string message) + { + try + { + await _writer.WriteLineAsync(message); + } + catch (IOException) + { + await ReconnectAsync(); + await SendMessageAsync(message); + } + } + + private void StartListeningAsync() + { + Task.Run(async () => + { + while (!_cts.Token.IsCancellationRequested) + { + try + { + var message = await _reader.ReadLineAsync(); + if (message == null) throw new IOException("Pipe disconnected."); + MessageReceived?.Invoke(message); + } + catch (IOException) + { + // Handle disconnection and retries + await ReconnectAsync(); + } + } + }); + } + + private async Task ReconnectAsync() + { + Close(); + while (!_pipeStream.IsConnected) + { + try + { + Console.WriteLine("尝试重新连接..."); + await ConnectAsync(); + Console.WriteLine("重新连接成功"); + break; + } + catch (Exception) + { + Console.WriteLine("重新连接失败,等待 1 秒后重试..."); + await Task.Delay(1000); + } + } + } + + public void Close() + { + _cts.Cancel(); + _pipeStream?.Close(); + _cts = new CancellationTokenSource(); + } +} \ No newline at end of file diff --git a/MasstransferCommunicate/Process/Model/Message.cs b/MasstransferCommunicate/Process/Model/Message.cs new file mode 100644 index 0000000..57773c5 --- /dev/null +++ b/MasstransferCommunicate/Process/Model/Message.cs @@ -0,0 +1,14 @@ +namespace MasstransferCommunicate.Process.Model; + +public class Message +{ + public string Topic { get; set; } + + public T Data { get; set; } + + public Message(string topic, T data) + { + Topic = topic; + Data = data; + } +} \ No newline at end of file diff --git a/MasstransferCommunicate/Process/Service/ProcessCommunicator.cs b/MasstransferCommunicate/Process/Service/ProcessCommunicator.cs new file mode 100644 index 0000000..8d0580b --- /dev/null +++ b/MasstransferCommunicate/Process/Service/ProcessCommunicator.cs @@ -0,0 +1,86 @@ +using MasstransferCommon.Utils; +using MasstransferCommunicate.Process.Client; +using MasstransferCommunicate.Process.Model; +using Serilog; + +namespace MasstransferCommunicate.Process.Service; + +/// +/// 进程间通讯器 +/// +public class ProcessCommunicator +{ + private static readonly Dictionary>> Subscribers = new(); + + private static ProcessHelper? _helper; + + /// + /// 连接到服务端 + /// + public static async Task Connect() + { + _helper = await ProcessHelper.CreateClient("Masstransfer"); + _helper.MessageReceived += HandleMessageReceived; + } + + + /// + /// 订阅某个主题 + /// + /// + /// + public static Task Subscribe(string topic, Action @delegate) + { + if (!Subscribers.ContainsKey(topic)) + { + Subscribers.Add(topic, []); + } + + Subscribers[topic].Add(@delegate); + return Task.FromResult(true); + } + + /// + /// 发送消息 + /// + /// + /// + public static async Task Send(string topic, T message) + { + await _helper?.SendMessageAsync(JsonUtil.ToJson(new Message(topic, message)))!; + return true; + } + + /// + /// 处理接收到的消息 + /// + /// + private static void HandleMessageReceived(string? message) + { + if (message == null) return; + + Console.WriteLine($"收到来自服务端的消息: {message}"); + + var dictionary = JsonUtil.ToDictionary(message); + if (dictionary == null) return; + + var topic = dictionary["Topic"] as string; + var data = dictionary["Data"]; + + + if (!Subscribers.TryGetValue(topic, out var subscribers)) return; + + foreach (var subscriber in subscribers) + { + try + { + // 通知订阅者 + subscriber(JsonUtil.ToJson(data)); + } + catch (Exception exception) + { + Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic); + } + } + } +} \ No newline at end of file diff --git a/MasstransferExporter/MasstransferExporter.csproj b/MasstransferExporter/MasstransferExporter.csproj index 25c03ab..0b8ae66 100644 --- a/MasstransferExporter/MasstransferExporter.csproj +++ b/MasstransferExporter/MasstransferExporter.csproj @@ -16,6 +16,7 @@ + diff --git a/MasstransferExporter/Program.cs b/MasstransferExporter/Program.cs index 4506e9c..bd0d441 100644 --- a/MasstransferExporter/Program.cs +++ b/MasstransferExporter/Program.cs @@ -1,13 +1,19 @@ -using MasstransferCommon.Scheduler; -using MasstransferExporter.Stat; +using MasstransferCommunicate.Process.Client; +using MasstransferCommunicate.Process.Service; class Program { - static void Main() + static async Task Main() { - JobScheduler.AddTask("Test", SystemStatExporter.Collect, 3000); + await ProcessCommunicator.Connect(); + Console.WriteLine("输入要发送的消息 (输入 'exit' 退出):"); - Console.ReadLine(); + while (true) + { + string messageToSend = Console.ReadLine(); + if (messageToSend == "exit") break; + await ProcessCommunicator.Send("ClientMessage", messageToSend); + } } } \ No newline at end of file