using System; using System.Linq; using System.Threading.Tasks; using Kreta.Core.MessageBroker; using Kreta.Core.MessageBroker.Client.Implementations; using Kreta.Core.MessageBroker.Contract; using Kreta.Core.MessageBroker.Logging; using Kreta.Core.MessageBroker.Logging.Serilog; using Kreta.MessageBroker.ClientFactory; using Kreta.MessageBroker.Configuration; using Kreta.MessageBroker.TraceLog; using log4net; namespace Kreta.MessageBroker.Client { /// /// Message client /// /// Message object abstract class MessageClient : IMessageClient where T : Message { #region [Properties] private IntegratedMessageClient IntegratedMessageClient { get; } private string MessageSignatureKey { get; } private int QueueSize { get; } #endregion #region [Public Methods] /// /// Üzenet küldése /// /// Üzenet /// A metódus az üzenetet várakozási sorba helyezi és azonal visszatér. /// Ha a várakozási sor megtellik, az üzenetet eldobja public async Task PostAsync(T message) { message.CreateSignature(this.MessageSignatureKey); await this.IntegratedMessageClient.PostAsync(message); } /// /// Üzenet küldése /// /// Üzenet public void Post(T message) { if (this.QueueSize == 0) { Task.Run(async () => await PostAsync(message)); } else { PostAsync(message).Wait(); } } /// /// Üzenet tömb küldése /// /// Üzenetek /// A metódus addig nem tér vissza amig el nem küldtük az összes üzenetet public async Task PostAsync(T[] messages) { foreach (var message in messages) { message.CreateSignature(this.MessageSignatureKey); } await this.IntegratedMessageClient.PostAsync(messages); } /// /// Üzenet tömb küldése /// /// Üzenetek /// A metódus addig nem tér vissza amig el nem küldtük az összes üzenetet public void Post(T[] messages) { PostAsync(messages).Wait(); } #endregion #region [Constructor(s)] /// /// Constructor /// /// Json message client factory /// Message client name /// Configuration /// Logger name protected MessageClient(string messageClientName, IJsonMessageClientFactory jsonMessageClientFactory, IMessageBrokerConfiguration configuration, ITraceMessageClient traceMessageClient, ISerilogTraceLogger serilogTraceLogger, string loggerName) { if (string.IsNullOrWhiteSpace(messageClientName)) { throw new ArgumentException($"{nameof(messageClientName)} cannot be null or whitespace"); } if (jsonMessageClientFactory == null) { throw new ArgumentNullException(nameof(jsonMessageClientFactory)); } if (traceMessageClient == null) { throw new ArgumentNullException(nameof(traceMessageClient)); } if (configuration == null) { throw new ArgumentNullException(nameof(configuration)); } var clientConfiguration = configuration.Clients.Cast().FirstOrDefault(c => c.Name.Equals(messageClientName, StringComparison.OrdinalIgnoreCase)); if (clientConfiguration == null) { throw new ArgumentException($"Client configuration was not found for client {messageClientName}"); } this.QueueSize = clientConfiguration.QueueSize; if (string.IsNullOrWhiteSpace(clientConfiguration.MessageSignatureKey)) { throw new ArgumentException($"{nameof(clientConfiguration.MessageSignatureKey)} cannot be null or whitespace"); } this.MessageSignatureKey = clientConfiguration.MessageSignatureKey; if (string.IsNullOrWhiteSpace(loggerName)) { throw new ArgumentException($"{nameof(loggerName)} cannot be null or whitespace"); } IKeyValueLogger keyValueLogger = null; if (clientConfiguration.LoggerType == LoggerType.Log4Net) { keyValueLogger = new Log4NetKeyValueLogger(LogManager.GetLogger(loggerName)); } else if (clientConfiguration.LoggerType == LoggerType.Serilog) { keyValueLogger = new SerilogKeyValueLogger(clientConfiguration.SerilogLogger, serilogTraceLogger); } else { throw new NotSupportedException($"Not supported logger type {clientConfiguration.LoggerType}"); } this.IntegratedMessageClient = new IntegratedMessageClient(clientConfiguration, jsonMessageClientFactory, keyValueLogger, traceMessageClient); } #endregion } }