From e54f6012789cf2e27568dcf0b65b3098d73f3639 Mon Sep 17 00:00:00 2001 From: huangxianguo Date: Thu, 5 Sep 2024 21:25:51 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BA=86=E8=BF=9C=E7=A8=8B?= =?UTF-8?q?=E9=94=81=E6=9C=BA=E5=92=8C=E8=A7=A3=E9=94=81=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Scheduler/DelayScheduler.cs | 44 ++++++-- MasstransferCommon/Utils/JsonUtil.cs | 3 +- MasstransferCommon/Utils/SnowflakeId.cs | 103 ++++++++++++++++++ .../LogExporter/Model/OperationLogData.cs | 18 +-- MasstransferExporter/Program.cs | 7 +- .../Database/Sqlite/SqliteHelper.cs | 2 + .../Mqtt/Client/MessageQueueHelper.cs | 2 +- .../Mqtt/Client/MqttClient.cs | 15 ++- 8 files changed, 165 insertions(+), 29 deletions(-) create mode 100644 MasstransferCommon/Utils/SnowflakeId.cs diff --git a/MasstransferCommon/Scheduler/DelayScheduler.cs b/MasstransferCommon/Scheduler/DelayScheduler.cs index 5a20278..8864e47 100644 --- a/MasstransferCommon/Scheduler/DelayScheduler.cs +++ b/MasstransferCommon/Scheduler/DelayScheduler.cs @@ -1,22 +1,42 @@ -namespace MasstransferCommon.Scheduler; +using Serilog; + +namespace MasstransferCommon.Scheduler; /// /// 延时定时任务 /// public class DelayScheduler { - private Timer _timer; - private Action _action; - - public void Schedule(Action action, TimeSpan delay) + /// + /// 设定延时任务 + /// + /// + /// + /// + /// + /// + public static async void Delay(Action action, TimeSpan delay, CancellationToken cancellationToken = default) { - _action = action; - _timer = new Timer(TimerCallback, null, delay, Timeout.InfiniteTimeSpan); - } + try + { + if (action == null) throw new ArgumentNullException(nameof(action)); + if (delay.TotalMilliseconds < 0) + throw new ArgumentOutOfRangeException(nameof(delay), "延时时间不能为负数"); - private void TimerCallback(object? state) - { - _timer?.Dispose(); - _action?.Invoke(); + await Task.Delay(delay, cancellationToken); + if (cancellationToken.IsCancellationRequested) + { + return; + } + + action(); + } + catch (Exception e) + { + if (e is not TaskCanceledException) + { + Log.Error(e, "延时任务执行失败"); + } + } } } \ No newline at end of file diff --git a/MasstransferCommon/Utils/JsonUtil.cs b/MasstransferCommon/Utils/JsonUtil.cs index 71e4cd6..4667ec4 100644 --- a/MasstransferCommon/Utils/JsonUtil.cs +++ b/MasstransferCommon/Utils/JsonUtil.cs @@ -1,5 +1,4 @@ -using System.Text.Json; -using Newtonsoft.Json; +using Newtonsoft.Json; namespace MasstransferCommon.Utils; diff --git a/MasstransferCommon/Utils/SnowflakeId.cs b/MasstransferCommon/Utils/SnowflakeId.cs new file mode 100644 index 0000000..7ced0e0 --- /dev/null +++ b/MasstransferCommon/Utils/SnowflakeId.cs @@ -0,0 +1,103 @@ +namespace MasstransferCommon.Utils; + +using System; + +public class SnowflakeId +{ + private static readonly DateTime Epoch = new DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc); + private const int WorkerIdBits = 5; + private const int DatacenterIdBits = 5; + private const int SequenceBits = 12; + + private const long MaxWorkerId = -1L ^ (-1L << WorkerIdBits); + private const long MaxDatacenterId = -1L ^ (-1L << DatacenterIdBits); + + private const long WorkerIdShift = SequenceBits; + private const long DatacenterIdShift = SequenceBits + WorkerIdBits; + private const long TimestampLeftShift = SequenceBits + WorkerIdBits + DatacenterIdBits; + private const long SequenceMask = -1L ^ (-1L << SequenceBits); + + private readonly object _lock = new object(); + private long _lastTimestamp = -1L; + private long _sequence = 0L; + + private static readonly SnowflakeId Instance = new SnowflakeId(0, 0); + + public long WorkerId { get; } + public long DatacenterId { get; } + + public SnowflakeId(long workerId, long datacenterId) + { + if (workerId > MaxWorkerId || workerId < 0) + { + throw new ArgumentException($"workerId must be between 0 and {MaxWorkerId}"); + } + + if (datacenterId > MaxDatacenterId || datacenterId < 0) + { + throw new ArgumentException($"datacenterId must be between 0 and {MaxDatacenterId}"); + } + + WorkerId = workerId; + DatacenterId = datacenterId; + } + + public static string GetNextId() + { + return Instance.NextId().ToString(); + } + + private long NextId() + { + lock (_lock) + { + var timestamp = TimeGen(); + + if (timestamp < _lastTimestamp) + { + throw new InvalidOperationException("Clock moved backwards. Refusing to generate id"); + } + + if (_lastTimestamp == timestamp) + { + _sequence = (_sequence + 1) & SequenceMask; + if (_sequence == 0) + { + timestamp = TilNextMillis(_lastTimestamp); + } + } + else + { + _sequence = 0L; + } + + _lastTimestamp = timestamp; + + return ((timestamp - EpochTicks()) << (int)TimestampLeftShift) | + (DatacenterId << (int)DatacenterIdShift) | + (WorkerId << (int)WorkerIdShift) | + _sequence; + } + } + + private long TilNextMillis(long lastTimestamp) + { + var timestamp = TimeGen(); + while (timestamp <= lastTimestamp) + { + timestamp = TimeGen(); + } + + return timestamp; + } + + private long TimeGen() + { + return (long)(DateTime.UtcNow - Epoch).TotalMilliseconds; + } + + private static long EpochTicks() + { + return (long)(Epoch - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds; + } +} \ No newline at end of file diff --git a/MasstransferExporter/LogExporter/Model/OperationLogData.cs b/MasstransferExporter/LogExporter/Model/OperationLogData.cs index 9cf58ac..1449ff9 100644 --- a/MasstransferExporter/LogExporter/Model/OperationLogData.cs +++ b/MasstransferExporter/LogExporter/Model/OperationLogData.cs @@ -1,21 +1,23 @@ -namespace MasstransferExporter.LogExporter.Model; +using Newtonsoft.Json; + +namespace MasstransferExporter.LogExporter.Model; /// /// 用户操作日志 /// public class OperationLogData { - public string UserName { get; set; } + [JsonProperty("userName")] public string UserName { get; set; } - public DateTime ControlTimestamp { get; set; } + [JsonProperty("controlTimestamp")] public DateTime ControlTimestamp { get; set; } - public string ControlType { get; set; } + [JsonProperty("controlType")] public string ControlType { get; set; } - public string ControlResult { get; set; } + [JsonProperty("controlResult")] public string ControlResult { get; set; } - public string ControlTarget { get; set; } + [JsonProperty("controlTarget")] public string ControlTarget { get; set; } - public string ControlParams { get; set; } + [JsonProperty("controlParams")] public string ControlParams { get; set; } - public string ControlMessage { get; set; } + [JsonProperty("controlMessage")] public string ControlMessage { get; set; } } \ No newline at end of file diff --git a/MasstransferExporter/Program.cs b/MasstransferExporter/Program.cs index 1338f13..1a53976 100644 --- a/MasstransferExporter/Program.cs +++ b/MasstransferExporter/Program.cs @@ -1,11 +1,10 @@ using MasstransferCommon.Events; using MasstransferCommon.Model.Entity; -using MasstransferCommon.Utils; +using MasstransferCommon.Scheduler; using MasstransferCommunicate.Mqtt.Client; -using MasstransferCommunicate.Mqtt.Model; using MasstransferCommunicate.Process.Client; using MasstransferExporter.Init; -using MasstransferExporter.RemoteControl.Model; +using MasstransferExporter.LogExporter; using MasstransferInfrastructure.Database.Sqlite; using MasstransferInfrastructure.Mqtt.Model; @@ -36,6 +35,8 @@ class Program // 启动完成后,广播启动通知 EventBus.Publish(EventType.StartUp, true); + DelayScheduler.Delay(async () => { await LogFileExporter.ExportLogFile(); }, + TimeSpan.FromSeconds(5)); // 启动与主程序的通信 ProcessHelper.Init(); diff --git a/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs b/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs index e131d54..fa7361b 100644 --- a/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs +++ b/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs @@ -57,6 +57,8 @@ public class SqliteHelper /// 数据类型 public int Insert(T item) { + CreateTable(item!.GetType()); + var id = item?.GetType().GetProperty("Id"); if (id != null && id.CanWrite) id.SetValue(item, SnowFlakeNew.LongId.ToString()); diff --git a/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs b/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs index 3517467..5113005 100644 --- a/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs +++ b/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs @@ -65,7 +65,7 @@ public class MessageQueueHelper /// /// /// - public static async Task Publish(string topic, object message, + public static async Task Publish(string topic, T message, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce) { try diff --git a/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs b/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs index 6113c46..3132417 100644 --- a/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs +++ b/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs @@ -1,8 +1,10 @@ using System.Security.Cryptography.X509Certificates; using MasstransferCommon.Model.Constant; using MasstransferCommon.Utils; +using MasstransferCommunicate.Mqtt.Model; using MasstransferInfrastructure.Mqtt.Model; using MasstransferSecurity.Utils; +using Masuit.Tools.Systems; using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; @@ -96,10 +98,10 @@ class MqttClient /// 发送消息 /// /// - /// + /// /// /// - public async Task Publish(string topic, object message, + public async Task Publish(string topic, T data, MqttQualityOfServiceLevel qos) { if (_client is not { IsConnected: true }) @@ -107,7 +109,14 @@ class MqttClient return false; } - var payload = message as string ?? JsonUtil.ToJson(message); + var message = new Payload() + { + MsgId = SnowFlakeNew.LongId.ToString(), + Data = data, + ConsumeTime = DateTime.Now.Ticks.ToString(), + }; + + var payload = JsonUtil.ToJson(message); var result = await _client.PublishAsync(new MqttApplicationMessageBuilder() .WithTopic(topic)