添加Minio工具类

This commit is contained in:
huangxianguo 2024-06-19 19:28:48 +08:00
parent 18d1ca9bd4
commit 4cbee59acb
5 changed files with 133 additions and 6 deletions

View File

@ -40,6 +40,18 @@ public class JsonUtil
}
}
public static object? FromJson(Type type, string json)
{
try
{
return JsonConvert.DeserializeObject(json, type);
}
catch (Exception e)
{
throw new ArgumentException($" 无效的json 字符串 {json} ");
}
}
public static T FromJsonOrDefault<T>(string json)
{
try

View File

@ -0,0 +1,22 @@
using MasstransferCommon.Scheduler;
namespace MasstransferExporter.Stat;
/// <summary>
/// 心跳
/// </summary>
public class HeartbeatExporter
{
/// <summary>
/// 启动心跳线程
/// </summary>
public static void StartHeartBeat()
{
JobScheduler.AddTask("HeartbeatExporter#StartHeartBeat", HeartBeat, 10000);
}
private static void HeartBeat()
{
}
}

View File

@ -15,6 +15,7 @@
<ItemGroup>
<PackageReference Include="Masuit.Tools.Core" Version="2024.3.4" />
<PackageReference Include="Minio" Version="6.0.2" />
<PackageReference Include="MQTTnet" Version="4.3.6.1152"/>
<PackageReference Include="Serilog" Version="4.0.0"/>
<PackageReference Include="Microsoft.Data.Sqlite" Version="9.0.0-preview.2.24128.4"/>

View File

@ -0,0 +1,88 @@
using Minio;
using Minio.DataModel.Args;
namespace MasstransferCommunicate.Minio;
/// <summary>
/// Minio工具类
/// </summary>
public class MinioHelper
{
private readonly IMinioClient _client;
public MinioHelper(string endpoint, string accessKey, string secretKey)
{
_client = new MinioClient()
.WithEndpoint(endpoint)
.WithCredentials(accessKey, secretKey)
.Build();
}
/// <summary>
/// 判断bucket 是否存在
/// </summary>
/// <param name="bucketName"></param>
/// <returns></returns>
public async Task<bool> BucketExistsAsync(string bucketName)
{
return await _client.BucketExistsAsync(new BucketExistsArgs().WithBucket(bucketName));
}
/// <summary>
/// 上传文件
/// </summary>
/// <param name="bucketName"></param>
/// <param name="fileName"></param>
/// <param name="filePath"></param>
/// <returns></returns>
public async Task<string> UploadFileAsync(string bucketName, string fileName, string filePath)
{
if (!await BucketExistsAsync(bucketName))
{
await _client.MakeBucketAsync(new MakeBucketArgs().WithBucket(bucketName));
}
await _client.PutObjectAsync(new PutObjectArgs()
.WithBucket(bucketName)
.WithObject(fileName)
.WithFileName(filePath));
return $"{bucketName}/{fileName}";
}
/// <summary>
/// 下载文件
/// </summary>
/// <param name="bucketName"></param>
/// <param name="fileName"></param>
/// <param name="filePath"></param>
public async Task DownloadFileAsync(string bucketName, string fileName, string filePath)
{
if (!await BucketExistsAsync(bucketName))
{
await _client.MakeBucketAsync(new MakeBucketArgs().WithBucket(bucketName));
}
await _client.GetObjectAsync(new GetObjectArgs()
.WithBucket(bucketName)
.WithObject(fileName)
.WithFile(filePath));
}
/// <summary>
/// 删除文件
/// </summary>
/// <param name="bucketName"></param>
/// <param name="fileName"></param>
public async Task DeleteFileAsync(string bucketName, string fileName)
{
if (!await BucketExistsAsync(bucketName))
{
await _client.MakeBucketAsync(new MakeBucketArgs().WithBucket(bucketName));
}
await _client.RemoveObjectAsync(new RemoveObjectArgs()
.WithBucket(bucketName)
.WithObject(fileName));
}
}

View File

@ -1,4 +1,5 @@
using MasstransferCommon.Utils;
using System.Reflection;
using MasstransferCommon.Utils;
using MasstransferInfrastructure.Mqtt.Model;
using MQTTnet;
using MQTTnet.Client;
@ -7,11 +8,10 @@ using Serilog;
namespace MasstransferInfrastructure.Mqtt.Client;
public class MessageQueueHelper<T>
public class MessageQueueHelper
{
private static readonly Dictionary<string, List<Action<string, T>>> Subscribers = new();
private static readonly Dictionary<string, List<Delegate>> Subscribers = new();
// ReSharper disable once StaticMemberInGenericType
private static readonly MqttClient Client = new();
@ -41,7 +41,7 @@ public class MessageQueueHelper<T>
/// <param name="topic"></param>
/// <param name="delegate"></param>
/// <param name="qos"></param>
public static async Task<bool> Subscribe(string topic, Action<string, T> @delegate,
public static async Task<bool> Subscribe(string topic, Delegate @delegate,
MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
{
if (!Subscribers.ContainsKey(topic))
@ -84,8 +84,12 @@ public class MessageQueueHelper<T>
{
try
{
var methodInfo = subscriber.Method;
var parameters = methodInfo.GetParameters();
if (parameters.Length != 2) continue;
var type = parameters[1].ParameterType;
// 通知订阅者
subscriber(topic, JsonUtil.FromJson<T>(message));
subscriber.DynamicInvoke(topic, JsonUtil.FromJson(type, message));
}
catch (Exception exception)
{