init
This commit is contained in:
@@ -0,0 +1,163 @@
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Kreta.Core.MessageBroker;
|
||||
using Kreta.Core.MessageBroker.Client.Implementations;
|
||||
using Kreta.Core.MessageBroker.Contract;
|
||||
using Kreta.Core.MessageBroker.Logging;
|
||||
using Kreta.Core.MessageBroker.Logging.Serilog;
|
||||
using Kreta.MessageBroker.ClientFactory;
|
||||
using Kreta.MessageBroker.Configuration;
|
||||
using Kreta.MessageBroker.TraceLog;
|
||||
using log4net;
|
||||
|
||||
namespace Kreta.MessageBroker.Client
|
||||
{
|
||||
/// <summary>
|
||||
/// Message client
|
||||
/// </summary>
|
||||
/// <typeparam name="T">Message object</typeparam>
|
||||
abstract class MessageClient<T> : IMessageClient<T>
|
||||
where T : Message
|
||||
{
|
||||
#region [Properties]
|
||||
|
||||
private IntegratedMessageClient<T> IntegratedMessageClient { get; }
|
||||
|
||||
private string MessageSignatureKey { get; }
|
||||
|
||||
private int QueueSize { get; }
|
||||
|
||||
#endregion
|
||||
|
||||
#region [Public Methods]
|
||||
|
||||
/// <summary>
|
||||
/// Üzenet küldése
|
||||
/// </summary>
|
||||
/// <param name="message">Üzenet</param>
|
||||
/// <remarks>A metódus az üzenetet várakozási sorba helyezi és azonal visszatér.
|
||||
/// Ha a várakozási sor megtellik, az üzenetet eldobja</remarks>
|
||||
public async Task PostAsync(T message)
|
||||
{
|
||||
message.CreateSignature(this.MessageSignatureKey);
|
||||
|
||||
await this.IntegratedMessageClient.PostAsync(message);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Üzenet küldése
|
||||
/// </summary>
|
||||
/// <param name="message">Üzenet</param>
|
||||
public void Post(T message)
|
||||
{
|
||||
if (this.QueueSize == 0)
|
||||
{
|
||||
Task.Run(async () => await PostAsync(message));
|
||||
}
|
||||
else
|
||||
{
|
||||
PostAsync(message).Wait();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Üzenet tömb küldése
|
||||
/// </summary>
|
||||
/// <param name="messages">Üzenetek</param>
|
||||
/// <remarks>A metódus addig nem tér vissza amig el nem küldtük az összes üzenetet</remarks>
|
||||
public async Task PostAsync(T[] messages)
|
||||
{
|
||||
foreach (var message in messages)
|
||||
{
|
||||
message.CreateSignature(this.MessageSignatureKey);
|
||||
}
|
||||
|
||||
await this.IntegratedMessageClient.PostAsync(messages);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Üzenet tömb küldése
|
||||
/// </summary>
|
||||
/// <param name="messages">Üzenetek</param>
|
||||
/// <remarks>A metódus addig nem tér vissza amig el nem küldtük az összes üzenetet</remarks>
|
||||
public void Post(T[] messages)
|
||||
{
|
||||
PostAsync(messages).Wait();
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region [Constructor(s)]
|
||||
|
||||
/// <summary>
|
||||
/// Constructor
|
||||
/// </summary>
|
||||
/// <param name="jsonMessageClientFactory">Json message client factory</param>
|
||||
/// <param name="messageClientName">Message client name</param>
|
||||
/// <param name="configuration">Configuration</param>
|
||||
/// <param name="loggerName">Logger name</param>
|
||||
protected MessageClient(string messageClientName, IJsonMessageClientFactory<T> jsonMessageClientFactory, IMessageBrokerConfiguration configuration, ITraceMessageClient traceMessageClient, ISerilogTraceLogger serilogTraceLogger, string loggerName)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(messageClientName))
|
||||
{
|
||||
throw new ArgumentException($"{nameof(messageClientName)} cannot be null or whitespace");
|
||||
}
|
||||
|
||||
if (jsonMessageClientFactory == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(jsonMessageClientFactory));
|
||||
}
|
||||
|
||||
if (traceMessageClient == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(traceMessageClient));
|
||||
}
|
||||
|
||||
if (configuration == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(configuration));
|
||||
}
|
||||
|
||||
var clientConfiguration = configuration.Clients.Cast<MessageClientConfigurationElement>().FirstOrDefault(c => c.Name.Equals(messageClientName, StringComparison.OrdinalIgnoreCase));
|
||||
|
||||
if (clientConfiguration == null)
|
||||
{
|
||||
throw new ArgumentException($"Client configuration was not found for client {messageClientName}");
|
||||
}
|
||||
|
||||
this.QueueSize = clientConfiguration.QueueSize;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(clientConfiguration.MessageSignatureKey))
|
||||
{
|
||||
throw new ArgumentException($"{nameof(clientConfiguration.MessageSignatureKey)} cannot be null or whitespace");
|
||||
}
|
||||
|
||||
this.MessageSignatureKey = clientConfiguration.MessageSignatureKey;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(loggerName))
|
||||
{
|
||||
throw new ArgumentException($"{nameof(loggerName)} cannot be null or whitespace");
|
||||
}
|
||||
|
||||
IKeyValueLogger keyValueLogger = null;
|
||||
|
||||
if (clientConfiguration.LoggerType == LoggerType.Log4Net)
|
||||
{
|
||||
keyValueLogger = new Log4NetKeyValueLogger(LogManager.GetLogger(loggerName));
|
||||
}
|
||||
else if (clientConfiguration.LoggerType == LoggerType.Serilog)
|
||||
{
|
||||
keyValueLogger = new SerilogKeyValueLogger(clientConfiguration.SerilogLogger, serilogTraceLogger);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new NotSupportedException($"Not supported logger type {clientConfiguration.LoggerType}");
|
||||
}
|
||||
|
||||
this.IntegratedMessageClient = new IntegratedMessageClient<T>(clientConfiguration, jsonMessageClientFactory, keyValueLogger, traceMessageClient);
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user