using MasstransferCommon.Utils; using MasstransferCommunicate.Process.Client; using MasstransferCommunicate.Process.Model; using Serilog; namespace MasstransferCommunicate.Process.Service; /// /// 进程间通讯器 /// public class ProcessCommunicator { private static readonly Dictionary> Subscribers = new(); private static ProcessHelper? _helper; /// /// 启动连接 /// public static async Task Connect() { _helper = await ProcessHelper.CreateServer("Masstransfer"); _helper.MessageReceived += HandleMessageReceived; } /// /// 订阅某个主题 /// /// /// public static void Subscribe(string topic, Delegate @delegate) { if (!Subscribers.ContainsKey(topic)) { Subscribers.Add(topic, []); } Subscribers[topic].Add(@delegate); } /// /// 发送消息 /// /// /// public static async Task Send(string topic, T message) { await _helper?.SendMessageAsync(JsonUtil.ToJson(new Message(topic, message)))!; return true; } /// /// 处理接收到的消息 /// /// private static void HandleMessageReceived(string? message) { if (message == null) return; var dictionary = JsonUtil.ToDictionary(message); if (dictionary == null) return; var topic = dictionary["Topic"] as string; var data = dictionary["Data"]; if (!Subscribers.TryGetValue(topic, out var subscribers)) return; foreach (var subscriber in subscribers) { try { var methodInfo = subscriber.Method; var parameters = methodInfo.GetParameters(); if (parameters.Length != 2) continue; var type = parameters[1].ParameterType; // 通知订阅者 subscriber.DynamicInvoke(topic, JsonUtil.FromJson(type, message)); } catch (Exception exception) { Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic); } } } }