All checks were successful
Pack and Push NuGet Package / publish (push) Successful in 43s
129 lines
3.6 KiB
C#
129 lines
3.6 KiB
C#
using portaloggy;
|
|
using MessengerApi.Model.Messages;
|
|
using MessengerApi.Model;
|
|
|
|
namespace MessengerApi
|
|
{
|
|
/// <summary>
/// Repeatedly asks an <see cref="IClient"/> for inbox messages and hands each message to the
/// first registered <see cref="Subscription"/> whose <c>MessageTypeMask</c> is a prefix of the
/// message's <c>PayloadType</c>. Polling starts lazily when the first subscription is added
/// and stops when the engine is disposed.
/// </summary>
internal class SubscriptionPollingEngine : IDisposable
{
    /// <summary>Pause between two consecutive polls of the client.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(1);

    private readonly ILogger logger;
    private readonly IClient client;
    private readonly List<Subscription> subscriptions = new List<Subscription>();

    // Guards every read and write of 'subscriptions'; the polling loop and callers of
    // AddSubscription/RemoveSubscription may touch the list from different threads.
    private readonly object subscriptionsLocker = new object();

    // Both stay null until BeginPolling has been called.
    private Task executionTask;
    private CancellationTokenSource executionCts;
    private bool isDisposed;

    /// <summary>
    /// Creates an engine that is idle until a subscription is added or
    /// <see cref="BeginPolling"/> is called.
    /// </summary>
    /// <param name="logger">Receives progress and error messages.</param>
    /// <param name="client">Source of inbox messages.</param>
    internal SubscriptionPollingEngine(
        ILogger logger,
        IClient client)
    {
        this.logger = logger;
        this.client = client;
    }

    /// <summary>
    /// Registers a subscription and starts the polling loop if it is not running yet.
    /// </summary>
    /// <param name="subscription">Subscription to register; must not be null.</param>
    /// <returns>An already completed task.</returns>
    /// <exception cref="ObjectDisposedException">The engine has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public Task AddSubscription(Subscription subscription)
    {
        this.AssertNotDisposedOrThrow();

        // Reject null before it reaches the list: a stored null would make every later
        // dispatch fail inside the matching predicate.
        if (subscription == null)
        {
            throw new ArgumentNullException(nameof(subscription));
        }

        lock (this.subscriptionsLocker)
        {
            this.subscriptions.Add(subscription);
        }

        this.logger.Log($"Subscription added for message {subscription.MessageTypeMask}.");

        if (this.executionTask == null && this.executionCts == null)
        {
            this.BeginPolling();
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops the polling loop, if one was started, and releases the cancellation source.
    /// Calling it more than once, or on an engine that never polled, is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (this.isDisposed)
        {
            return;
        }

        // Flag first so the engine counts as disposed even if a cancellation callback throws.
        this.isDisposed = true;

        // executionCts is null when polling was never started.
        if (this.executionCts != null)
        {
            this.executionCts.Cancel();
            this.executionCts.Dispose();
        }
    }

    /// <summary>
    /// Creates a fresh cancellation source and starts the polling loop.
    /// </summary>
    /// <remarks>
    /// The loop calls the client synchronously before its first await, so the first fetch
    /// and dispatch run on the calling thread before this method returns.
    /// </remarks>
    internal void BeginPolling()
    {
        this.executionCts = new CancellationTokenSource();
        this.executionTask = this.PollEndlessly(this.executionCts.Token);

        this.logger.Log("Polling endlessly now.");
    }

    /// <summary>
    /// Removes a previously added subscription; does nothing if it is not registered.
    /// </summary>
    /// <param name="subscription">Subscription to remove.</param>
    internal void RemoveSubscription(Subscription subscription)
    {
        lock (this.subscriptionsLocker)
        {
            this.subscriptions.Remove(subscription);
        }
    }

    /// <summary>
    /// Polling loop: fetch, dispatch, wait <see cref="PollInterval"/>, repeat until
    /// <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="token">Signals that polling should stop.</param>
    private async Task PollEndlessly(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            foreach (var message in this.FetchMessages())
            {
                this.Dispatch(message);
            }

            try
            {
                // Passing the token lets Dispose end the wait immediately instead of
                // after the full interval.
                await Task.Delay(PollInterval, token);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }

        this.logger.Info("Polling ended.");
    }

    /// <summary>
    /// Gets the current batch of messages from the client. Failures are logged and yield an
    /// empty batch so the polling loop keeps running.
    /// </summary>
    /// <returns>A materialized, never-null list of received messages.</returns>
    private IReadOnlyList<InboxMessage> FetchMessages()
    {
        try
        {
            IEnumerable<InboxMessage> received = this.client.GetMessages();

            // Materialize once: the result is both counted and iterated, and a lazy
            // sequence would otherwise be evaluated twice. A null result counts as empty.
            var messages = received == null ? new List<InboxMessage>() : received.ToList();

            this.logger.Info($"Received {messages.Count} messages.");
            return messages;
        }
        catch (Exception ex)
        {
            this.logger.Error("Can't obtain messages.", ex);
            return Array.Empty<InboxMessage>();
        }
    }

    /// <summary>
    /// Delivers one message to the first matching subscription. Messages without a
    /// subscription are logged and dropped; handler failures are logged and swallowed so
    /// one bad message cannot stop the loop.
    /// </summary>
    /// <param name="message">Message to deliver.</param>
    private void Dispatch(InboxMessage message)
    {
        try
        {
            Subscription match;

            // Only the lookup happens under the lock; the handler runs outside it.
            lock (this.subscriptionsLocker)
            {
                // Ordinal: type masks are identifiers, not linguistic text, so matching
                // must not depend on the current culture.
                match = this.subscriptions.FirstOrDefault(
                    x => message.PayloadType.StartsWith(x.MessageTypeMask, StringComparison.Ordinal));
            }

            if (match == null)
            {
                this.logger.Log($"This message has no subscription and will be ignored: {message.PayloadType}.");
                return;
            }

            match.Message(message);
        }
        catch (Exception ex)
        {
            this.logger.Error("Can't process received message.", ex);
        }
    }

    /// <summary>
    /// Throws if the engine has already been disposed.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The engine has been disposed.</exception>
    private void AssertNotDisposedOrThrow()
    {
        if (this.isDisposed)
        {
            throw new ObjectDisposedException(nameof(SubscriptionPollingEngine));
        }
    }
}
|
|
} |