diff --git a/MasstransferCommon/Utils/JsonUtil.cs b/MasstransferCommon/Utils/JsonUtil.cs index 37b453c..945d980 100644 --- a/MasstransferCommon/Utils/JsonUtil.cs +++ b/MasstransferCommon/Utils/JsonUtil.cs @@ -40,6 +40,18 @@ public class JsonUtil } } + public static object? FromJson(Type type, string json) + { + try + { + return JsonConvert.DeserializeObject(json, type); + } + catch (Exception e) + { + throw new ArgumentException($" 无效的json 字符串 {json} "); + } + } + public static T FromJsonOrDefault(string json) { try diff --git a/MasstransferExporter/Stat/HeartbeatExporter.cs b/MasstransferExporter/Stat/HeartbeatExporter.cs new file mode 100644 index 0000000..d7ceaef --- /dev/null +++ b/MasstransferExporter/Stat/HeartbeatExporter.cs @@ -0,0 +1,22 @@ +using MasstransferCommon.Scheduler; + +namespace MasstransferExporter.Stat; + +/// +/// 心跳 +/// +public class HeartbeatExporter +{ + /// + /// 启动心跳线程 + /// + public static void StartHeartBeat() + { + JobScheduler.AddTask("HeartbeatExporter#StartHeartBeat", HeartBeat, 10000); + } + + private static void HeartBeat() + { + + } +} \ No newline at end of file diff --git a/MasstransferInfrastructure/MasstransferInfrastructure.csproj b/MasstransferInfrastructure/MasstransferInfrastructure.csproj index a3dd507..52f38e2 100644 --- a/MasstransferInfrastructure/MasstransferInfrastructure.csproj +++ b/MasstransferInfrastructure/MasstransferInfrastructure.csproj @@ -15,6 +15,7 @@ + diff --git a/MasstransferInfrastructure/Minio/MinioHelper.cs b/MasstransferInfrastructure/Minio/MinioHelper.cs new file mode 100644 index 0000000..7f0a86e --- /dev/null +++ b/MasstransferInfrastructure/Minio/MinioHelper.cs @@ -0,0 +1,88 @@ +using Minio; +using Minio.DataModel.Args; + +namespace MasstransferCommunicate.Minio; + +/// +/// Minio工具类 +/// +public class MinioHelper +{ + private readonly IMinioClient _client; + + public MinioHelper(string endpoint, string accessKey, string secretKey) + { + _client = new MinioClient() + .WithEndpoint(endpoint) + .WithCredentials(accessKey, secretKey) + .Build(); + } + + /// + /// 判断bucket 是否存在 + /// + /// + /// + public async Task BucketExistsAsync(string bucketName) + { + return await _client.BucketExistsAsync(new BucketExistsArgs().WithBucket(bucketName)); + } + + /// + /// 上传文件 + /// + /// + /// + /// + /// + public async Task UploadFileAsync(string bucketName, string fileName, string filePath) + { + if (!await BucketExistsAsync(bucketName)) + { + await _client.MakeBucketAsync(new MakeBucketArgs().WithBucket(bucketName)); + } + + await _client.PutObjectAsync(new PutObjectArgs() + .WithBucket(bucketName) + .WithObject(fileName) + .WithFileName(filePath)); + + return $"{bucketName}/{fileName}"; + } + + /// + /// 下载文件 + /// + /// + /// + /// + public async Task DownloadFileAsync(string bucketName, string fileName, string filePath) + { + if (!await BucketExistsAsync(bucketName)) + { + await _client.MakeBucketAsync(new MakeBucketArgs().WithBucket(bucketName)); + } + + await _client.GetObjectAsync(new GetObjectArgs() + .WithBucket(bucketName) + .WithObject(fileName) + .WithFile(filePath)); + } + + /// + /// 删除文件 + /// + /// + /// + public async Task DeleteFileAsync(string bucketName, string fileName) + { + if (!await BucketExistsAsync(bucketName)) + { + await _client.MakeBucketAsync(new MakeBucketArgs().WithBucket(bucketName)); + } + + await _client.RemoveObjectAsync(new RemoveObjectArgs() + .WithBucket(bucketName) + .WithObject(fileName)); + } +} \ No newline at end of file diff --git a/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs b/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs index 9f42de8..f645ff5 100644 --- a/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs +++ b/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs @@ -1,4 +1,5 @@ -using MasstransferCommon.Utils; +using System.Reflection; +using MasstransferCommon.Utils; using MasstransferInfrastructure.Mqtt.Model; using MQTTnet; using MQTTnet.Client; @@ -7,11 +8,10 @@ using Serilog; namespace MasstransferInfrastructure.Mqtt.Client; -public class MessageQueueHelper +public class MessageQueueHelper { - private static readonly Dictionary>> Subscribers = new(); + private static readonly Dictionary> Subscribers = new(); - // ReSharper disable once StaticMemberInGenericType private static readonly MqttClient Client = new(); @@ -41,7 +41,7 @@ public class MessageQueueHelper /// /// /// - public static async Task Subscribe(string topic, Action @delegate, + public static async Task Subscribe(string topic, Delegate @delegate, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce) { if (!Subscribers.ContainsKey(topic)) @@ -84,8 +84,12 @@ public class MessageQueueHelper { try { + var methodInfo = subscriber.Method; + var parameters = methodInfo.GetParameters(); + if (parameters.Length != 2) continue; + var type = parameters[1].ParameterType; // 通知订阅者 - subscriber(topic, JsonUtil.FromJson(message)); + subscriber.DynamicInvoke(topic, JsonUtil.FromJson(type, message)); } catch (Exception exception) {