using MessengerApi.Db; using MessengerBroker.Configuration.Model; using MessengerBroker.Configuration.Model.Servers; using MessengerBroker.Db; using MessengerBroker.Factories; using Microsoft.EntityFrameworkCore; using System.Text.Json; namespace MessengerBroker.Handlers { public class MasterServerSynchronizationHandler { private readonly HttpClient httpClient = new HttpClient(); private readonly ILogger logger; private readonly BrokerConfiguration brokerConfiguration; private readonly BrokerDbContextFactory brokerDbContextFactory; private readonly MessengerApi.Factories.DbContextFactory messengerDbContextFactory; public MasterServerSynchronizationHandler( ILogger logger, BrokerConfiguration brokerConfiguration, BrokerDbContextFactory brokerDbContextFactory, MessengerApi.Factories.DbContextFactory messengerDbContextFactory) { this.logger = logger; this.brokerConfiguration = brokerConfiguration; this.brokerDbContextFactory = brokerDbContextFactory; this.messengerDbContextFactory = messengerDbContextFactory; } public Task BeginSyncing(MasterServer server, CancellationToken ct = default) { var userSyncTask = Task.Run(async () => { while (!ct.IsCancellationRequested) { logger.Trace($"Executing {nameof(this.PullUsersFromMasterServer)} at {server}."); // Run small updates of last 10 seconds of messages always. try { await this.PullUsersFromMasterServer(server); } catch (Exception ex) { this.logger.Error($"Error during user pull from server {server}.", ex); } await Task.Delay(TimeSpan.FromSeconds(60)); } }); var fastSyncTask = Task.Run(async () => { while (!ct.IsCancellationRequested) { logger.Trace($"Executing (fast) {nameof(this.PullMessagesFromMasterServer)} at {server}."); // Run small updates of last 10 seconds of messages always. try { await PullMessagesFromMasterServer(server, DateTime.UtcNow.AddSeconds(-10)); } catch(Exception ex) { this.logger.Error($"Error during fast message pull from server {server}.", ex); } await Task.Delay(TimeSpan.FromMilliseconds(100)); } }); var slowSyncTask = Task.Run(async () => { while (!ct.IsCancellationRequested) { logger.Trace($"Executing (slow) {nameof(this.PullMessagesFromMasterServer)} at {server}."); // Run large updates every once in a while. try { DateTime? startTimeUtc = DateTime.UtcNow.AddMinutes(-this.brokerConfiguration.HousekeepingMessageAgeInMinutes); while (!ct.IsCancellationRequested && startTimeUtc != null) { startTimeUtc = await PullMessagesFromMasterServer(server, startTimeUtc.Value); } } catch (Exception ex) { this.logger.Error($"Error during slow message pull from server {server}.", ex); } await Task.Delay(TimeSpan.FromMinutes(1)); } }); return Task.WhenAll(userSyncTask, fastSyncTask, slowSyncTask); } private async Task PullUsersFromMasterServer(MasterServer server) { this.logger.Info($"Pulling user data from {server.ToString()}."); var usersRequest = new HttpRequestMessage(HttpMethod.Get, $"{server.BrokerUrl}/users"); usersRequest.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue( "Bearer", this.brokerConfiguration.BrokerId.ToString()); var usersResponseMessage = await this.httpClient.SendAsync(usersRequest); if (!usersResponseMessage.IsSuccessStatusCode) { this.logger.Error($"Can't pull data from master server: {server.ToString()}." + usersResponseMessage); } var usersResponse = JsonSerializer .Deserialize( await usersResponseMessage.Content.ReadAsStringAsync()); using (var broCtx = this.brokerDbContextFactory.CreateDbContext()) using (var apiCtx = this.messengerDbContextFactory.CreateDbContext()) { var updatedUsers = usersResponse.Users.Where(x => broCtx.Users.Any(bu => bu.Id == x.Id)).ToList(); updatedUsers.ForEach(async x => await UpdateUser(x, apiCtx)); var addedUsers = usersResponse.Users.Where(x => !broCtx.Users.Any(bu => bu.Id == x.Id)).ToList(); addedUsers.ForEach(async x => await AddUser(x, server.BrokerId, broCtx, apiCtx)); var deletedUsers = broCtx.Users.Where(x => !usersResponse.Users.Any(ur => ur.Id == x.Id)).ToList(); deletedUsers.ForEach(async x => await RemoveUser(x.Id, broCtx, apiCtx)); apiCtx.UserRoutes .Include(x => x.From) .Where(x => broCtx.Users.Any(bu => bu.Id == x.From.Id)) .ToList() .Where(x => !usersResponse.UserRoutes.Any(ur => ur.Id == x.Id)) .ToList() .ForEach(x => { this.logger.Info($"Removing UserRoute {x.Id}."); apiCtx.UserRoutes.Remove(x); }); usersResponse.UserRoutes .Where(x => !apiCtx.UserRoutes.Any(ur => ur.Id == x.Id)) .ToList() .ForEach(x => { this.logger.Info($"Adding UserRoute {x.Id}"); apiCtx.UserRoutes.Add(new MessengerApi.Db.Entities.UserRoute { Id = x.Id, From = apiCtx.Users.Single(u => u.Id == x.FromId), To = apiCtx.Users.Single(u => u.Id == x.ToId) }); }); broCtx.SaveChanges(); apiCtx.SaveChanges(); } async Task RemoveUser(Guid id, BrokerDbContext broCtx, MessengerDbContext apiCtx) { this.logger.Info($"Removing user {id}."); var broUser = await broCtx.Users.SingleOrDefaultAsync(x => x.Id == id); if (broUser != null) { broCtx.Users.Remove(broUser); } var apiUser = await apiCtx.Users.SingleOrDefaultAsync(x => x.Id == id); if (apiUser != null) { apiCtx.Users.Remove(apiUser); } } async Task AddUser(Model.Http.Users.UsersResponse.User user, Guid brokerId, BrokerDbContext broCtx, MessengerDbContext apiCtx) { this.logger.Info($"Adding user {user.Id}."); if (broCtx.Users.Any(x => x.Id == user.Id) == false) { broCtx.Users.Add(new Db.Model.User { Id = user.Id, BrokerId = brokerId }); } if (apiCtx.Users.Any(x => x.Id == user.Id)) { await UpdateUser(user, apiCtx); } else { await apiCtx.Users.AddAsync(new MessengerApi.Db.Entities.User { Id = user.Id, ApiKey = user.ApiKey, IsEnabled = user.IsEnabled, Name = user.Name }); } } async Task UpdateUser(Model.Http.Users.UsersResponse.User user, MessengerDbContext apiCtx) { this.logger.Info($"Updating user {user.Id}"); var apiUser = await apiCtx.Users.SingleAsync(x => x.Id == user.Id); apiUser.ApiKey = user.ApiKey; apiUser.IsEnabled = user.IsEnabled; apiUser.Name = user.Name; } } private async Task PullMessagesFromMasterServer(MasterServer server, DateTime lastMessageCreatedUtc) { var messagesRequest = new HttpRequestMessage( HttpMethod.Get, $"{server.BrokerUrl}/messages?sinceUtc={lastMessageCreatedUtc:o}"); messagesRequest.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue( "Bearer", this.brokerConfiguration.BrokerId.ToString()); var messagesResponseMessage = await this.httpClient.SendAsync(messagesRequest); if (!messagesResponseMessage.IsSuccessStatusCode) { this.logger.Error($"Can't pull message data from {server}."); } var messagesResponse = JsonSerializer .Deserialize( await messagesResponseMessage.Content.ReadAsStringAsync()); this.logger.Debug($"Pulled {messagesResponse.Messages.Count()} messages."); using (var broCtx = this.brokerDbContextFactory.CreateDbContext()) using (var apiCtx = this.messengerDbContextFactory.CreateDbContext()) { var addedMessages = messagesResponse.Messages .Where(m => broCtx.Messages.Any(x => x.Id == m.Id) == false) .ToList(); addedMessages.ForEach(x => { broCtx.Messages.Add(new Db.Model.Message { BrokerId = server.BrokerId, Id = x.Id }); apiCtx.Messages.Add(new MessengerApi.Db.Entities.Message { Id = x.Id, CreatedUtc = x.CreatedUtc, FromId = apiCtx.Users.Single(u => u.Id == x.FromId).Id, ToId = apiCtx.Users.Single(u => u.Id == x.ToId).Id, IsAcknowledged = x.IsAcknowledged, IsDelivered = x.IsDelivered, Payload = x.Payload, TimeToLiveInSeconds = x.TimeToLiveInSeconds, PayloadType = x.PayloadType, }); }); var existingMessages = messagesResponse.Messages .Except(addedMessages) .ToList(); existingMessages.ForEach(x => { var existing = apiCtx.Messages.SingleOrDefault(a => a.Id == x.Id); if (existing != null && (existing.IsDelivered != x.IsDelivered || existing.IsAcknowledged != x.IsAcknowledged)) { existing.IsDelivered = x.IsDelivered; existing.IsAcknowledged = x.IsAcknowledged; } }); broCtx.SaveChanges(); apiCtx.SaveChanges(); } return messagesResponse?.Messages? .OrderByDescending(x => x.CreatedUtc) .FirstOrDefault()?.CreatedUtc; } } }