调整了远程锁机和解锁接口

This commit is contained in:
huangxianguo 2024-09-05 21:25:51 +08:00
parent 5710bf9005
commit e54f601278
8 changed files with 165 additions and 29 deletions

View File

@ -1,22 +1,42 @@
namespace MasstransferCommon.Scheduler; using Serilog;
namespace MasstransferCommon.Scheduler;
/// <summary> /// <summary>
/// 延时定时任务 /// 延时定时任务
/// </summary> /// </summary>
public class DelayScheduler public class DelayScheduler
{ {
private Timer _timer; /// <summary>
private Action _action; /// 设定延时任务
/// </summary>
public void Schedule(Action action, TimeSpan delay) /// <param name="action"></param>
/// <param name="delay"></param>
/// <param name="cancellationToken"></param>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="ArgumentOutOfRangeException"></exception>
public static async void Delay(Action action, TimeSpan delay, CancellationToken cancellationToken = default)
{ {
_action = action; try
_timer = new Timer(TimerCallback, null, delay, Timeout.InfiniteTimeSpan); {
if (action == null) throw new ArgumentNullException(nameof(action));
if (delay.TotalMilliseconds < 0)
throw new ArgumentOutOfRangeException(nameof(delay), "延时时间不能为负数");
await Task.Delay(delay, cancellationToken);
if (cancellationToken.IsCancellationRequested)
{
return;
} }
private void TimerCallback(object? state) action();
}
catch (Exception e)
{ {
_timer?.Dispose(); if (e is not TaskCanceledException)
_action?.Invoke(); {
Log.Error(e, "延时任务执行失败");
}
}
} }
} }

View File

@ -1,5 +1,4 @@
using System.Text.Json; using Newtonsoft.Json;
using Newtonsoft.Json;
namespace MasstransferCommon.Utils; namespace MasstransferCommon.Utils;

View File

@ -0,0 +1,103 @@
namespace MasstransferCommon.Utils;
using System;
public class SnowflakeId
{
private static readonly DateTime Epoch = new DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private const int WorkerIdBits = 5;
private const int DatacenterIdBits = 5;
private const int SequenceBits = 12;
private const long MaxWorkerId = -1L ^ (-1L << WorkerIdBits);
private const long MaxDatacenterId = -1L ^ (-1L << DatacenterIdBits);
private const long WorkerIdShift = SequenceBits;
private const long DatacenterIdShift = SequenceBits + WorkerIdBits;
private const long TimestampLeftShift = SequenceBits + WorkerIdBits + DatacenterIdBits;
private const long SequenceMask = -1L ^ (-1L << SequenceBits);
private readonly object _lock = new object();
private long _lastTimestamp = -1L;
private long _sequence = 0L;
private static readonly SnowflakeId Instance = new SnowflakeId(0, 0);
public long WorkerId { get; }
public long DatacenterId { get; }
public SnowflakeId(long workerId, long datacenterId)
{
if (workerId > MaxWorkerId || workerId < 0)
{
throw new ArgumentException($"workerId must be between 0 and {MaxWorkerId}");
}
if (datacenterId > MaxDatacenterId || datacenterId < 0)
{
throw new ArgumentException($"datacenterId must be between 0 and {MaxDatacenterId}");
}
WorkerId = workerId;
DatacenterId = datacenterId;
}
public static string GetNextId()
{
return Instance.NextId().ToString();
}
private long NextId()
{
lock (_lock)
{
var timestamp = TimeGen();
if (timestamp < _lastTimestamp)
{
throw new InvalidOperationException("Clock moved backwards. Refusing to generate id");
}
if (_lastTimestamp == timestamp)
{
_sequence = (_sequence + 1) & SequenceMask;
if (_sequence == 0)
{
timestamp = TilNextMillis(_lastTimestamp);
}
}
else
{
_sequence = 0L;
}
_lastTimestamp = timestamp;
return ((timestamp - EpochTicks()) << (int)TimestampLeftShift) |
(DatacenterId << (int)DatacenterIdShift) |
(WorkerId << (int)WorkerIdShift) |
_sequence;
}
}
private long TilNextMillis(long lastTimestamp)
{
var timestamp = TimeGen();
while (timestamp <= lastTimestamp)
{
timestamp = TimeGen();
}
return timestamp;
}
private long TimeGen()
{
return (long)(DateTime.UtcNow - Epoch).TotalMilliseconds;
}
private static long EpochTicks()
{
return (long)(Epoch - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds;
}
}

View File

@ -1,21 +1,23 @@
namespace MasstransferExporter.LogExporter.Model; using Newtonsoft.Json;
namespace MasstransferExporter.LogExporter.Model;
/// <summary> /// <summary>
/// 用户操作日志 /// 用户操作日志
/// </summary> /// </summary>
public class OperationLogData public class OperationLogData
{ {
public string UserName { get; set; } [JsonProperty("userName")] public string UserName { get; set; }
public DateTime ControlTimestamp { get; set; } [JsonProperty("controlTimestamp")] public DateTime ControlTimestamp { get; set; }
public string ControlType { get; set; } [JsonProperty("controlType")] public string ControlType { get; set; }
public string ControlResult { get; set; } [JsonProperty("controlResult")] public string ControlResult { get; set; }
public string ControlTarget { get; set; } [JsonProperty("controlTarget")] public string ControlTarget { get; set; }
public string ControlParams { get; set; } [JsonProperty("controlParams")] public string ControlParams { get; set; }
public string ControlMessage { get; set; } [JsonProperty("controlMessage")] public string ControlMessage { get; set; }
} }

View File

@ -1,11 +1,10 @@
using MasstransferCommon.Events; using MasstransferCommon.Events;
using MasstransferCommon.Model.Entity; using MasstransferCommon.Model.Entity;
using MasstransferCommon.Utils; using MasstransferCommon.Scheduler;
using MasstransferCommunicate.Mqtt.Client; using MasstransferCommunicate.Mqtt.Client;
using MasstransferCommunicate.Mqtt.Model;
using MasstransferCommunicate.Process.Client; using MasstransferCommunicate.Process.Client;
using MasstransferExporter.Init; using MasstransferExporter.Init;
using MasstransferExporter.RemoteControl.Model; using MasstransferExporter.LogExporter;
using MasstransferInfrastructure.Database.Sqlite; using MasstransferInfrastructure.Database.Sqlite;
using MasstransferInfrastructure.Mqtt.Model; using MasstransferInfrastructure.Mqtt.Model;
@ -36,6 +35,8 @@ class Program
// 启动完成后,广播启动通知 // 启动完成后,广播启动通知
EventBus<bool>.Publish(EventType.StartUp, true); EventBus<bool>.Publish(EventType.StartUp, true);
DelayScheduler.Delay(async () => { await LogFileExporter.ExportLogFile(); },
TimeSpan.FromSeconds(5));
// 启动与主程序的通信 // 启动与主程序的通信
ProcessHelper.Init(); ProcessHelper.Init();

View File

@ -57,6 +57,8 @@ public class SqliteHelper
/// <typeparam name="T">数据类型</typeparam> /// <typeparam name="T">数据类型</typeparam>
public int Insert<T>(T item) public int Insert<T>(T item)
{ {
CreateTable(item!.GetType());
var id = item?.GetType().GetProperty("Id"); var id = item?.GetType().GetProperty("Id");
if (id != null && id.CanWrite) id.SetValue(item, SnowFlakeNew.LongId.ToString()); if (id != null && id.CanWrite) id.SetValue(item, SnowFlakeNew.LongId.ToString());

View File

@ -65,7 +65,7 @@ public class MessageQueueHelper
/// <param name="topic"></param> /// <param name="topic"></param>
/// <param name="message"></param> /// <param name="message"></param>
/// <param name="qos"></param> /// <param name="qos"></param>
public static async Task<bool> Publish(string topic, object message, public static async Task<bool> Publish<T>(string topic, T message,
MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce) MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
{ {
try try

View File

@ -1,8 +1,10 @@
using System.Security.Cryptography.X509Certificates; using System.Security.Cryptography.X509Certificates;
using MasstransferCommon.Model.Constant; using MasstransferCommon.Model.Constant;
using MasstransferCommon.Utils; using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Model;
using MasstransferInfrastructure.Mqtt.Model; using MasstransferInfrastructure.Mqtt.Model;
using MasstransferSecurity.Utils; using MasstransferSecurity.Utils;
using Masuit.Tools.Systems;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Protocol; using MQTTnet.Protocol;
@ -96,10 +98,10 @@ class MqttClient
/// 发送消息 /// 发送消息
/// </summary> /// </summary>
/// <param name="topic"></param> /// <param name="topic"></param>
/// <param name="message"></param> /// <param name="data"></param>
/// <param name="qos"></param> /// <param name="qos"></param>
/// <returns></returns> /// <returns></returns>
public async Task<bool> Publish(string topic, object message, public async Task<bool> Publish<T>(string topic, T data,
MqttQualityOfServiceLevel qos) MqttQualityOfServiceLevel qos)
{ {
if (_client is not { IsConnected: true }) if (_client is not { IsConnected: true })
@ -107,7 +109,14 @@ class MqttClient
return false; return false;
} }
var payload = message as string ?? JsonUtil.ToJson(message); var message = new Payload<T>()
{
MsgId = SnowFlakeNew.LongId.ToString(),
Data = data,
ConsumeTime = DateTime.Now.Ticks.ToString(),
};
var payload = JsonUtil.ToJson(message);
var result = await _client.PublishAsync(new MqttApplicationMessageBuilder() var result = await _client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic) .WithTopic(topic)