using System.Security.Cryptography.X509Certificates; using MasstransferCommon.Utils; using MasstransferInfrastructure.Mqtt.Model; using MasstransferSecurity.Utils; using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; namespace MasstransferInfrastructure.Mqtt.Client; class MqttClient { private IMqttClient? _client; public event EventHandler MessageReceived; public bool IsConnected => _client is { IsConnected: true }; /// /// 获取连接参数 /// /// /// private MqttClientOptions GetConnectionOptions(MqttConnectOptions options) { var clientId = DeviceInfoUtil.GenerateUniqueID(); // var caCert = GetCertificate(options.CaCert); // var clientCert = GetCertificate(options.ClientCert); // // var chain = new X509Certificate2Collection(new[] { caCert, clientCert }); return new MqttClientOptionsBuilder() .WithTcpServer(options.ServerAddress, options.Port) // .WithCredentials(options.UserName, options.Password) .WithClientId(clientId) .WithCleanSession() .WithTlsOptions( o => { o.UseTls(options.EnableTls); o.WithSslProtocols(options.Protocols); // o.WithTrustChain(chain); } ) .Build(); } private X509Certificate2 GetCertificate(string certBase64) { var certBytes = Convert.FromBase64String(certBase64); return new X509Certificate2(certBytes); } /// /// 连接MQTT /// /// public async Task ConnectAsync(MqttConnectOptions options) { var ops = GetConnectionOptions(options); var client = new MqttFactory().CreateMqttClient(); var connectResult = await client.ConnectAsync(ops); if (connectResult.ResultCode != MqttClientConnectResultCode.Success) { return false; } client.ApplicationMessageReceivedAsync += (e) => { MessageReceived?.Invoke(client, e); return Task.CompletedTask; }; _client = client; return true; } /// /// 断开连接 /// public async Task DisconnectAsync() { if (_client is { IsConnected: true }) { return; } await _client.DisconnectAsync(); _client = null; } /// /// 发送消息 /// /// /// /// /// public async Task Publish(string topic, object message, MqttQualityOfServiceLevel qos) { if (_client is not { IsConnected: true }) { return false; } var payload = message as string ?? JsonUtil.ToJson(message); var result = await _client.PublishAsync(new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) .WithQualityOfServiceLevel(qos) .Build()); return result.IsSuccess; } /// /// 订阅主题 /// /// /// /// public async Task Subscribe(string topic, MqttQualityOfServiceLevel qos) { if (_client is not { IsConnected: true }) { return false; } await _client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qos) .Build()); return true; } /// /// 取消订阅主题 /// /// /// public async Task Unsubscribe(string topic) { if (_client is not { IsConnected: true }) { return false; } await _client.UnsubscribeAsync(topic); return true; } }