using MessengerApi.Db;
using MessengerBroker.Db;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;

namespace MessengerBroker.Handlers
{
    /// <summary>
    /// Polls a master broker over HTTP and mirrors its users, user routes and messages
    /// into the local broker database and the local messenger API database.
    /// </summary>
    public class MasterHandler
    {
        // NOTE(review): this field is never assigned anywhere in the class as visible here,
        // so every use below would dereference null. Confirm how it is meant to be populated.
        private readonly Settings settings;

        private readonly HttpClient httpClient = new HttpClient();

        /// <summary>
        /// Runs the synchronisation loop against <paramref name="masterServer"/> until
        /// <paramref name="ct"/> is cancelled. Each iteration waits one second, syncs users
        /// and routes, and then (only if the users request succeeded) syncs messages.
        /// </summary>
        /// <param name="masterServer">Master broker to pull data from.</param>
        /// <param name="ct">Token that stops the loop; it is flowed into the delay and the HTTP calls.</param>
        /// <exception cref="OperationCanceledException">
        /// <paramref name="ct"/> was cancelled while waiting or during an HTTP call.
        /// </exception>
        public async Task BeginSyncingWithMaster(Settings.MasterServer masterServer, CancellationToken ct = default)
        {
            while (!ct.IsCancellationRequested)
            {
                await Task.Delay(TimeSpan.FromSeconds(1), ct);

                // A non-success users response skips the message sync for this iteration.
                if (!await this.SyncUsers(masterServer, ct))
                {
                    continue;
                }

                await this.SyncMessages(masterServer, ct);
            }
        }

        /// <summary>Builds a GET request carrying this broker's id as the bearer token.</summary>
        private HttpRequestMessage CreateAuthorizedGet(string url)
        {
            var request = new HttpRequestMessage(HttpMethod.Get, url);
            request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", this.settings.BrokerId.ToString());
            return request;
        }

        /// <summary>
        /// Fetches <c>/users</c> from the master and applies user and route changes locally.
        /// </summary>
        /// <returns><c>false</c> when the master answered with a non-success status code; otherwise <c>true</c>.</returns>
        private async Task<bool> SyncUsers(Settings.MasterServer masterServer, CancellationToken ct)
        {
            using var request = this.CreateAuthorizedGet($"{masterServer.BrokerApiUrl}/users");
            using var response = await this.httpClient.SendAsync(request, ct);

            if (!response.IsSuccessStatusCode)
            {
                return false;
            }

            // NOTE(review): the generic type argument of Deserialize is not present in this view of the
            // file (presumably Model.Http.Users.UsersResponse, given the helper signatures below) - restore/confirm.
            var usersResponse = JsonSerializer.Deserialize(await response.Content.ReadAsStringAsync(ct));

            using (var broCtx = new BrokerDbContext(this.settings.MessengerBrokerDbConnectionString))
            using (var apiCtx = new MessengerDbContext(this.settings.MessengerApiDbConnectionString))
            {
                // Each operation is awaited in turn: a DbContext must not be used concurrently, and all
                // changes have to be tracked before SaveChanges runs and the contexts are disposed.
                var updatedUsers = usersResponse.Users.Where(x => broCtx.Users.GetUserExists(x.Id)).ToList();
                foreach (var user in updatedUsers)
                {
                    await this.UpdateUser(user, apiCtx);
                }

                var deletedUsers = broCtx.Users.GetUsersExcept(usersResponse.Users.Select(x => x.Id)).ToList();
                foreach (var user in deletedUsers)
                {
                    await this.RemoveUser(user.Id, broCtx, apiCtx);
                }

                var addedUsers = usersResponse.Users.Where(x => !broCtx.Users.GetUserExists(x.Id)).ToList();
                foreach (var user in addedUsers)
                {
                    await this.AddUser(user, masterServer.BrokerId, broCtx, apiCtx);
                }

                // Routes whose endpoints differ from the master's copy.
                // NOTE(review): users are resolved with database queries, so a route that points at a user
                // added above (tracked but not yet saved) will make Single() throw - confirm intended ordering.
                foreach (var route in usersResponse.UserRoutes
                    .Where(r => apiCtx.UserRoutes
                        .Include(x => x.From)
                        .Include(x => x.To)
                        .Any(apir => apir.Id == r.Id && (apir.From.Id != r.FromId || apir.To.Id != r.ToId))))
                {
                    var existing = apiCtx.UserRoutes.Include(x => x.From).Include(x => x.To).Single(x => x.Id == route.Id);
                    existing.From = apiCtx.Users.Single(x => x.Id == route.FromId);
                    existing.To = apiCtx.Users.Single(x => x.Id == route.ToId);
                }

                // Routes the master no longer reports. The ids are projected to a plain list so the
                // predicate is a simple Contains, and the result is materialised before removing.
                var masterRouteIds = usersResponse.UserRoutes.Select(r => r.Id).ToList();
                var deletedRoutes = await apiCtx.UserRoutes
                    .Where(x => !masterRouteIds.Contains(x.Id))
                    .ToListAsync(ct);
                foreach (var deletedRoute in deletedRoutes)
                {
                    apiCtx.UserRoutes.Remove(deletedRoute);
                }

                // Routes that exist on the master but not locally.
                foreach (var addedRoute in usersResponse.UserRoutes
                    .Where(r => !apiCtx.UserRoutes.Any(x => x.Id == r.Id)))
                {
                    apiCtx.UserRoutes.Add(new MessengerApi.Db.Entities.UserRoute
                    {
                        Id = addedRoute.Id,
                        From = apiCtx.Users.Single(x => x.Id == addedRoute.FromId),
                        To = apiCtx.Users.Single(x => x.Id == addedRoute.ToId)
                    });
                }

                // The token is deliberately not passed: once the first save has happened the second
                // one should not be skipped by a cancellation.
                await broCtx.SaveChangesAsync();
                await apiCtx.SaveChangesAsync();
            }

            return true;
        }

        /// <summary>
        /// Fetches <c>/messages</c> from the master, inserts messages not yet recorded for that master
        /// and refreshes the delivered/acknowledged flags of the remaining ones.
        /// </summary>
        /// <returns><c>false</c> when the master answered with a non-success status code; otherwise <c>true</c>.</returns>
        private async Task<bool> SyncMessages(Settings.MasterServer masterServer, CancellationToken ct)
        {
            using var request = this.CreateAuthorizedGet($"{masterServer.BrokerApiUrl}/messages");
            using var response = await this.httpClient.SendAsync(request, ct);

            if (!response.IsSuccessStatusCode)
            {
                return false;
            }

            // NOTE(review): the generic type argument of Deserialize is not present in this view of the
            // file and the response type cannot be determined from here - restore/confirm.
            var messagesResponse = JsonSerializer.Deserialize(await response.Content.ReadAsStringAsync(ct));

            using (var broCtx = new BrokerDbContext(this.settings.MessengerBrokerDbConnectionString))
            using (var apiCtx = new MessengerDbContext(this.settings.MessengerApiDbConnectionString))
            {
                // "Added" means: no broker-side record for this master/message id pair yet.
                var addedMessages = messagesResponse.Messages
                    .Where(m => !broCtx.Messages.Any(x => x.BrokerId == masterServer.BrokerId && x.Id == m.Id))
                    .ToList();

                foreach (var message in addedMessages)
                {
                    broCtx.Messages.Add(new Db.Model.Message
                    {
                        BrokerId = masterServer.BrokerId,
                        Id = message.Id
                    });

                    apiCtx.Messages.Add(new MessengerApi.Db.Entities.Message
                    {
                        Id = message.Id,
                        CreatedUtc = message.CreatedUtc,
                        // Sender/recipient stay null when absent in the payload or unknown locally.
                        From = message.From != null ? apiCtx.Users.SingleOrDefault(u => u.Id == message.From.Value) : null,
                        To = message.To != null ? apiCtx.Users.SingleOrDefault(u => u.Id == message.To.Value) : null,
                        IsAcknowledged = message.IsAcknowledged,
                        IsDelivered = message.IsDelivered,
                        Payload = message.Payload,
                        PayloadId = message.PayloadId,
                        PayloadLifespanInSeconds = message.PayloadLifespanInSeconds,
                        PayloadTimestamp = message.PayloadTimestamp,
                        PayloadType = message.PayloadType,
                    });
                }

                var existingMessages = messagesResponse.Messages
                    .Except(addedMessages)
                    .ToList();

                foreach (var message in existingMessages)
                {
                    var existing = apiCtx.Messages.SingleOrDefault(a => a.Id == message.Id);

                    if (existing != null
                        && (existing.IsDelivered != message.IsDelivered || existing.IsAcknowledged != message.IsAcknowledged))
                    {
                        existing.IsDelivered = message.IsDelivered;
                        existing.IsAcknowledged = message.IsAcknowledged;
                    }
                }

                // See SyncUsers: both saves run to completion regardless of cancellation.
                await broCtx.SaveChangesAsync();
                await apiCtx.SaveChangesAsync();
            }

            return true;
        }

        /// <summary>
        /// Marks the user with the given id for removal in both contexts, where present.
        /// Nothing is persisted here; the caller saves the contexts.
        /// </summary>
        private async Task RemoveUser(Guid id, BrokerDbContext broCtx, MessengerDbContext apiCtx)
        {
            var broUser = await broCtx.Users.SingleOrDefaultAsync(x => x.Id == id);
            if (broUser != null)
            {
                broCtx.Users.Remove(broUser);
            }

            var apiUser = await apiCtx.Users.SingleOrDefaultAsync(x => x.Id == id);
            if (apiUser != null)
            {
                apiCtx.Users.Remove(apiUser);
            }
        }

        /// <summary>
        /// Registers <paramref name="user"/> in the broker context (if not already there) and either
        /// updates or inserts the matching API user. Nothing is persisted here; the caller saves the contexts.
        /// </summary>
        private async Task AddUser(Model.Http.Users.UsersResponse.User user, Guid brokerId, BrokerDbContext broCtx, MessengerDbContext apiCtx)
        {
            if (!broCtx.Users.GetUserExists(user.Id))
            {
                broCtx.Users.Add(new Db.Model.User
                {
                    Id = user.Id,
                    BrokerId = brokerId
                });
            }

            if (apiCtx.Users.Any(x => x.Id == user.Id))
            {
                await this.UpdateUser(user, apiCtx);
            }
            else
            {
                apiCtx.Users.Add(new MessengerApi.Db.Entities.User
                {
                    Id = user.Id,
                    ApiKey = user.ApiKey,
                    CanReceive = user.CanReceive,
                    CanSend = user.CanSend,
                    IsEnabled = user.IsEnabled,
                    Name = user.Name
                });
            }
        }

        /// <summary>
        /// Copies the master's values onto the existing API user with the same id.
        /// </summary>
        /// <exception cref="InvalidOperationException">No API user (or more than one) has that id.</exception>
        private async Task UpdateUser(Model.Http.Users.UsersResponse.User user, MessengerDbContext apiCtx)
        {
            var apiUser = await apiCtx.Users.SingleAsync(x => x.Id == user.Id);

            apiUser.ApiKey = user.ApiKey;
            apiUser.CanReceive = user.CanReceive;
            apiUser.CanSend = user.CanSend;
            apiUser.IsEnabled = user.IsEnabled;
            apiUser.Name = user.Name;
        }
    }
}