修复mqtt接收的消息反序列化异常问题

This commit is contained in:
huangxianguo 2024-09-06 15:08:32 +08:00
parent e54f601278
commit ae30c33f44
12 changed files with 98 additions and 30 deletions

View File

@ -0,0 +1,25 @@
using Newtonsoft.Json;
using JsonSerializer = Newtonsoft.Json.JsonSerializer;
namespace MasstransferCommon.Convert;
public class DatetimeConverter : JsonConverter<DateTime>
{
private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
public override void WriteJson(JsonWriter writer, DateTime value, JsonSerializer serializer)
{
long timestamp = (long)(value.ToUniversalTime() - UnixEpoch).TotalMilliseconds;
writer.WriteValue(timestamp);
}
public override DateTime ReadJson(JsonReader reader, Type objectType, DateTime existingValue, bool hasExistingValue,
JsonSerializer serializer)
{
var value = reader.Value;
return long.TryParse(value + "", out var timestamp)
? UnixEpoch.AddMilliseconds(timestamp)
: UnixEpoch.AddMilliseconds((long)value);
}
}

View File

@ -1,4 +1,5 @@
using Newtonsoft.Json; using MasstransferCommon.Convert;
using Newtonsoft.Json;
namespace MasstransferCommon.Utils; namespace MasstransferCommon.Utils;
@ -8,7 +9,13 @@ public class JsonUtil
{ {
try try
{ {
return JsonConvert.SerializeObject(obj); var settings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
};
settings.Converters.Add(new DatetimeConverter());
return JsonConvert.SerializeObject(obj, settings);
} }
catch (Exception e) catch (Exception e)
{ {
@ -34,8 +41,10 @@ public class JsonUtil
{ {
var settings = new JsonSerializerSettings var settings = new JsonSerializerSettings
{ {
NullValueHandling = NullValueHandling.Ignore NullValueHandling = NullValueHandling.Ignore,
}; };
settings.Converters.Add(new DatetimeConverter());
return JsonConvert.DeserializeObject<T>(json, settings); return JsonConvert.DeserializeObject<T>(json, settings);
} }
catch (Exception e) catch (Exception e)
@ -48,7 +57,13 @@ public class JsonUtil
{ {
try try
{ {
return JsonConvert.DeserializeObject(json, type); var settings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
};
settings.Converters.Add(new DatetimeConverter());
return JsonConvert.DeserializeObject(json, type, settings);
} }
catch (Exception e) catch (Exception e)
{ {

View File

@ -18,7 +18,7 @@ public class ConfigService : Instant
/// </summary> /// </summary>
private static async Task ListenConfigIssuedEvent(EventType type, bool start) private static async Task ListenConfigIssuedEvent(EventType type, bool start)
{ {
await MessageQueueHelper.Subscribe(Topics.DownloadConfigData, HandleConfigIssuedEvent); // await MessageQueueHelper.Subscribe(Topics.DownloadConfigData, HandleConfigIssuedEvent);
} }
@ -38,10 +38,10 @@ public class ConfigService : Instant
var type = data.Type; var type = data.Type;
switch (type) switch (type)
{ {
case "system": case "SYSTEM":
SystemParamsService.HandleSystemParamsIssuedEvent(data); SystemParamsService.HandleSystemParamsIssuedEvent(data);
break; break;
case "formula": case "FORMULA":
{ {
var param = data.Param; var param = data.Param;
var formulaDto = JsonUtil.FromJson<FormulaDTO>(JsonUtil.ToJson(param)); var formulaDto = JsonUtil.FromJson<FormulaDTO>(JsonUtil.ToJson(param));

View File

@ -257,7 +257,7 @@ public class FormulaService
{ {
Name = formulaDto.Name, Name = formulaDto.Name,
Param = formulaDto, Param = formulaDto,
Type = "formula" Type = "FORMULA"
}; };
await MessageQueueHelper.Publish(Topics.ReportConfigData, data); await MessageQueueHelper.Publish(Topics.ReportConfigData, data);

View File

@ -129,55 +129,55 @@ public class SystemParamsService
{ {
Name = "相机配置", Name = "相机配置",
Param = GetCameraParams(), Param = GetCameraParams(),
Type = "system" Type = "SYSTEM"
}; };
var substrateCameraInternalParams = new ConfigData() var substrateCameraInternalParams = new ConfigData()
{ {
Name = "基板拍照相机内参", Name = "基板拍照相机内参",
Param = GetSubstrateParams(), Param = GetSubstrateParams(),
Type = "system" Type = "SYSTEM"
}; };
var waferCameraInternalParams = new ConfigData() var waferCameraInternalParams = new ConfigData()
{ {
Name = "芯片拍照相机内参", Name = "芯片拍照相机内参",
Param = GetWaferParams(), Param = GetWaferParams(),
Type = "system" Type = "SYSTEM"
}; };
var logParams = new ConfigData() var logParams = new ConfigData()
{ {
Name = "日志配置", Name = "日志配置",
Param = GetLogParams(), Param = GetLogParams(),
Type = "system" Type = "SYSTEM"
}; };
var systemParams = new ConfigData() var systemParams = new ConfigData()
{ {
Name = "系统配置", Name = "系统配置",
Param = GetSystemParams(), Param = GetSystemParams(),
Type = "system" Type = "SYSTEM"
}; };
var scannerParams = new ConfigData() var scannerParams = new ConfigData()
{ {
Name = "扫描器配置", Name = "扫描器配置",
Param = GetScannerParams(), Param = GetScannerParams(),
Type = "system" Type = "SYSTEM"
}; };
var minioParams = new ConfigData() var minioParams = new ConfigData()
{ {
Name = "MinIO配置", Name = "MinIO配置",
Param = GetMinioParams(), Param = GetMinioParams(),
Type = "system" Type = "SYSTEM"
}; };
var mqttParams = new ConfigData() var mqttParams = new ConfigData()
{ {
Name = "MQTT配置", Name = "MQTT配置",
Param = GetMqttConnectParams(), Param = GetMqttConnectParams(),
Type = "system" Type = "SYSTEM"
}; };
await MessageQueueHelper.Publish(Topics.ReportConfigData, cameraParams); await MessageQueueHelper.Publish(Topics.ReportConfigData, cameraParams);

View File

@ -35,7 +35,7 @@ public class OperationLogExporter
ControlResult = log.Exception.Equals("null") ? "成功" : "异常", ControlResult = log.Exception.Equals("null") ? "成功" : "异常",
ControlTimestamp = log.OperateTime, ControlTimestamp = log.OperateTime,
ControlType = log.Action, ControlType = log.Action,
UserName = log.UserName UserName = log.UserName ?? "admin"
}; };
await MessageQueueHelper.Publish(Topics.ReportOperationLog, data); await MessageQueueHelper.Publish(Topics.ReportOperationLog, data);

View File

@ -1,10 +1,17 @@
using MasstransferCommon.Events; using MasstransferCommon.Events;
using MasstransferCommon.Model.Entity; using MasstransferCommon.Model.Entity;
using MasstransferCommon.Scheduler; using MasstransferCommon.Scheduler;
using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Client; using MasstransferCommunicate.Mqtt.Client;
using MasstransferCommunicate.Mqtt.Model;
using MasstransferCommunicate.Process.Client; using MasstransferCommunicate.Process.Client;
using MasstransferExporter.DataExporter;
using MasstransferExporter.Init; using MasstransferExporter.Init;
using MasstransferExporter.LogExporter; using MasstransferExporter.LogExporter;
using MasstransferExporter.LogExporter.Model;
using MasstransferExporter.RemoteControl.Model;
using MasstransferExporter.Stat;
using MasstransferExporter.StatExporter;
using MasstransferInfrastructure.Database.Sqlite; using MasstransferInfrastructure.Database.Sqlite;
using MasstransferInfrastructure.Mqtt.Model; using MasstransferInfrastructure.Mqtt.Model;
@ -35,12 +42,15 @@ class Program
// 启动完成后,广播启动通知 // 启动完成后,广播启动通知
EventBus<bool>.Publish(EventType.StartUp, true); EventBus<bool>.Publish(EventType.StartUp, true);
DelayScheduler.Delay(async () => { await LogFileExporter.ExportLogFile(); }, // DelayScheduler.Delay(async () => await FormulaService.FormulaDataExporter(),
TimeSpan.FromSeconds(5)); // TimeSpan.FromSeconds(5));
// 启动与主程序的通信 // 启动与主程序的通信
ProcessHelper.Init(); ProcessHelper.Init();
// SystemStatExporter.Collect();
Console.WriteLine("按任意键退出"); Console.WriteLine("按任意键退出");
Console.ReadKey(); Console.ReadKey();

View File

@ -28,10 +28,8 @@ public class RemoteLockService : Instant
/// <summary> /// <summary>
/// 处理接收到锁机业务指令 /// 处理接收到锁机业务指令
/// </summary> /// </summary>
private static void HandleLockCmd(string topic, string payload) private static void HandleLockCmd(string topic, LockCmd? cmd)
{ {
var cmd = JsonUtil.FromJson<LockCmd>(payload);
if (cmd == null) return; if (cmd == null) return;
var action = cmd.Action; var action = cmd.Action;

View File

@ -4,7 +4,7 @@ using MasstransferCommon.Utils;
using MasstransferExporter.Stat.DTO; using MasstransferExporter.Stat.DTO;
using Serilog; using Serilog;
namespace MasstransferExporter.Stat; namespace MasstransferExporter.StatExporter;
/// <summary> /// <summary>
/// 系统状态统计信息 /// 系统状态统计信息

View File

@ -1,4 +1,5 @@
using MasstransferCommon.Model.Entity; using System.Reflection;
using MasstransferCommon.Model.Entity;
using MasstransferCommon.Utils; using MasstransferCommon.Utils;
using MasstransferCommunicate.Mqtt.Model; using MasstransferCommunicate.Mqtt.Model;
using MasstransferInfrastructure.Database.Sqlite; using MasstransferInfrastructure.Database.Sqlite;
@ -105,6 +106,8 @@ public class MessageQueueHelper
var topic = applicationMessage.Topic; var topic = applicationMessage.Topic;
var message = applicationMessage.ConvertPayloadToString(); var message = applicationMessage.ConvertPayloadToString();
Console.WriteLine($"收到消息:{topic} - {message}");
if (!Subscribers.TryGetValue(topic, out var subscribers)) return; if (!Subscribers.TryGetValue(topic, out var subscribers)) return;
foreach (var subscriber in subscribers) foreach (var subscriber in subscribers)
@ -114,12 +117,20 @@ public class MessageQueueHelper
var methodInfo = subscriber.Method; var methodInfo = subscriber.Method;
var parameters = methodInfo.GetParameters(); var parameters = methodInfo.GetParameters();
if (parameters.Length != 2) continue; if (parameters.Length != 2) continue;
var payload = JsonUtil.FromJson<Payload<object>>(message);
var dataType = parameters[1].ParameterType;
var type = typeof(Payload<>).MakeGenericType(dataType);
var payload = JsonUtil.FromJson(type, message);
if (payload == null) continue; if (payload == null) continue;
var property = type.GetProperty("Data");
var data = property?.GetValue(payload);
// 通知订阅者 // 通知订阅者
subscriber.DynamicInvoke(topic, JsonUtil.ToJson(payload.Data)); subscriber.DynamicInvoke(topic, data);
} }
catch (Exception exception) catch (Exception exception)
{ {

View File

@ -17,6 +17,8 @@ class MqttClient
public event EventHandler<MqttApplicationMessageReceivedEventArgs> MessageReceived; public event EventHandler<MqttApplicationMessageReceivedEventArgs> MessageReceived;
private static readonly DateTime Epoch = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
public bool IsConnected => _client is { IsConnected: true }; public bool IsConnected => _client is { IsConnected: true };
/// <summary> /// <summary>
@ -111,13 +113,15 @@ class MqttClient
var message = new Payload<T>() var message = new Payload<T>()
{ {
MsgId = SnowFlakeNew.LongId.ToString(), MsgId = GetCurrentTimestamp(),
Data = data, Data = data,
ConsumeTime = DateTime.Now.Ticks.ToString(), ConsumeTime = DateTime.Now,
}; };
var payload = JsonUtil.ToJson(message); var payload = JsonUtil.ToJson(message);
Console.WriteLine(payload);
var result = await _client.PublishAsync(new MqttApplicationMessageBuilder() var result = await _client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic) .WithTopic(topic)
.WithPayload(payload) .WithPayload(payload)
@ -127,6 +131,11 @@ class MqttClient
return result.IsSuccess; return result.IsSuccess;
} }
private static long GetCurrentTimestamp()
{
return (long)(DateTime.UtcNow - Epoch).TotalMilliseconds;
}
/// <summary> /// <summary>
/// 订阅主题 /// 订阅主题
/// </summary> /// </summary>

View File

@ -4,9 +4,9 @@ namespace MasstransferCommunicate.Mqtt.Model;
public class Payload<T> public class Payload<T>
{ {
[JsonProperty("msgId")] public string MsgId { get; set; } [JsonProperty("msgId")] public long MsgId { get; set; }
[JsonProperty("consumeTime")] public string ConsumeTime { get; set; } [JsonProperty("consumeTime")] public DateTime ConsumeTime { get; set; }
[JsonProperty("data")] public T Data { get; set; } [JsonProperty("data")] public T Data { get; set; }
} }