using portaloggy; using MessengerApi.Model.Messages; using MessengerApi.Model; namespace MessengerApi { internal class SubscriptionPollingEngine : IDisposable { private readonly ILogger logger; private readonly IClient client; private readonly List subscriptions = new List(); private readonly object subscriptionsLocker = new object(); private Task executionTask; private CancellationTokenSource executionCts; private bool isDisposed; internal SubscriptionPollingEngine( ILogger logger, IClient client) { this.logger = logger; this.client = client; } public Task AddSubscription(Subscription subscription) { this.AssertNotDisposedOrThrow(); lock (subscriptionsLocker) { subscriptions.Add(subscription); } this.logger.Log($"Subscription added for message {subscription.MessageTypeMask}."); if(this.executionTask == null && this.executionCts == null) { this.BeginPolling(); } return Task.CompletedTask; } public void Dispose() { if (this.isDisposed) { return; } this.executionCts.Cancel(); this.executionCts.Dispose(); this.isDisposed = true; } internal void BeginPolling() { this.executionCts = new CancellationTokenSource(); this.executionTask = this.PollEndlessly(this.executionCts.Token); this.logger.Log("Polling endlessly now."); } internal void RemoveSubscription(Subscription subscription) { lock (subscriptionsLocker) { this.subscriptions.Remove(subscription); } } private async Task PollEndlessly(CancellationToken token) { while (!token.IsCancellationRequested) { var messages = Enumerable.Empty(); try { messages = this.client.GetMessages(); this.logger.Info($"Received {messages.Count()} messages."); } catch (Exception ex) { this.logger.Error("Can't obtain messages.", ex); } foreach (var message in messages) { try { var sub = (Subscription)null; lock (this.subscriptionsLocker) { sub = this.subscriptions.FirstOrDefault(x => message.PayloadType.StartsWith(x.MessageTypeMask)); } if (sub == null) { this.logger.Log($"This message has no subscription and will be ignored: {message.PayloadType}."); continue; } sub.Message(message); } catch (Exception ex) { this.logger.Error("Can't process received message.", ex); } } await Task.Delay(1000); } this.logger.Info("Polling ended."); } private void AssertNotDisposedOrThrow() { if (this.isDisposed) { throw new ObjectDisposedException(null); } } } }