163 lines
5.6 KiB
C#
163 lines
5.6 KiB
C#
using System;
|
|
using System.Linq;
|
|
using System.Threading.Tasks;
|
|
using Kreta.Core.MessageBroker;
|
|
using Kreta.Core.MessageBroker.Client.Implementations;
|
|
using Kreta.Core.MessageBroker.Contract;
|
|
using Kreta.Core.MessageBroker.Logging;
|
|
using Kreta.Core.MessageBroker.Logging.Serilog;
|
|
using Kreta.MessageBroker.ClientFactory;
|
|
using Kreta.MessageBroker.Configuration;
|
|
using Kreta.MessageBroker.TraceLog;
|
|
using log4net;
|
|
|
|
namespace Kreta.MessageBroker.Client
|
|
{
|
|
/// <summary>
|
|
/// Message client
|
|
/// </summary>
|
|
/// <typeparam name="T">Message object</typeparam>
|
|
abstract class MessageClient<T> : IMessageClient<T>
|
|
where T : Message
|
|
{
|
|
#region [Properties]
|
|
|
|
private IntegratedMessageClient<T> IntegratedMessageClient { get; }
|
|
|
|
private string MessageSignatureKey { get; }
|
|
|
|
private int QueueSize { get; }
|
|
|
|
#endregion
|
|
|
|
#region [Public Methods]
|
|
|
|
/// <summary>
|
|
/// Üzenet küldése
|
|
/// </summary>
|
|
/// <param name="message">Üzenet</param>
|
|
/// <remarks>A metódus az üzenetet várakozási sorba helyezi és azonal visszatér.
|
|
/// Ha a várakozási sor megtellik, az üzenetet eldobja</remarks>
|
|
public async Task PostAsync(T message)
|
|
{
|
|
message.CreateSignature(this.MessageSignatureKey);
|
|
|
|
await this.IntegratedMessageClient.PostAsync(message);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Üzenet küldése
|
|
/// </summary>
|
|
/// <param name="message">Üzenet</param>
|
|
public void Post(T message)
|
|
{
|
|
if (this.QueueSize == 0)
|
|
{
|
|
Task.Run(async () => await PostAsync(message));
|
|
}
|
|
else
|
|
{
|
|
PostAsync(message).Wait();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Üzenet tömb küldése
|
|
/// </summary>
|
|
/// <param name="messages">Üzenetek</param>
|
|
/// <remarks>A metódus addig nem tér vissza amig el nem küldtük az összes üzenetet</remarks>
|
|
public async Task PostAsync(T[] messages)
|
|
{
|
|
foreach (var message in messages)
|
|
{
|
|
message.CreateSignature(this.MessageSignatureKey);
|
|
}
|
|
|
|
await this.IntegratedMessageClient.PostAsync(messages);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Üzenet tömb küldése
|
|
/// </summary>
|
|
/// <param name="messages">Üzenetek</param>
|
|
/// <remarks>A metódus addig nem tér vissza amig el nem küldtük az összes üzenetet</remarks>
|
|
public void Post(T[] messages)
|
|
{
|
|
PostAsync(messages).Wait();
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region [Constructor(s)]
|
|
|
|
/// <summary>
|
|
/// Constructor
|
|
/// </summary>
|
|
/// <param name="jsonMessageClientFactory">Json message client factory</param>
|
|
/// <param name="messageClientName">Message client name</param>
|
|
/// <param name="configuration">Configuration</param>
|
|
/// <param name="loggerName">Logger name</param>
|
|
protected MessageClient(string messageClientName, IJsonMessageClientFactory<T> jsonMessageClientFactory, IMessageBrokerConfiguration configuration, ITraceMessageClient traceMessageClient, ISerilogTraceLogger serilogTraceLogger, string loggerName)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(messageClientName))
|
|
{
|
|
throw new ArgumentException($"{nameof(messageClientName)} cannot be null or whitespace");
|
|
}
|
|
|
|
if (jsonMessageClientFactory == null)
|
|
{
|
|
throw new ArgumentNullException(nameof(jsonMessageClientFactory));
|
|
}
|
|
|
|
if (traceMessageClient == null)
|
|
{
|
|
throw new ArgumentNullException(nameof(traceMessageClient));
|
|
}
|
|
|
|
if (configuration == null)
|
|
{
|
|
throw new ArgumentNullException(nameof(configuration));
|
|
}
|
|
|
|
var clientConfiguration = configuration.Clients.Cast<MessageClientConfigurationElement>().FirstOrDefault(c => c.Name.Equals(messageClientName, StringComparison.OrdinalIgnoreCase));
|
|
|
|
if (clientConfiguration == null)
|
|
{
|
|
throw new ArgumentException($"Client configuration was not found for client {messageClientName}");
|
|
}
|
|
|
|
this.QueueSize = clientConfiguration.QueueSize;
|
|
|
|
if (string.IsNullOrWhiteSpace(clientConfiguration.MessageSignatureKey))
|
|
{
|
|
throw new ArgumentException($"{nameof(clientConfiguration.MessageSignatureKey)} cannot be null or whitespace");
|
|
}
|
|
|
|
this.MessageSignatureKey = clientConfiguration.MessageSignatureKey;
|
|
|
|
if (string.IsNullOrWhiteSpace(loggerName))
|
|
{
|
|
throw new ArgumentException($"{nameof(loggerName)} cannot be null or whitespace");
|
|
}
|
|
|
|
IKeyValueLogger keyValueLogger = null;
|
|
|
|
if (clientConfiguration.LoggerType == LoggerType.Log4Net)
|
|
{
|
|
keyValueLogger = new Log4NetKeyValueLogger(LogManager.GetLogger(loggerName));
|
|
}
|
|
else if (clientConfiguration.LoggerType == LoggerType.Serilog)
|
|
{
|
|
keyValueLogger = new SerilogKeyValueLogger(clientConfiguration.SerilogLogger, serilogTraceLogger);
|
|
}
|
|
else
|
|
{
|
|
throw new NotSupportedException($"Not supported logger type {clientConfiguration.LoggerType}");
|
|
}
|
|
|
|
this.IntegratedMessageClient = new IntegratedMessageClient<T>(clientConfiguration, jsonMessageClientFactory, keyValueLogger, traceMessageClient);
|
|
}
|
|
|
|
#endregion
|
|
}
|
|
}
|