MasstransferExporter/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs

154 lines
4.2 KiB
C#

using System.Security.Cryptography.X509Certificates;
using MasstransferCommon.Utils;
using MasstransferInfrastructure.Mqtt.Model;
using MasstransferSecurity.Utils;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
namespace MasstransferInfrastructure.Mqtt.Client;
class MqttClient
{
private IMqttClient? _client;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> MessageReceived;
public bool IsConnected => _client is { IsConnected: true };
/// <summary>
/// 获取连接参数
/// </summary>
/// <param name="options"></param>
/// <returns></returns>
private MqttClientOptions GetConnectionOptions(MqttConnectOptions options)
{
var clientId = DeviceInfoUtil.GenerateUniqueID();
// var caCert = GetCertificate(options.CaCert);
// var clientCert = GetCertificate(options.ClientCert);
//
// var chain = new X509Certificate2Collection(new[] { caCert, clientCert });
return new MqttClientOptionsBuilder()
.WithTcpServer(options.ServerAddress, options.Port)
// .WithCredentials(options.UserName, options.Password)
.WithClientId(clientId)
.WithCleanSession()
.WithTlsOptions(
o =>
{
o.UseTls(options.EnableTls);
o.WithSslProtocols(options.Protocols);
// o.WithTrustChain(chain);
}
)
.Build();
}
private X509Certificate2 GetCertificate(string certBase64)
{
var certBytes = Convert.FromBase64String(certBase64);
return new X509Certificate2(certBytes);
}
/// <summary>
/// 连接MQTT
/// </summary>
/// <param name="options"></param>
public async Task<bool> ConnectAsync(MqttConnectOptions options)
{
var ops = GetConnectionOptions(options);
var client = new MqttFactory().CreateMqttClient();
var connectResult = await client.ConnectAsync(ops);
if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
{
return false;
}
client.ApplicationMessageReceivedAsync += (e) =>
{
MessageReceived?.Invoke(client, e);
return Task.CompletedTask;
};
_client = client;
return true;
}
/// <summary>
/// 断开连接
/// </summary>
public async Task DisconnectAsync()
{
if (_client is { IsConnected: true })
{
return;
}
await _client.DisconnectAsync();
_client = null;
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <param name="qos"></param>
/// <returns></returns>
public async Task<bool> Publish(string topic, object message,
MqttQualityOfServiceLevel qos)
{
if (_client is not { IsConnected: true })
{
return false;
}
var payload = message as string ?? JsonUtil.ToJson(message);
var result = await _client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qos)
.Build());
return result.IsSuccess;
}
/// <summary>
/// 订阅主题
/// </summary>
/// <param name="topic"></param>
/// <param name="qos"></param>
/// <returns></returns>
public async Task<bool> Subscribe(string topic,
MqttQualityOfServiceLevel qos)
{
if (_client is not { IsConnected: true })
{
return false;
}
await _client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qos)
.Build());
return true;
}
/// <summary>
/// 取消订阅主题
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
public async Task<bool> Unsubscribe(string topic)
{
if (_client is not { IsConnected: true })
{
return false;
}
await _client.UnsubscribeAsync(topic);
return true;
}
}