166 lines
4.4 KiB
C#
166 lines
4.4 KiB
C#
using System.Security.Cryptography.X509Certificates;
|
|
using MasstransferCommon.Model.Constant;
|
|
using MasstransferCommon.Utils;
|
|
using MasstransferCommunicate.Mqtt.Model;
|
|
using MasstransferInfrastructure.Mqtt.Model;
|
|
using MasstransferSecurity.Utils;
|
|
using MQTTnet;
|
|
using MQTTnet.Client;
|
|
using MQTTnet.Protocol;
|
|
|
|
namespace MasstransferCommunicate.Mqtt.Client;
|
|
|
|
/// <summary>
/// Wraps a single MQTTnet <see cref="IMqttClient"/> and exposes connect, disconnect,
/// publish, subscribe and unsubscribe operations on it.
/// </summary>
internal class MqttClient
{
    /// <summary>
    /// Reference instant (2024-01-01T00:00:00Z) from which <see cref="GetCurrentTimestamp"/> counts.
    /// </summary>
    private static readonly DateTime Epoch = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    /// <summary>
    /// The underlying client. Set by a successful <see cref="ConnectAsync"/> and cleared by
    /// <see cref="DisconnectAsync"/>.
    /// </summary>
    private IMqttClient? _client;

    /// <summary>
    /// Raised for each application message received by the underlying client.
    /// The sender argument is the underlying <see cref="IMqttClient"/>.
    /// </summary>
    public event EventHandler<MqttApplicationMessageReceivedEventArgs>? MessageReceived;

    /// <summary>
    /// <c>true</c> when an underlying client exists and reports itself as connected.
    /// </summary>
    public bool IsConnected => _client is { IsConnected: true };

    /// <summary>
    /// Builds the MQTTnet connection options from the supplied settings.
    /// </summary>
    /// <param name="options">Server address, port and TLS settings to connect with.</param>
    /// <returns>The options to pass to the underlying client's connect call.</returns>
    private static MqttClientOptions GetConnectionOptions(MqttConnectOptions options)
    {
        // The client id is always Constants.SN.
        // NOTE(review): no credentials are applied here (the original WithCredentials call was
        // commented out) - confirm the broker is meant to accept unauthenticated clients.
        return new MqttClientOptionsBuilder()
            .WithTcpServer(options.ServerAddress, options.Port)
            .WithClientId(Constants.SN)
            .WithCleanSession()
            .WithTlsOptions(tls =>
            {
                tls.UseTls(options.EnableTls);
                tls.WithSslProtocols(options.Protocols);
            })
            .Build();
    }

    /// <summary>
    /// Decodes a Base64 string and loads the resulting bytes as an X.509 certificate.
    /// </summary>
    /// <param name="certBase64">Base64-encoded certificate data.</param>
    /// <returns>The certificate constructed from the decoded bytes.</returns>
    private static X509Certificate2 GetCertificate(string certBase64)
    {
        var certBytes = Convert.FromBase64String(certBase64);
        return new X509Certificate2(certBytes);
    }

    /// <summary>
    /// Connects to the MQTT broker described by <paramref name="options"/>.
    /// </summary>
    /// <param name="options">Connection settings.</param>
    /// <returns>
    /// <c>true</c> when the broker reports <see cref="MqttClientConnectResultCode.Success"/>;
    /// otherwise <c>false</c>. Exceptions thrown by the underlying connect call propagate.
    /// </returns>
    public async Task<bool> ConnectAsync(MqttConnectOptions options)
    {
        var ops = GetConnectionOptions(options);
        var client = new MqttFactory().CreateMqttClient();

        // Register the handler before connecting so that no message delivered right after the
        // connection is established can slip past it.
        client.ApplicationMessageReceivedAsync += e =>
        {
            MessageReceived?.Invoke(client, e);
            return Task.CompletedTask;
        };

        try
        {
            var connectResult = await client.ConnectAsync(ops);
            if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
            {
                // A client that failed to connect is never stored, so release it here.
                client.Dispose();
                return false;
            }
        }
        catch
        {
            client.Dispose();
            throw;
        }

        // Swap in the new client and release any client left over from an earlier connect.
        var previous = _client;
        _client = client;
        previous?.Dispose();

        return true;
    }

    /// <summary>
    /// Disconnects from the broker (when connected) and releases the underlying client.
    /// Does nothing when no client exists.
    /// </summary>
    public async Task DisconnectAsync()
    {
        var client = _client;
        if (client is null)
        {
            return;
        }

        // Detach first so this wrapper reports "not connected" even if the disconnect throws.
        _client = null;

        try
        {
            if (client.IsConnected)
            {
                await client.DisconnectAsync();
            }
        }
        finally
        {
            client.Dispose();
        }
    }

    /// <summary>
    /// Wraps <paramref name="data"/> in a <c>Payload&lt;T&gt;</c> envelope, serializes it with
    /// <c>JsonUtil.ToJson</c> and publishes it to <paramref name="topic"/>.
    /// </summary>
    /// <typeparam name="T">Type of the data carried by the envelope.</typeparam>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="data">Data to send.</param>
    /// <param name="qos">Quality-of-service level for the publish.</param>
    /// <returns>
    /// <c>false</c> when not connected; otherwise the <c>IsSuccess</c> value of the publish result.
    /// </returns>
    public async Task<bool> Publish<T>(string topic, T data,
        MqttQualityOfServiceLevel qos)
    {
        // Capture the field once so the check and the use refer to the same instance.
        var client = _client;
        if (client is not { IsConnected: true })
        {
            return false;
        }

        var message = new Payload<T>()
        {
            // Millisecond counter; two publishes within the same millisecond get the same id.
            MsgId = GetCurrentTimestamp(),
            Data = data,
            // NOTE(review): local time, whereas MsgId is derived from UTC - confirm what consumers expect.
            ConsumeTime = DateTime.Now,
        };

        var payload = JsonUtil.ToJson(message);

        // Trace the payload in debug builds only rather than writing every message to stdout.
        System.Diagnostics.Debug.WriteLine(payload);

        var applicationMessage = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(payload)
            .WithQualityOfServiceLevel(qos)
            .Build();

        var result = await client.PublishAsync(applicationMessage);

        return result.IsSuccess;
    }

    /// <summary>
    /// Returns the number of whole milliseconds elapsed between <see cref="Epoch"/> and the
    /// current UTC time.
    /// </summary>
    private static long GetCurrentTimestamp()
    {
        return (long)(DateTime.UtcNow - Epoch).TotalMilliseconds;
    }

    /// <summary>
    /// Subscribes to a topic.
    /// </summary>
    /// <param name="topic">Topic to subscribe to.</param>
    /// <param name="qos">Quality-of-service level requested for the subscription.</param>
    /// <returns>
    /// <c>false</c> when not connected; otherwise <c>true</c> once the subscribe call completes.
    /// The result returned by the subscribe call is not inspected.
    /// </returns>
    public async Task<bool> Subscribe(string topic,
        MqttQualityOfServiceLevel qos)
    {
        var client = _client;
        if (client is not { IsConnected: true })
        {
            return false;
        }

        var filter = new MqttTopicFilterBuilder()
            .WithTopic(topic)
            .WithQualityOfServiceLevel(qos)
            .Build();

        await client.SubscribeAsync(filter);
        return true;
    }

    /// <summary>
    /// Unsubscribes from a topic.
    /// </summary>
    /// <param name="topic">Topic to unsubscribe from.</param>
    /// <returns>
    /// <c>false</c> when not connected; otherwise <c>true</c> once the unsubscribe call completes.
    /// </returns>
    public async Task<bool> Unsubscribe(string topic)
    {
        var client = _client;
        if (client is not { IsConnected: true })
        {
            return false;
        }

        await client.UnsubscribeAsync(topic);
        return true;
    }
}