using portaloggy; using MessengerApi.Model.Messages; using MessengerApi.Model; namespace MessengerApi { public interface IQueryClient : IDisposable { Task Query(OutboxMessage message); } public class QueryClient : SubscriptionClient, IQueryClient { private readonly int _timeoutInSeconds; public QueryClient( Credentials credentials, HttpClient httpClient = null, ILogger logger = null, int maximumPermittedWaitTimeInSeconds = 10) :base(credentials, httpClient, logger) { this._timeoutInSeconds = maximumPermittedWaitTimeInSeconds; } public async Task Query(OutboxMessage message) { try { var messageTypeMask = $"{message.PayloadType}-{Guid.NewGuid()}"; var maskedMessage = new OutboxMessage { Payload = message.Payload, LifespanInSeconds = message.LifespanInSeconds, PayloadType = messageTypeMask, ToUserId = message.ToUserId }; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(this._timeoutInSeconds)); var sub = this.Subscribe(messageTypeMask); var result = (InboxMessage)null; sub.OnMessage += (source, message) => { result = message; cts.Cancel(); }; this.SendMessage(maskedMessage); while (!cts.IsCancellationRequested) { await Task.Delay(TimeSpan.FromSeconds(1)); } this.Unsubscribe(sub); if (result == null) { throw new InvalidOperationException("Couldn't retrieve response."); } result.PayloadType = message.PayloadType; return result; } catch { throw; } } } }