using MasstransferCommon.Utils;
using MasstransferCommunicate.Process.Model;
using Serilog;
namespace MasstransferCommunicate.Process.Client;
/// <summary>
/// Utility class for inter-process communication.
/// </summary>
public class ProcessHelper
{
    // Guards Subscribers: Subscribe mutates the map while HandleMessageReceived reads it.
    private static readonly object SubscribersLock = new();

    // Topic -> delegates registered for that topic, in subscription order.
    private static readonly Dictionary<string, List<Delegate>> Subscribers = new();

    // Created by Init(); stays null until then.
    private static PipeClient? _pipe;

    /// <summary>
    /// Initializes the communication pipe: creates the <see cref="PipeClient"/>, hooks its
    /// <c>MessageReceived</c> event and starts it. Calls made after the pipe exists are no-ops.
    /// </summary>
    public static void Init()
    {
        if (_pipe != null) return;

        _pipe = new PipeClient();
        _pipe.MessageReceived += HandleMessageReceived;
        _pipe.Start();
    }

    /// <summary>
    /// Subscribes a handler to a topic.
    /// </summary>
    /// <param name="topic">Topic to listen on.</param>
    /// <param name="delegate">
    /// Handler to invoke. Only delegates with exactly two parameters are dispatched to: they are
    /// called with the topic and the received JSON deserialized to the second parameter's type.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="topic"/> or <paramref name="delegate"/> is null.
    /// </exception>
    public static void Subscribe(string topic, Delegate @delegate)
    {
        ArgumentNullException.ThrowIfNull(topic);
        // Reject null here; otherwise it would fail (and be logged) on every dispatched message.
        ArgumentNullException.ThrowIfNull(@delegate);

        lock (SubscribersLock)
        {
            if (!Subscribers.TryGetValue(topic, out var handlers))
            {
                handlers = [];
                Subscribers.Add(topic, handlers);
            }

            handlers.Add(@delegate);
        }
    }

    /// <summary>
    /// Sends a message on a topic through the pipe. Does nothing when <see cref="Init"/> has not
    /// been called yet.
    /// </summary>
    /// <typeparam name="T">Type of the message body.</typeparam>
    /// <param name="topic">Topic the message is published under.</param>
    /// <param name="message">Message body.</param>
    public static void Send<T>(string topic, T message)
    {
        // NOTE(review): assumes the model type is generic (Message<T>) — confirm against
        // MasstransferCommunicate.Process.Model.
        var payload = new Message<T>(topic, message);
        _pipe?.SendMessage(JsonUtil.ToJson(payload));
    }

    /// <summary>
    /// Handles a raw message received from the pipe and dispatches it to the subscribers of its topic.
    /// </summary>
    /// <param name="message">Received JSON text; null is ignored.</param>
    private static void HandleMessageReceived(string? message)
    {
        if (message == null) return;

        var dictionary = JsonUtil.ToDictionary(message);
        if (dictionary == null) return;

        // Ignore messages without a string "Topic" instead of throwing out of the event handler.
        if (!dictionary.TryGetValue("Topic", out var rawTopic) || rawTopic is not string topic) return;

        // Dispatch over a snapshot so a handler may call Subscribe without invalidating the enumeration.
        Delegate[] handlers;
        lock (SubscribersLock)
        {
            if (!Subscribers.TryGetValue(topic, out var registered)) return;
            handlers = registered.ToArray();
        }

        foreach (var handler in handlers)
        {
            try
            {
                var parameters = handler.Method.GetParameters();
                if (parameters.Length != 2) continue;

                // The whole received JSON (not only its "Data" entry) is deserialized into the
                // handler's second parameter type.
                var argument = JsonUtil.FromJson(parameters[1].ParameterType, message);

                // Notify the subscriber.
                handler.DynamicInvoke(topic, argument);
            }
            catch (Exception exception)
            {
                // One failing subscriber must not prevent the remaining ones from being notified.
                Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic);
            }
        }
    }
}