MasstransferExporter/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs

159 lines
4.1 KiB
C#
Raw Normal View History

2024-09-10 08:15:26 +00:00
using MasstransferCommon.Model.Constant;
2024-06-03 08:35:49 +00:00
using MasstransferCommon.Utils;
2024-09-05 13:25:51 +00:00
using MasstransferCommunicate.Mqtt.Model;
2024-06-03 08:35:49 +00:00
using MasstransferInfrastructure.Mqtt.Model;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
namespace MasstransferCommunicate.Mqtt.Client;
2024-06-03 08:35:49 +00:00
/// <summary>
/// Thin wrapper around an MQTTnet <see cref="IMqttClient"/> that handles connecting,
/// disconnecting, publishing JSON payloads and (un)subscribing to topics.
/// </summary>
internal class MqttClient
{
    /// <summary>Reference instant used to derive message ids (2024-01-01T00:00:00Z).</summary>
    private static readonly DateTime Epoch = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    /// <summary>The underlying client; <c>null</c> until a connection has been established.</summary>
    private IMqttClient? _client;

    /// <summary>
    /// Raised for every application message received by the underlying client.
    /// The sender is the underlying <see cref="IMqttClient"/> instance.
    /// </summary>
    public event EventHandler<MqttApplicationMessageReceivedEventArgs>? MessageReceived;

    /// <summary>Whether a client exists and currently reports itself as connected.</summary>
    public bool IsConnected => _client is { IsConnected: true };

    /// <summary>
    /// Builds the MQTTnet connection options from the supplied settings.
    /// </summary>
    /// <param name="options">Server address and port to connect to.</param>
    /// <returns>Options using <c>Constants.SN</c> as client id, a 60 second keep-alive and a clean session.</returns>
    private static MqttClientOptions GetConnectionOptions(MqttConnectOptions options)
    {
        return new MqttClientOptionsBuilder()
            .WithTcpServer(options.ServerAddress, options.Port)
            .WithClientId(Constants.SN)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
            .WithCleanStart(true)
            .WithWillRetain(false)
            .WithCleanSession()
            .Build();
    }

    /// <summary>
    /// Connects to the MQTT broker.
    /// </summary>
    /// <param name="options">Connection settings.</param>
    /// <returns><c>true</c> when the broker accepted the connection; otherwise <c>false</c>.</returns>
    public async Task<bool> ConnectAsync(MqttConnectOptions options)
    {
        var ops = GetConnectionOptions(options);
        var client = new MqttFactory().CreateMqttClient();

        // Attach the handler before connecting so that no message delivered
        // immediately after the connection is established can be missed.
        client.ApplicationMessageReceivedAsync += e =>
        {
            try
            {
                MessageReceived?.Invoke(client, e);
            }
            catch (Exception exception)
            {
                // A faulty subscriber must not break the client's receive loop.
                Console.WriteLine(exception);
            }

            return Task.CompletedTask;
        };

        try
        {
            var connectResult = await client.ConnectAsync(ops);
            if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
            {
                // Do not leak the client when the broker refuses the connection.
                client.Dispose();
                return false;
            }
        }
        catch
        {
            // Release the client, then let the caller observe the original exception.
            client.Dispose();
            throw;
        }

        // Replace (and release) any client left over from an earlier connection.
        var previous = _client;
        _client = client;
        previous?.Dispose();
        return true;
    }

    /// <summary>
    /// Disconnects from the broker and releases the underlying client.
    /// Does nothing when no client has been created.
    /// </summary>
    public async Task DisconnectAsync()
    {
        var client = _client;
        if (client is null)
        {
            return;
        }

        _client = null;
        try
        {
            // Only a connected client can be disconnected gracefully.
            if (client.IsConnected)
            {
                await client.DisconnectAsync();
            }
        }
        finally
        {
            client.Dispose();
        }
    }

    /// <summary>
    /// Publishes a message: <paramref name="data"/> is wrapped in a <c>Payload&lt;T&gt;</c>
    /// envelope, serialized to JSON and sent to <paramref name="topic"/>.
    /// </summary>
    /// <typeparam name="T">Type of the message data.</typeparam>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="data">Data carried by the message.</param>
    /// <param name="qos">Quality of service level for the publish.</param>
    /// <returns><c>false</c> when not connected; otherwise whether the publish result reports success.</returns>
    public async Task<bool> Publish<T>(string topic, T data,
        MqttQualityOfServiceLevel qos)
    {
        // Capture the field once so a concurrent disconnect cannot null it mid-call.
        if (_client is not { IsConnected: true } client)
        {
            return false;
        }

        var message = new Payload<T>()
        {
            MsgId = GetCurrentTimestamp(),
            Data = data,
            ConsumeTime = DateTime.Now,
        };

        var payload = JsonUtil.ToJson(message);
        Console.WriteLine(payload);

        var result = await client.PublishAsync(new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(payload)
            .WithQualityOfServiceLevel(qos)
            .Build());

        return result.IsSuccess;
    }

    /// <summary>
    /// Returns the number of whole milliseconds elapsed since <see cref="Epoch"/>.
    /// </summary>
    private static long GetCurrentTimestamp()
    {
        return (long)(DateTime.UtcNow - Epoch).TotalMilliseconds;
    }

    /// <summary>
    /// Subscribes to a topic.
    /// </summary>
    /// <param name="topic">Topic filter to subscribe to.</param>
    /// <param name="qos">Requested quality of service level.</param>
    /// <returns>
    /// <c>false</c> when not connected or when the broker rejected the subscription;
    /// otherwise <c>true</c>.
    /// </returns>
    public async Task<bool> Subscribe(string topic,
        MqttQualityOfServiceLevel qos)
    {
        if (_client is not { IsConnected: true } client)
        {
            return false;
        }

        var filter = new MqttTopicFilterBuilder()
            .WithTopic(topic)
            .WithQualityOfServiceLevel(qos)
            .Build();

        var result = await client.SubscribeAsync(filter);

        // The broker answers per filter; anything other than a granted QoS is a rejection.
        foreach (var item in result.Items)
        {
            if (item.ResultCode is not (MqttClientSubscribeResultCode.GrantedQoS0
                or MqttClientSubscribeResultCode.GrantedQoS1
                or MqttClientSubscribeResultCode.GrantedQoS2))
            {
                return false;
            }
        }

        return true;
    }

    /// <summary>
    /// Unsubscribes from a topic.
    /// </summary>
    /// <param name="topic">Topic filter to unsubscribe from.</param>
    /// <returns><c>false</c> when not connected; otherwise <c>true</c> once the request has completed.</returns>
    public async Task<bool> Unsubscribe(string topic)
    {
        if (_client is not { IsConnected: true } client)
        {
            return false;
        }

        await client.UnsubscribeAsync(topic);
        return true;
    }
}