kreta/Kreta.MessageBroker/Client/MessageClient.cs
2024-03-13 00:33:46 +01:00

163 lines
5.6 KiB
C#

using System;
using System.Linq;
using System.Threading.Tasks;
using Kreta.Core.MessageBroker;
using Kreta.Core.MessageBroker.Client.Implementations;
using Kreta.Core.MessageBroker.Contract;
using Kreta.Core.MessageBroker.Logging;
using Kreta.Core.MessageBroker.Logging.Serilog;
using Kreta.MessageBroker.ClientFactory;
using Kreta.MessageBroker.Configuration;
using Kreta.MessageBroker.TraceLog;
using log4net;
namespace Kreta.MessageBroker.Client
{
/// <summary>
/// Message client
/// </summary>
/// <typeparam name="T">Message object</typeparam>
abstract class MessageClient<T> : IMessageClient<T>
where T : Message
{
#region [Properties]
private IntegratedMessageClient<T> IntegratedMessageClient { get; }
private string MessageSignatureKey { get; }
private int QueueSize { get; }
#endregion
#region [Public Methods]
/// <summary>
/// Üzenet küldése
/// </summary>
/// <param name="message">Üzenet</param>
/// <remarks>A metódus az üzenetet várakozási sorba helyezi és azonal visszatér.
/// Ha a várakozási sor megtellik, az üzenetet eldobja</remarks>
public async Task PostAsync(T message)
{
message.CreateSignature(this.MessageSignatureKey);
await this.IntegratedMessageClient.PostAsync(message);
}
/// <summary>
/// Üzenet küldése
/// </summary>
/// <param name="message">Üzenet</param>
public void Post(T message)
{
if (this.QueueSize == 0)
{
Task.Run(async () => await PostAsync(message));
}
else
{
PostAsync(message).Wait();
}
}
/// <summary>
/// Üzenet tömb küldése
/// </summary>
/// <param name="messages">Üzenetek</param>
/// <remarks>A metódus addig nem tér vissza amig el nem küldtük az összes üzenetet</remarks>
public async Task PostAsync(T[] messages)
{
foreach (var message in messages)
{
message.CreateSignature(this.MessageSignatureKey);
}
await this.IntegratedMessageClient.PostAsync(messages);
}
/// <summary>
/// Üzenet tömb küldése
/// </summary>
/// <param name="messages">Üzenetek</param>
/// <remarks>A metódus addig nem tér vissza amig el nem küldtük az összes üzenetet</remarks>
public void Post(T[] messages)
{
PostAsync(messages).Wait();
}
#endregion
#region [Constructor(s)]
/// <summary>
/// Constructor
/// </summary>
/// <param name="jsonMessageClientFactory">Json message client factory</param>
/// <param name="messageClientName">Message client name</param>
/// <param name="configuration">Configuration</param>
/// <param name="loggerName">Logger name</param>
protected MessageClient(string messageClientName, IJsonMessageClientFactory<T> jsonMessageClientFactory, IMessageBrokerConfiguration configuration, ITraceMessageClient traceMessageClient, ISerilogTraceLogger serilogTraceLogger, string loggerName)
{
if (string.IsNullOrWhiteSpace(messageClientName))
{
throw new ArgumentException($"{nameof(messageClientName)} cannot be null or whitespace");
}
if (jsonMessageClientFactory == null)
{
throw new ArgumentNullException(nameof(jsonMessageClientFactory));
}
if (traceMessageClient == null)
{
throw new ArgumentNullException(nameof(traceMessageClient));
}
if (configuration == null)
{
throw new ArgumentNullException(nameof(configuration));
}
var clientConfiguration = configuration.Clients.Cast<MessageClientConfigurationElement>().FirstOrDefault(c => c.Name.Equals(messageClientName, StringComparison.OrdinalIgnoreCase));
if (clientConfiguration == null)
{
throw new ArgumentException($"Client configuration was not found for client {messageClientName}");
}
this.QueueSize = clientConfiguration.QueueSize;
if (string.IsNullOrWhiteSpace(clientConfiguration.MessageSignatureKey))
{
throw new ArgumentException($"{nameof(clientConfiguration.MessageSignatureKey)} cannot be null or whitespace");
}
this.MessageSignatureKey = clientConfiguration.MessageSignatureKey;
if (string.IsNullOrWhiteSpace(loggerName))
{
throw new ArgumentException($"{nameof(loggerName)} cannot be null or whitespace");
}
IKeyValueLogger keyValueLogger = null;
if (clientConfiguration.LoggerType == LoggerType.Log4Net)
{
keyValueLogger = new Log4NetKeyValueLogger(LogManager.GetLogger(loggerName));
}
else if (clientConfiguration.LoggerType == LoggerType.Serilog)
{
keyValueLogger = new SerilogKeyValueLogger(clientConfiguration.SerilogLogger, serilogTraceLogger);
}
else
{
throw new NotSupportedException($"Not supported logger type {clientConfiguration.LoggerType}");
}
this.IntegratedMessageClient = new IntegratedMessageClient<T>(clientConfiguration, jsonMessageClientFactory, keyValueLogger, traceMessageClient);
}
#endregion
}
}