diff --git a/MasstransferCommon/Scheduler/DelayScheduler.cs b/MasstransferCommon/Scheduler/DelayScheduler.cs
index 5a20278..8864e47 100644
--- a/MasstransferCommon/Scheduler/DelayScheduler.cs
+++ b/MasstransferCommon/Scheduler/DelayScheduler.cs
@@ -1,22 +1,42 @@
-namespace MasstransferCommon.Scheduler;
+using Serilog;
+
+namespace MasstransferCommon.Scheduler;
///
/// 延时定时任务
///
public class DelayScheduler
{
- private Timer _timer;
- private Action _action;
-
- public void Schedule(Action action, TimeSpan delay)
+ ///
+ /// 设定延时任务
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static async void Delay(Action action, TimeSpan delay, CancellationToken cancellationToken = default)
{
- _action = action;
- _timer = new Timer(TimerCallback, null, delay, Timeout.InfiniteTimeSpan);
- }
+ try
+ {
+ if (action == null) throw new ArgumentNullException(nameof(action));
+ if (delay.TotalMilliseconds < 0)
+ throw new ArgumentOutOfRangeException(nameof(delay), "延时时间不能为负数");
- private void TimerCallback(object? state)
- {
- _timer?.Dispose();
- _action?.Invoke();
+ await Task.Delay(delay, cancellationToken);
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return;
+ }
+
+ action();
+ }
+ catch (Exception e)
+ {
+ if (e is not TaskCanceledException)
+ {
+ Log.Error(e, "延时任务执行失败");
+ }
+ }
}
}
\ No newline at end of file
diff --git a/MasstransferCommon/Utils/JsonUtil.cs b/MasstransferCommon/Utils/JsonUtil.cs
index 71e4cd6..4667ec4 100644
--- a/MasstransferCommon/Utils/JsonUtil.cs
+++ b/MasstransferCommon/Utils/JsonUtil.cs
@@ -1,5 +1,4 @@
-using System.Text.Json;
-using Newtonsoft.Json;
+using Newtonsoft.Json;
namespace MasstransferCommon.Utils;
diff --git a/MasstransferCommon/Utils/SnowflakeId.cs b/MasstransferCommon/Utils/SnowflakeId.cs
new file mode 100644
index 0000000..7ced0e0
--- /dev/null
+++ b/MasstransferCommon/Utils/SnowflakeId.cs
@@ -0,0 +1,103 @@
+namespace MasstransferCommon.Utils;
+
+using System;
+
+public class SnowflakeId
+{
+ private static readonly DateTime Epoch = new DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+ private const int WorkerIdBits = 5;
+ private const int DatacenterIdBits = 5;
+ private const int SequenceBits = 12;
+
+ private const long MaxWorkerId = -1L ^ (-1L << WorkerIdBits);
+ private const long MaxDatacenterId = -1L ^ (-1L << DatacenterIdBits);
+
+ private const long WorkerIdShift = SequenceBits;
+ private const long DatacenterIdShift = SequenceBits + WorkerIdBits;
+ private const long TimestampLeftShift = SequenceBits + WorkerIdBits + DatacenterIdBits;
+ private const long SequenceMask = -1L ^ (-1L << SequenceBits);
+
+ private readonly object _lock = new object();
+ private long _lastTimestamp = -1L;
+ private long _sequence = 0L;
+
+ private static readonly SnowflakeId Instance = new SnowflakeId(0, 0);
+
+ public long WorkerId { get; }
+ public long DatacenterId { get; }
+
+ public SnowflakeId(long workerId, long datacenterId)
+ {
+ if (workerId > MaxWorkerId || workerId < 0)
+ {
+ throw new ArgumentException($"workerId must be between 0 and {MaxWorkerId}");
+ }
+
+ if (datacenterId > MaxDatacenterId || datacenterId < 0)
+ {
+ throw new ArgumentException($"datacenterId must be between 0 and {MaxDatacenterId}");
+ }
+
+ WorkerId = workerId;
+ DatacenterId = datacenterId;
+ }
+
+ public static string GetNextId()
+ {
+ return Instance.NextId().ToString();
+ }
+
+ private long NextId()
+ {
+ lock (_lock)
+ {
+ var timestamp = TimeGen();
+
+ if (timestamp < _lastTimestamp)
+ {
+ throw new InvalidOperationException("Clock moved backwards. Refusing to generate id");
+ }
+
+ if (_lastTimestamp == timestamp)
+ {
+ _sequence = (_sequence + 1) & SequenceMask;
+ if (_sequence == 0)
+ {
+ timestamp = TilNextMillis(_lastTimestamp);
+ }
+ }
+ else
+ {
+ _sequence = 0L;
+ }
+
+ _lastTimestamp = timestamp;
+
+ return ((timestamp - EpochTicks()) << (int)TimestampLeftShift) |
+ (DatacenterId << (int)DatacenterIdShift) |
+ (WorkerId << (int)WorkerIdShift) |
+ _sequence;
+ }
+ }
+
+ private long TilNextMillis(long lastTimestamp)
+ {
+ var timestamp = TimeGen();
+ while (timestamp <= lastTimestamp)
+ {
+ timestamp = TimeGen();
+ }
+
+ return timestamp;
+ }
+
+ private long TimeGen()
+ {
+ return (long)(DateTime.UtcNow - Epoch).TotalMilliseconds;
+ }
+
+ private static long EpochTicks()
+ {
+ return (long)(Epoch - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds;
+ }
+}
\ No newline at end of file
diff --git a/MasstransferExporter/LogExporter/Model/OperationLogData.cs b/MasstransferExporter/LogExporter/Model/OperationLogData.cs
index 9cf58ac..1449ff9 100644
--- a/MasstransferExporter/LogExporter/Model/OperationLogData.cs
+++ b/MasstransferExporter/LogExporter/Model/OperationLogData.cs
@@ -1,21 +1,23 @@
-namespace MasstransferExporter.LogExporter.Model;
+using Newtonsoft.Json;
+
+namespace MasstransferExporter.LogExporter.Model;
///
/// 用户操作日志
///
public class OperationLogData
{
- public string UserName { get; set; }
+ [JsonProperty("userName")] public string UserName { get; set; }
- public DateTime ControlTimestamp { get; set; }
+ [JsonProperty("controlTimestamp")] public DateTime ControlTimestamp { get; set; }
- public string ControlType { get; set; }
+ [JsonProperty("controlType")] public string ControlType { get; set; }
- public string ControlResult { get; set; }
+ [JsonProperty("controlResult")] public string ControlResult { get; set; }
- public string ControlTarget { get; set; }
+ [JsonProperty("controlTarget")] public string ControlTarget { get; set; }
- public string ControlParams { get; set; }
+ [JsonProperty("controlParams")] public string ControlParams { get; set; }
- public string ControlMessage { get; set; }
+ [JsonProperty("controlMessage")] public string ControlMessage { get; set; }
}
\ No newline at end of file
diff --git a/MasstransferExporter/Program.cs b/MasstransferExporter/Program.cs
index 1338f13..1a53976 100644
--- a/MasstransferExporter/Program.cs
+++ b/MasstransferExporter/Program.cs
@@ -1,11 +1,10 @@
using MasstransferCommon.Events;
using MasstransferCommon.Model.Entity;
-using MasstransferCommon.Utils;
+using MasstransferCommon.Scheduler;
using MasstransferCommunicate.Mqtt.Client;
-using MasstransferCommunicate.Mqtt.Model;
using MasstransferCommunicate.Process.Client;
using MasstransferExporter.Init;
-using MasstransferExporter.RemoteControl.Model;
+using MasstransferExporter.LogExporter;
using MasstransferInfrastructure.Database.Sqlite;
using MasstransferInfrastructure.Mqtt.Model;
@@ -36,6 +35,8 @@ class Program
// 启动完成后,广播启动通知
EventBus.Publish(EventType.StartUp, true);
+ DelayScheduler.Delay(async () => { await LogFileExporter.ExportLogFile(); },
+ TimeSpan.FromSeconds(5));
// 启动与主程序的通信
ProcessHelper.Init();
diff --git a/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs b/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs
index e131d54..fa7361b 100644
--- a/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs
+++ b/MasstransferInfrastructure/Database/Sqlite/SqliteHelper.cs
@@ -57,6 +57,8 @@ public class SqliteHelper
/// 数据类型
public int Insert(T item)
{
+ CreateTable(item!.GetType());
+
var id = item?.GetType().GetProperty("Id");
if (id != null && id.CanWrite) id.SetValue(item, SnowFlakeNew.LongId.ToString());
diff --git a/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs b/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs
index 3517467..5113005 100644
--- a/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs
+++ b/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs
@@ -65,7 +65,7 @@ public class MessageQueueHelper
///
///
///
- public static async Task Publish(string topic, object message,
+ public static async Task Publish(string topic, T message,
MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
{
try
diff --git a/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs b/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs
index 6113c46..3132417 100644
--- a/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs
+++ b/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs
@@ -1,8 +1,10 @@
using System.Security.Cryptography.X509Certificates;
using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils;
+using MasstransferCommunicate.Mqtt.Model;
using MasstransferInfrastructure.Mqtt.Model;
using MasstransferSecurity.Utils;
+using Masuit.Tools.Systems;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
@@ -96,10 +98,10 @@ class MqttClient
/// 发送消息
///
///
- ///
+ ///
///
///
- public async Task Publish(string topic, object message,
+ public async Task Publish(string topic, T data,
MqttQualityOfServiceLevel qos)
{
if (_client is not { IsConnected: true })
@@ -107,7 +109,14 @@ class MqttClient
return false;
}
- var payload = message as string ?? JsonUtil.ToJson(message);
+ var message = new Payload()
+ {
+ MsgId = SnowFlakeNew.LongId.ToString(),
+ Data = data,
+ ConsumeTime = DateTime.Now.Ticks.ToString(),
+ };
+
+ var payload = JsonUtil.ToJson(message);
var result = await _client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic)