MasstransferExporter/MasstransferInfrastructure/Process/Client/ProcessHelper.cs

99 lines
2.6 KiB
C#
Raw Normal View History

2024-09-03 08:13:49 +00:00
using MasstransferCommon.Utils;
using MasstransferCommunicate.Process.Model;
using Serilog;
2024-06-19 07:53:34 +00:00
namespace MasstransferCommunicate.Process.Client;
2024-09-03 08:13:49 +00:00
/// <summary>
/// 进程通讯工具类
/// </summary>
2024-06-19 07:53:34 +00:00
public class ProcessHelper
{
2024-09-03 08:13:49 +00:00
private static readonly Dictionary<string, List<Delegate>> Subscribers = new();
2024-06-19 07:53:34 +00:00
2024-09-03 08:13:49 +00:00
private static PipeClient? _pipe;
2024-06-19 07:53:34 +00:00
2024-09-03 08:13:49 +00:00
/// <summary>
/// 初始化通讯管道
/// </summary>
public static void Init()
2024-06-19 07:53:34 +00:00
{
2024-09-03 08:13:49 +00:00
if (_pipe != null) return;
2024-09-04 02:11:04 +00:00
_pipe = new PipeClient();
2024-09-10 08:15:26 +00:00
_pipe.MessageReceived += HandleMessageReceived;
2024-09-03 08:13:49 +00:00
_pipe.Start();
2024-06-19 07:53:34 +00:00
}
2024-09-03 08:13:49 +00:00
/// <summary>
/// 订阅消息
/// </summary>
/// <param name="topic"></param>
/// <param name="delegate"></param>
public static void Subscribe(string topic, Delegate @delegate)
2024-06-19 07:53:34 +00:00
{
2024-09-03 08:13:49 +00:00
if (!Subscribers.ContainsKey(topic))
2024-06-19 07:53:34 +00:00
{
2024-09-03 08:13:49 +00:00
Subscribers.Add(topic, []);
2024-06-19 07:53:34 +00:00
}
2024-09-03 08:13:49 +00:00
Subscribers[topic].Add(@delegate);
2024-06-19 07:53:34 +00:00
}
2024-09-03 08:13:49 +00:00
/// <summary>
/// 发送消息
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <typeparam name="T"></typeparam>
public static void Send<T>(string topic, T message)
2024-06-19 07:53:34 +00:00
{
2024-09-03 08:13:49 +00:00
var payload = new Message<T>(topic, message);
_pipe?.SendMessage(JsonUtil.ToJson(payload));
2024-06-19 07:53:34 +00:00
}
2024-09-03 08:13:49 +00:00
/// <summary>
/// 处理接收到的消息
/// </summary>
/// <returns></returns>
private static void HandleMessageReceived(string? message)
2024-06-19 07:53:34 +00:00
{
2024-09-03 08:13:49 +00:00
if (message == null) return;
2024-06-19 07:53:34 +00:00
2024-09-03 08:13:49 +00:00
var dictionary = JsonUtil.ToDictionary(message);
if (dictionary == null) return;
var topic = dictionary["Topic"] as string;
if (!Subscribers.TryGetValue(topic, out var subscribers)) return;
foreach (var subscriber in subscribers)
2024-06-19 07:53:34 +00:00
{
try
{
2024-09-03 08:13:49 +00:00
var methodInfo = subscriber.Method;
var parameters = methodInfo.GetParameters();
if (parameters.Length != 2) continue;
var dataType = parameters[1].ParameterType;
var type = typeof(Message<>).MakeGenericType(dataType);
var payload = JsonUtil.FromJson(type, message);
if (payload == null) continue;
var property = type.GetProperty("Data");
var data = property?.GetValue(payload);
2024-09-03 08:13:49 +00:00
// 通知订阅者
subscriber.DynamicInvoke(topic, data);
2024-06-19 07:53:34 +00:00
}
2024-09-03 08:13:49 +00:00
catch (Exception exception)
2024-06-19 07:53:34 +00:00
{
2024-09-03 08:13:49 +00:00
Log.Error(exception, "订阅主题 {Topic} 时发生错误", topic);
2024-06-19 07:53:34 +00:00
}
}
}
}