using System;
using System.Linq;
using System.Threading.Tasks;
using Kreta.Core.MessageBroker;
using Kreta.Core.MessageBroker.Client.Implementations;
using Kreta.Core.MessageBroker.Contract;
using Kreta.Core.MessageBroker.Logging;
using Kreta.Core.MessageBroker.Logging.Serilog;
using Kreta.MessageBroker.ClientFactory;
using Kreta.MessageBroker.Configuration;
using Kreta.MessageBroker.TraceLog;
using log4net;
namespace Kreta.MessageBroker.Client
{
///
/// Message client
///
/// Message object
abstract class MessageClient : IMessageClient
where T : Message
{
#region [Properties]
private IntegratedMessageClient IntegratedMessageClient { get; }
private string MessageSignatureKey { get; }
private int QueueSize { get; }
#endregion
#region [Public Methods]
///
/// Üzenet küldése
///
/// Üzenet
/// A metódus az üzenetet várakozási sorba helyezi és azonal visszatér.
/// Ha a várakozási sor megtellik, az üzenetet eldobja
public async Task PostAsync(T message)
{
message.CreateSignature(this.MessageSignatureKey);
await this.IntegratedMessageClient.PostAsync(message);
}
///
/// Üzenet küldése
///
/// Üzenet
public void Post(T message)
{
if (this.QueueSize == 0)
{
Task.Run(async () => await PostAsync(message));
}
else
{
PostAsync(message).Wait();
}
}
///
/// Üzenet tömb küldése
///
/// Üzenetek
/// A metódus addig nem tér vissza amig el nem küldtük az összes üzenetet
public async Task PostAsync(T[] messages)
{
foreach (var message in messages)
{
message.CreateSignature(this.MessageSignatureKey);
}
await this.IntegratedMessageClient.PostAsync(messages);
}
///
/// Üzenet tömb küldése
///
/// Üzenetek
/// A metódus addig nem tér vissza amig el nem küldtük az összes üzenetet
public void Post(T[] messages)
{
PostAsync(messages).Wait();
}
#endregion
#region [Constructor(s)]
///
/// Constructor
///
/// Json message client factory
/// Message client name
/// Configuration
/// Logger name
protected MessageClient(string messageClientName, IJsonMessageClientFactory jsonMessageClientFactory, IMessageBrokerConfiguration configuration, ITraceMessageClient traceMessageClient, ISerilogTraceLogger serilogTraceLogger, string loggerName)
{
if (string.IsNullOrWhiteSpace(messageClientName))
{
throw new ArgumentException($"{nameof(messageClientName)} cannot be null or whitespace");
}
if (jsonMessageClientFactory == null)
{
throw new ArgumentNullException(nameof(jsonMessageClientFactory));
}
if (traceMessageClient == null)
{
throw new ArgumentNullException(nameof(traceMessageClient));
}
if (configuration == null)
{
throw new ArgumentNullException(nameof(configuration));
}
var clientConfiguration = configuration.Clients.Cast().FirstOrDefault(c => c.Name.Equals(messageClientName, StringComparison.OrdinalIgnoreCase));
if (clientConfiguration == null)
{
throw new ArgumentException($"Client configuration was not found for client {messageClientName}");
}
this.QueueSize = clientConfiguration.QueueSize;
if (string.IsNullOrWhiteSpace(clientConfiguration.MessageSignatureKey))
{
throw new ArgumentException($"{nameof(clientConfiguration.MessageSignatureKey)} cannot be null or whitespace");
}
this.MessageSignatureKey = clientConfiguration.MessageSignatureKey;
if (string.IsNullOrWhiteSpace(loggerName))
{
throw new ArgumentException($"{nameof(loggerName)} cannot be null or whitespace");
}
IKeyValueLogger keyValueLogger = null;
if (clientConfiguration.LoggerType == LoggerType.Log4Net)
{
keyValueLogger = new Log4NetKeyValueLogger(LogManager.GetLogger(loggerName));
}
else if (clientConfiguration.LoggerType == LoggerType.Serilog)
{
keyValueLogger = new SerilogKeyValueLogger(clientConfiguration.SerilogLogger, serilogTraceLogger);
}
else
{
throw new NotSupportedException($"Not supported logger type {clientConfiguration.LoggerType}");
}
this.IntegratedMessageClient = new IntegratedMessageClient(clientConfiguration, jsonMessageClientFactory, keyValueLogger, traceMessageClient);
}
#endregion
}
}