From ae30c33f44b4cd15b0300b3393b05c46a09d1ec0 Mon Sep 17 00:00:00 2001 From: huangxianguo Date: Fri, 6 Sep 2024 15:08:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dmqtt=E6=8E=A5=E6=94=B6?= =?UTF-8?q?=E7=9A=84=E6=B6=88=E6=81=AF=E5=8F=8D=E5=BA=8F=E5=88=97=E5=8C=96?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Convert/DatetimeConverter.cs | 25 +++++++++++++++++++ MasstransferCommon/Utils/JsonUtil.cs | 23 ++++++++++++++--- .../DataExporter/ConfigService.cs | 6 ++--- .../DataExporter/FormulaService.cs | 2 +- .../DataExporter/SystemParamsService.cs | 16 ++++++------ .../LogExporter/OperationLogExporter.cs | 2 +- MasstransferExporter/Program.cs | 14 +++++++++-- .../RemoteControl/RemoteLockService.cs | 4 +-- .../StatExporter/SystemStatExporter.cs | 2 +- .../Mqtt/Client/MessageQueueHelper.cs | 17 ++++++++++--- .../Mqtt/Client/MqttClient.cs | 13 ++++++++-- .../Mqtt/Model/Payload.cs | 4 +-- 12 files changed, 98 insertions(+), 30 deletions(-) create mode 100644 MasstransferCommon/Convert/DatetimeConverter.cs diff --git a/MasstransferCommon/Convert/DatetimeConverter.cs b/MasstransferCommon/Convert/DatetimeConverter.cs new file mode 100644 index 0000000..5e76e12 --- /dev/null +++ b/MasstransferCommon/Convert/DatetimeConverter.cs @@ -0,0 +1,25 @@ +using Newtonsoft.Json; +using JsonSerializer = Newtonsoft.Json.JsonSerializer; + +namespace MasstransferCommon.Convert; + +public class DatetimeConverter : JsonConverter +{ + private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); + + public override void WriteJson(JsonWriter writer, DateTime value, JsonSerializer serializer) + { + long timestamp = (long)(value.ToUniversalTime() - UnixEpoch).TotalMilliseconds; + writer.WriteValue(timestamp); + } + + public override DateTime ReadJson(JsonReader reader, Type objectType, DateTime existingValue, bool hasExistingValue, + JsonSerializer serializer) + { + var value = reader.Value; + + return long.TryParse(value + "", out var timestamp) + ? UnixEpoch.AddMilliseconds(timestamp) + : UnixEpoch.AddMilliseconds((long)value); + } +} \ No newline at end of file diff --git a/MasstransferCommon/Utils/JsonUtil.cs b/MasstransferCommon/Utils/JsonUtil.cs index 4667ec4..1c9da47 100644 --- a/MasstransferCommon/Utils/JsonUtil.cs +++ b/MasstransferCommon/Utils/JsonUtil.cs @@ -1,4 +1,5 @@ -using Newtonsoft.Json; +using MasstransferCommon.Convert; +using Newtonsoft.Json; namespace MasstransferCommon.Utils; @@ -8,7 +9,13 @@ public class JsonUtil { try { - return JsonConvert.SerializeObject(obj); + var settings = new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + }; + settings.Converters.Add(new DatetimeConverter()); + + return JsonConvert.SerializeObject(obj, settings); } catch (Exception e) { @@ -34,8 +41,10 @@ public class JsonUtil { var settings = new JsonSerializerSettings { - NullValueHandling = NullValueHandling.Ignore + NullValueHandling = NullValueHandling.Ignore, }; + settings.Converters.Add(new DatetimeConverter()); + return JsonConvert.DeserializeObject(json, settings); } catch (Exception e) @@ -48,7 +57,13 @@ public class JsonUtil { try { - return JsonConvert.DeserializeObject(json, type); + var settings = new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + }; + settings.Converters.Add(new DatetimeConverter()); + + return JsonConvert.DeserializeObject(json, type, settings); } catch (Exception e) { diff --git a/MasstransferExporter/DataExporter/ConfigService.cs b/MasstransferExporter/DataExporter/ConfigService.cs index 045bf30..ca228ef 100644 --- a/MasstransferExporter/DataExporter/ConfigService.cs +++ b/MasstransferExporter/DataExporter/ConfigService.cs @@ -18,7 +18,7 @@ public class ConfigService : Instant /// private static async Task ListenConfigIssuedEvent(EventType type, bool start) { - await MessageQueueHelper.Subscribe(Topics.DownloadConfigData, HandleConfigIssuedEvent); + // await MessageQueueHelper.Subscribe(Topics.DownloadConfigData, HandleConfigIssuedEvent); } @@ -38,10 +38,10 @@ public class ConfigService : Instant var type = data.Type; switch (type) { - case "system": + case "SYSTEM": SystemParamsService.HandleSystemParamsIssuedEvent(data); break; - case "formula": + case "FORMULA": { var param = data.Param; var formulaDto = JsonUtil.FromJson(JsonUtil.ToJson(param)); diff --git a/MasstransferExporter/DataExporter/FormulaService.cs b/MasstransferExporter/DataExporter/FormulaService.cs index ca8da78..68c94a4 100644 --- a/MasstransferExporter/DataExporter/FormulaService.cs +++ b/MasstransferExporter/DataExporter/FormulaService.cs @@ -257,7 +257,7 @@ public class FormulaService { Name = formulaDto.Name, Param = formulaDto, - Type = "formula" + Type = "FORMULA" }; await MessageQueueHelper.Publish(Topics.ReportConfigData, data); diff --git a/MasstransferExporter/DataExporter/SystemParamsService.cs b/MasstransferExporter/DataExporter/SystemParamsService.cs index a185d7a..5459833 100644 --- a/MasstransferExporter/DataExporter/SystemParamsService.cs +++ b/MasstransferExporter/DataExporter/SystemParamsService.cs @@ -129,55 +129,55 @@ public class SystemParamsService { Name = "相机配置", Param = GetCameraParams(), - Type = "system" + Type = "SYSTEM" }; var substrateCameraInternalParams = new ConfigData() { Name = "基板拍照相机内参", Param = GetSubstrateParams(), - Type = "system" + Type = "SYSTEM" }; var waferCameraInternalParams = new ConfigData() { Name = "芯片拍照相机内参", Param = GetWaferParams(), - Type = "system" + Type = "SYSTEM" }; var logParams = new ConfigData() { Name = "日志配置", Param = GetLogParams(), - Type = "system" + Type = "SYSTEM" }; var systemParams = new ConfigData() { Name = "系统配置", Param = GetSystemParams(), - Type = "system" + Type = "SYSTEM" }; var scannerParams = new ConfigData() { Name = "扫描器配置", Param = GetScannerParams(), - Type = "system" + Type = "SYSTEM" }; var minioParams = new ConfigData() { Name = "MinIO配置", Param = GetMinioParams(), - Type = "system" + Type = "SYSTEM" }; var mqttParams = new ConfigData() { Name = "MQTT配置", Param = GetMqttConnectParams(), - Type = "system" + Type = "SYSTEM" }; await MessageQueueHelper.Publish(Topics.ReportConfigData, cameraParams); diff --git a/MasstransferExporter/LogExporter/OperationLogExporter.cs b/MasstransferExporter/LogExporter/OperationLogExporter.cs index b19558b..29e4f99 100644 --- a/MasstransferExporter/LogExporter/OperationLogExporter.cs +++ b/MasstransferExporter/LogExporter/OperationLogExporter.cs @@ -35,7 +35,7 @@ public class OperationLogExporter ControlResult = log.Exception.Equals("null") ? "成功" : "异常", ControlTimestamp = log.OperateTime, ControlType = log.Action, - UserName = log.UserName + UserName = log.UserName ?? "admin" }; await MessageQueueHelper.Publish(Topics.ReportOperationLog, data); diff --git a/MasstransferExporter/Program.cs b/MasstransferExporter/Program.cs index 1a53976..334e5ba 100644 --- a/MasstransferExporter/Program.cs +++ b/MasstransferExporter/Program.cs @@ -1,10 +1,17 @@ using MasstransferCommon.Events; using MasstransferCommon.Model.Entity; using MasstransferCommon.Scheduler; +using MasstransferCommon.Utils; using MasstransferCommunicate.Mqtt.Client; +using MasstransferCommunicate.Mqtt.Model; using MasstransferCommunicate.Process.Client; +using MasstransferExporter.DataExporter; using MasstransferExporter.Init; using MasstransferExporter.LogExporter; +using MasstransferExporter.LogExporter.Model; +using MasstransferExporter.RemoteControl.Model; +using MasstransferExporter.Stat; +using MasstransferExporter.StatExporter; using MasstransferInfrastructure.Database.Sqlite; using MasstransferInfrastructure.Mqtt.Model; @@ -35,12 +42,15 @@ class Program // 启动完成后,广播启动通知 EventBus.Publish(EventType.StartUp, true); - DelayScheduler.Delay(async () => { await LogFileExporter.ExportLogFile(); }, - TimeSpan.FromSeconds(5)); + // DelayScheduler.Delay(async () => await FormulaService.FormulaDataExporter(), + // TimeSpan.FromSeconds(5)); // 启动与主程序的通信 ProcessHelper.Init(); + // SystemStatExporter.Collect(); + + Console.WriteLine("按任意键退出"); Console.ReadKey(); diff --git a/MasstransferExporter/RemoteControl/RemoteLockService.cs b/MasstransferExporter/RemoteControl/RemoteLockService.cs index d9deb39..25a53bc 100644 --- a/MasstransferExporter/RemoteControl/RemoteLockService.cs +++ b/MasstransferExporter/RemoteControl/RemoteLockService.cs @@ -28,10 +28,8 @@ public class RemoteLockService : Instant /// /// 处理接收到锁机业务指令 /// - private static void HandleLockCmd(string topic, string payload) + private static void HandleLockCmd(string topic, LockCmd? cmd) { - var cmd = JsonUtil.FromJson(payload); - if (cmd == null) return; var action = cmd.Action; diff --git a/MasstransferExporter/StatExporter/SystemStatExporter.cs b/MasstransferExporter/StatExporter/SystemStatExporter.cs index b184129..29056bf 100644 --- a/MasstransferExporter/StatExporter/SystemStatExporter.cs +++ b/MasstransferExporter/StatExporter/SystemStatExporter.cs @@ -4,7 +4,7 @@ using MasstransferCommon.Utils; using MasstransferExporter.Stat.DTO; using Serilog; -namespace MasstransferExporter.Stat; +namespace MasstransferExporter.StatExporter; /// /// 系统状态统计信息 diff --git a/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs b/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs index 5113005..9ae2c51 100644 --- a/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs +++ b/MasstransferInfrastructure/Mqtt/Client/MessageQueueHelper.cs @@ -1,4 +1,5 @@ -using MasstransferCommon.Model.Entity; +using System.Reflection; +using MasstransferCommon.Model.Entity; using MasstransferCommon.Utils; using MasstransferCommunicate.Mqtt.Model; using MasstransferInfrastructure.Database.Sqlite; @@ -105,6 +106,8 @@ public class MessageQueueHelper var topic = applicationMessage.Topic; var message = applicationMessage.ConvertPayloadToString(); + Console.WriteLine($"收到消息:{topic} - {message}"); + if (!Subscribers.TryGetValue(topic, out var subscribers)) return; foreach (var subscriber in subscribers) @@ -114,12 +117,20 @@ public class MessageQueueHelper var methodInfo = subscriber.Method; var parameters = methodInfo.GetParameters(); if (parameters.Length != 2) continue; - var payload = JsonUtil.FromJson>(message); + + var dataType = parameters[1].ParameterType; + + var type = typeof(Payload<>).MakeGenericType(dataType); + + var payload = JsonUtil.FromJson(type, message); if (payload == null) continue; + var property = type.GetProperty("Data"); + + var data = property?.GetValue(payload); // 通知订阅者 - subscriber.DynamicInvoke(topic, JsonUtil.ToJson(payload.Data)); + subscriber.DynamicInvoke(topic, data); } catch (Exception exception) { diff --git a/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs b/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs index 3132417..5b659e5 100644 --- a/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs +++ b/MasstransferInfrastructure/Mqtt/Client/MqttClient.cs @@ -17,6 +17,8 @@ class MqttClient public event EventHandler MessageReceived; + private static readonly DateTime Epoch = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc); + public bool IsConnected => _client is { IsConnected: true }; /// @@ -111,13 +113,15 @@ class MqttClient var message = new Payload() { - MsgId = SnowFlakeNew.LongId.ToString(), + MsgId = GetCurrentTimestamp(), Data = data, - ConsumeTime = DateTime.Now.Ticks.ToString(), + ConsumeTime = DateTime.Now, }; var payload = JsonUtil.ToJson(message); + Console.WriteLine(payload); + var result = await _client.PublishAsync(new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) @@ -127,6 +131,11 @@ class MqttClient return result.IsSuccess; } + private static long GetCurrentTimestamp() + { + return (long)(DateTime.UtcNow - Epoch).TotalMilliseconds; + } + /// /// 订阅主题 /// diff --git a/MasstransferInfrastructure/Mqtt/Model/Payload.cs b/MasstransferInfrastructure/Mqtt/Model/Payload.cs index 55e31b5..4271ca7 100644 --- a/MasstransferInfrastructure/Mqtt/Model/Payload.cs +++ b/MasstransferInfrastructure/Mqtt/Model/Payload.cs @@ -4,9 +4,9 @@ namespace MasstransferCommunicate.Mqtt.Model; public class Payload { - [JsonProperty("msgId")] public string MsgId { get; set; } + [JsonProperty("msgId")] public long MsgId { get; set; } - [JsonProperty("consumeTime")] public string ConsumeTime { get; set; } + [JsonProperty("consumeTime")] public DateTime ConsumeTime { get; set; } [JsonProperty("data")] public T Data { get; set; } } \ No newline at end of file