diff --git a/code/MessengerBroker.Configuration/Model/BrokerConfiguration.cs b/code/MessengerBroker.Configuration/Model/BrokerConfiguration.cs index 62c3f18..073ae70 100644 --- a/code/MessengerBroker.Configuration/Model/BrokerConfiguration.cs +++ b/code/MessengerBroker.Configuration/Model/BrokerConfiguration.cs @@ -24,6 +24,10 @@ namespace MessengerBroker.Configuration.Model public SlaveServer[] SlaveServers { get; set; } + public bool HousekeepingEnabled { get; set; } + + public int HousekeepingMessageAgeInMinutes { get; set; } + public BrokerConfiguration( PersistenceConfiguration apiPersistenceConfiguration, PersistenceConfiguration broPersistenceConfiguration, @@ -37,15 +41,22 @@ namespace MessengerBroker.Configuration.Model this.MasterServers = masters; this.SlaveServers = slaves; this.Proxies = []; + this.HousekeepingEnabled = true; + this.HousekeepingMessageAgeInMinutes = 180; } - public BrokerConfiguration(IEnvironmentConfigurationSource config):this( + public BrokerConfiguration(IEnvironmentConfigurationSource config) : this( ApiEnvironmentPersistenceConfigurationParser.Parse(config), EnvironmentPersistenceConfigurationParser.Parse(config), config.GetValue(Constants.EnvironmentVariables.BROKER_ID), MasterServerParser.Parse(config.GetValue(Constants.EnvironmentVariables.MASTER_SERVERS)), SlaveServerParser.Parse(config.GetValue(Constants.EnvironmentVariables.SLAVE_SERVERS))) { + if(config.HasKey(Constants.EnvironmentVariables.HOUSEKEEPING_ENABLED)) + { + this.HousekeepingEnabled = config.GetValue(Constants.EnvironmentVariables.HOUSEKEEPING_ENABLED); + this.HousekeepingMessageAgeInMinutes = config.GetValue(Constants.EnvironmentVariables.HOUSEKEEPING_MESSAGE_AGE_IN_MINUTES); + } } } } \ No newline at end of file diff --git a/code/MessengerBroker.Configuration/Model/Servers/MasterServer.cs b/code/MessengerBroker.Configuration/Model/Servers/MasterServer.cs index 3cc650a..5b3f2e0 100644 --- a/code/MessengerBroker.Configuration/Model/Servers/MasterServer.cs +++ b/code/MessengerBroker.Configuration/Model/Servers/MasterServer.cs @@ -11,5 +11,10 @@ namespace MessengerBroker.Configuration.Model.Servers /// Url of the Broker API we use to download the data. /// public string BrokerUrl { get; set; } + + public override string ToString() + { + return $"{this.Name} ({this.BrokerUrl}, ID {this.BrokerId})"; + } } } \ No newline at end of file diff --git a/code/MessengerBroker.Configuration/Model/Servers/SlaveServer.cs b/code/MessengerBroker.Configuration/Model/Servers/SlaveServer.cs index 4a80029..ae8ae96 100644 --- a/code/MessengerBroker.Configuration/Model/Servers/SlaveServer.cs +++ b/code/MessengerBroker.Configuration/Model/Servers/SlaveServer.cs @@ -7,5 +7,9 @@ namespace MessengerBroker.Configuration.Model.Servers /// public class SlaveServer : Server { + public override string ToString() + { + return $"{this.Name} (ID {this.BrokerId})"; + } } } \ No newline at end of file diff --git a/code/MessengerBroker.Configuration/Sources/Environment/Constants.EnvironmentVariables.cs b/code/MessengerBroker.Configuration/Sources/Environment/Constants.EnvironmentVariables.cs index 6d76f51..4e1fb85 100644 --- a/code/MessengerBroker.Configuration/Sources/Environment/Constants.EnvironmentVariables.cs +++ b/code/MessengerBroker.Configuration/Sources/Environment/Constants.EnvironmentVariables.cs @@ -11,6 +11,8 @@ namespace MessengerBroker.Configuration.Sources.Environment public const string BROKER_ID = nameof(BROKER_ID); public const string MASTER_SERVERS = nameof(MASTER_SERVERS); public const string SLAVE_SERVERS = nameof(SLAVE_SERVERS); + public const string HOUSEKEEPING_ENABLED = nameof(HOUSEKEEPING_ENABLED); + public const string HOUSEKEEPING_MESSAGE_AGE_IN_MINUTES = nameof(HOUSEKEEPING_MESSAGE_AGE_IN_MINUTES); } } -} +} \ No newline at end of file diff --git a/code/MessengerBroker.Db/Model/Message.cs b/code/MessengerBroker.Db/Model/Message.cs index b33455f..559221f 100644 --- a/code/MessengerBroker.Db/Model/Message.cs +++ b/code/MessengerBroker.Db/Model/Message.cs @@ -5,5 +5,7 @@ public Guid Id { get; set; } public Guid BrokerId { get; set; } + + public DateTime TimestampUtc { get; set; } } } diff --git a/code/MessengerBroker/Handlers/HousekeepingHandler.cs b/code/MessengerBroker/Handlers/HousekeepingHandler.cs new file mode 100644 index 0000000..2262338 --- /dev/null +++ b/code/MessengerBroker/Handlers/HousekeepingHandler.cs @@ -0,0 +1,41 @@ +using MessengerBroker.Configuration.Model; +using MessengerBroker.Factories; +using Microsoft.EntityFrameworkCore; + +namespace MessengerBroker.Handlers +{ + public class HousekeepingHandler + { + private readonly BrokerConfiguration brokerConfiguration; + private readonly BrokerDbContextFactory brokerDbContextFactory; + + public HousekeepingHandler(BrokerConfiguration brokerConfiguration) + { + this.brokerConfiguration = brokerConfiguration; + } + + public Task BeginHousekeeping(CancellationToken ct = default) + { + if(this.brokerConfiguration.HousekeepingEnabled == false) + { + return Task.CompletedTask; + } + + return Task.Run(async () => + { + while (!ct.IsCancellationRequested) + { + await Task.Delay(TimeSpan.FromMinutes(1)); + + using var context = this.brokerDbContextFactory.CreateDbContext(); + + var oldestAllowedTimestamp = DateTime.UtcNow.AddMinutes(-this.brokerConfiguration.HousekeepingMessageAgeInMinutes); + + await context.Messages + .Where(x => x.TimestampUtc < oldestAllowedTimestamp) + .ExecuteDeleteAsync(); + } + }); + } + } +} diff --git a/code/MessengerBroker/Handlers/MasterHandler.cs b/code/MessengerBroker/Handlers/MasterHandler.cs deleted file mode 100644 index c41c6e5..0000000 --- a/code/MessengerBroker/Handlers/MasterHandler.cs +++ /dev/null @@ -1,196 +0,0 @@ -//using MessengerApi.Db; -//using MessengerBroker.Db; -//using Microsoft.EntityFrameworkCore; -//using System.Text.Json; - -//namespace MessengerBroker.Handlers -//{ -// public class MasterHandler -// { -// private readonly Settings settings; - -// private readonly HttpClient httpClient = new HttpClient(); - -// public async Task BeginSyncingWithMaster(Settings.MasterServer masterServer, CancellationToken ct = default) -// { -// while (!ct.IsCancellationRequested) -// { -// await Task.Delay(TimeSpan.FromSeconds(1)); - -// var usersRequest = new HttpRequestMessage(HttpMethod.Get, $"{masterServer.BrokerApiUrl}/users"); -// usersRequest.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", this.settings.BrokerId.ToString()); - -// var usersResponseMessage = await this.httpClient.SendAsync(usersRequest, ct); - -// if (!usersResponseMessage.IsSuccessStatusCode) -// { -// continue; -// } - -// var usersResponse = JsonSerializer.Deserialize(await usersResponseMessage.Content.ReadAsStringAsync()); - -// using (var broCtx = new BrokerDbContext(this.settings.MessengerBrokerDbConnectionString)) -// using (var apiCtx = new MessengerDbContext(this.settings.MessengerApiDbConnectionString)) -// { -// var updatedUsers = usersResponse.Users.Where(x => broCtx.Users.GetUserExists(x.Id)).ToList(); -// updatedUsers.ForEach(async x => await this.UpdateUser(x, apiCtx)); - -// var deletedUsers = broCtx.Users.GetUsersExcept(usersResponse.Users.Select(x => x.Id)).ToList(); -// deletedUsers.ForEach(async x => await this.RemoveUser(x.Id, broCtx, apiCtx)); - -// var addedUsers = usersResponse.Users.Where(x => !broCtx.Users.GetUserExists(x.Id)).ToList(); -// addedUsers.ForEach(async x => await this.AddUser(x, masterServer.BrokerId, broCtx, apiCtx)); - -// foreach (var route in usersResponse.UserRoutes -// .Where(r => apiCtx.UserRoutes -// .Include(x => x.From) -// .Include(x => x.To) -// .Any(apir => apir.Id == r.Id && (apir.From.Id != r.FromId || apir.To.Id != r.ToId)))) -// { -// var existing = apiCtx.UserRoutes.Include(x => x.From).Include(x => x.To).Single(x => x.Id == route.Id); -// existing.From = apiCtx.Users.Single(x => x.Id == route.FromId); -// existing.To = apiCtx.Users.Single(x => x.Id == route.ToId); -// } - -// foreach (var deletedRoute in apiCtx.UserRoutes -// .Where(x => !usersResponse.UserRoutes.Any(r => r.Id == x.Id))) -// { -// apiCtx.UserRoutes.Remove(deletedRoute); -// } - -// foreach (var addedRoute in usersResponse.UserRoutes -// .Where(r => !apiCtx.UserRoutes.Any(x => x.Id == r.Id))) -// { -// apiCtx.UserRoutes.Add(new MessengerApi.Db.Entities.UserRoute -// { -// Id = addedRoute.Id, -// From = apiCtx.Users.Single(x => x.Id == addedRoute.FromId), -// To = apiCtx.Users.Single(x => x.Id == addedRoute.ToId) -// }); -// } - -// broCtx.SaveChanges(); -// apiCtx.SaveChanges(); -// } - -// var messagesRequest = new HttpRequestMessage(HttpMethod.Get, $"{masterServer.BrokerApiUrl}/messages"); -// messagesRequest.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", this.settings.BrokerId.ToString()); - -// var messagesResponseMessage = await this.httpClient.SendAsync(messagesRequest, ct); - -// if (!messagesResponseMessage.IsSuccessStatusCode) -// { -// continue; -// } - -// var messagesResponse = JsonSerializer.Deserialize(await messagesResponseMessage.Content.ReadAsStringAsync()); - -// using (var broCtx = new BrokerDbContext(this.settings.MessengerBrokerDbConnectionString)) -// using (var apiCtx = new MessengerDbContext(this.settings.MessengerApiDbConnectionString)) -// { -// var addedMessages = messagesResponse.Messages -// .Where(m => broCtx.Messages.Any(x => x.BrokerId == masterServer.BrokerId && x.Id == m.Id) == false) -// .ToList(); - -// addedMessages.ForEach(x => -// { -// broCtx.Messages.Add(new Db.Model.Message -// { -// BrokerId = masterServer.BrokerId, -// Id = x.Id -// }); - -// apiCtx.Messages.Add(new MessengerApi.Db.Entities.Message -// { -// Id = x.Id, -// CreatedUtc = x.CreatedUtc, -// From = x.From != null ? (apiCtx.Users.SingleOrDefault(u => u.Id == x.From.Value)) ?? null : null, -// To = x.To != null ? (apiCtx.Users.SingleOrDefault(u => u.Id == x.To.Value)) ?? null : null, -// IsAcknowledged = x.IsAcknowledged, -// IsDelivered = x.IsDelivered, -// Payload = x.Payload, -// PayloadId = x.PayloadId, -// PayloadLifespanInSeconds = x.PayloadLifespanInSeconds, -// PayloadTimestamp = x.PayloadTimestamp, -// PayloadType = x.PayloadType, -// }); -// }); - -// var existingMessages = messagesResponse.Messages -// .Except(addedMessages) -// .ToList(); - -// existingMessages.ForEach(x => -// { -// var existing = apiCtx.Messages.SingleOrDefault(a => a.Id == x.Id); - -// if(existing != null && (existing.IsDelivered != x.IsDelivered || existing.IsAcknowledged != x.IsAcknowledged)) -// { -// existing.IsDelivered = x.IsDelivered; -// existing.IsAcknowledged = x.IsAcknowledged; -// } -// }); - -// broCtx.SaveChanges(); -// apiCtx.SaveChanges(); -// } -// } -// } - -// private async Task RemoveUser(Guid id, BrokerDbContext broCtx, MessengerDbContext apiCtx) -// { -// var broUser = await broCtx.Users.SingleOrDefaultAsync(x => x.Id == id); - -// if (broUser != null) -// { -// broCtx.Users.Remove(broUser); -// } - -// var apiUser = await apiCtx.Users.SingleOrDefaultAsync(x => x.Id == id); - -// if (apiUser != null) -// { -// apiCtx.Users.Remove(apiUser); -// } -// } - -// private async Task AddUser(Model.Http.Users.UsersResponse.User user, Guid brokerId, BrokerDbContext broCtx, MessengerDbContext apiCtx) -// { -// if (broCtx.Users.GetUserExists(user.Id) == false) -// { -// broCtx.Users.Add(new Db.Model.User -// { -// Id = user.Id, -// BrokerId = brokerId -// }); -// } - -// if (apiCtx.Users.Any(x => x.Id == user.Id)) -// { -// await this.UpdateUser(user, apiCtx); -// } -// else -// { -// await apiCtx.Users.AddAsync(new MessengerApi.Db.Entities.User -// { -// Id = user.Id, -// ApiKey = user.ApiKey, -// CanReceive = user.CanReceive, -// CanSend = user.CanSend, -// IsEnabled = user.IsEnabled, -// Name = user.Name -// }); -// } -// } - -// private async Task UpdateUser(Model.Http.Users.UsersResponse.User user, MessengerDbContext apiCtx) -// { -// var apiUser = await apiCtx.Users.SingleAsync(x => x.Id == user.Id); -// apiUser.ApiKey = user.ApiKey; -// apiUser.CanReceive = user.CanReceive; -// apiUser.CanSend = user.CanSend; -// apiUser.IsEnabled = user.IsEnabled; -// apiUser.Name = user.Name; -// } -// } -//} \ No newline at end of file diff --git a/code/MessengerBroker/Handlers/MasterServerSynchronizationHandler.cs b/code/MessengerBroker/Handlers/MasterServerSynchronizationHandler.cs new file mode 100644 index 0000000..b124e81 --- /dev/null +++ b/code/MessengerBroker/Handlers/MasterServerSynchronizationHandler.cs @@ -0,0 +1,299 @@ +using MessengerApi.Db; +using MessengerBroker.Configuration.Model; +using MessengerBroker.Configuration.Model.Servers; +using MessengerBroker.Db; +using MessengerBroker.Factories; +using Microsoft.EntityFrameworkCore; +using System.Text.Json; + +namespace MessengerBroker.Handlers +{ + public class MasterServerSynchronizationHandler + { + private readonly HttpClient httpClient = new HttpClient(); + private readonly ILogger logger; + private readonly BrokerConfiguration brokerConfiguration; + private readonly BrokerDbContextFactory brokerDbContextFactory; + private readonly MessengerApi.Factories.DbContextFactory messengerDbContextFactory; + + public MasterServerSynchronizationHandler( + ILogger logger, + BrokerConfiguration brokerConfiguration, + BrokerDbContextFactory brokerDbContextFactory, + MessengerApi.Factories.DbContextFactory messengerDbContextFactory) + { + this.logger = logger; + this.brokerConfiguration = brokerConfiguration; + this.brokerDbContextFactory = brokerDbContextFactory; + this.messengerDbContextFactory = messengerDbContextFactory; + } + + public Task BeginSyncing(MasterServer server, CancellationToken ct = default) + { + var userSyncTask = Task.Run(async () => + { + while (!ct.IsCancellationRequested) + { + logger.Trace($"Executing {nameof(this.PullUsersFromMasterServer)} at {server}."); + + // Run small updates of last 10 seconds of messages always. + try + { + await this.PullUsersFromMasterServer(server); + } + catch (Exception ex) + { + this.logger.Error($"Error during user pull from server {server}.", ex); + } + + await Task.Delay(TimeSpan.FromSeconds(60)); + } + }); + + var fastSyncTask = Task.Run(async () => + { + while (!ct.IsCancellationRequested) + { + logger.Trace($"Executing (fast) {nameof(this.PullMessagesFromMasterServer)} at {server}."); + + // Run small updates of last 10 seconds of messages always. + try + { + await PullMessagesFromMasterServer(server, DateTime.UtcNow.AddSeconds(-10)); + } + catch(Exception ex) + { + this.logger.Error($"Error during fast message pull from server {server}.", ex); + } + + await Task.Delay(TimeSpan.FromMilliseconds(100)); + } + }); + + var slowSyncTask = Task.Run(async () => + { + while (!ct.IsCancellationRequested) + { + logger.Trace($"Executing (slow) {nameof(this.PullMessagesFromMasterServer)} at {server}."); + + // Run large updates every once in a while. + try + { + DateTime? startTimeUtc = DateTime.UtcNow.AddMinutes(-this.brokerConfiguration.HousekeepingMessageAgeInMinutes); + while (!ct.IsCancellationRequested && startTimeUtc != null) + { + startTimeUtc = await PullMessagesFromMasterServer(server, startTimeUtc.Value); + } + } + catch (Exception ex) + { + this.logger.Error($"Error during slow message pull from server {server}.", ex); + } + + await Task.Delay(TimeSpan.FromMinutes(1)); + } + }); + + return Task.WhenAll(userSyncTask, fastSyncTask, slowSyncTask); + } + + private async Task PullUsersFromMasterServer(MasterServer server) + { + this.logger.Info($"Pulling user data from {server.ToString()}."); + + var usersRequest = new HttpRequestMessage(HttpMethod.Get, $"{server.BrokerUrl}/users"); + usersRequest.Headers.Authorization = + new System.Net.Http.Headers.AuthenticationHeaderValue( + "Bearer", + this.brokerConfiguration.BrokerId.ToString()); + + var usersResponseMessage = await this.httpClient.SendAsync(usersRequest); + + if (!usersResponseMessage.IsSuccessStatusCode) + { + this.logger.Error($"Can't pull data from master server: {server.ToString()}." + usersResponseMessage); + } + + var usersResponse = JsonSerializer + .Deserialize( + await usersResponseMessage.Content.ReadAsStringAsync()); + + using (var broCtx = this.brokerDbContextFactory.CreateDbContext()) + using (var apiCtx = this.messengerDbContextFactory.CreateDbContext()) + { + var updatedUsers = usersResponse.Users.Where(x => broCtx.Users.Any(bu => bu.Id == x.Id)).ToList(); + updatedUsers.ForEach(async x => await UpdateUser(x, apiCtx)); + + var addedUsers = usersResponse.Users.Where(x => !broCtx.Users.Any(bu => bu.Id == x.Id)).ToList(); + addedUsers.ForEach(async x => await AddUser(x, server.BrokerId, broCtx, apiCtx)); + + var deletedUsers = broCtx.Users.Where(x => !usersResponse.Users.Any(ur => ur.Id == x.Id)).ToList(); + deletedUsers.ForEach(async x => await RemoveUser(x.Id, broCtx, apiCtx)); + + apiCtx.UserRoutes + .Include(x => x.From) + .Where(x => broCtx.Users.Any(bu => bu.Id == x.From.Id)) + .ToList() + .Where(x => !usersResponse.UserRoutes.Any(ur => ur.Id == x.Id)) + .ToList() + .ForEach(x => + { + this.logger.Info($"Removing UserRoute {x.Id}."); + apiCtx.UserRoutes.Remove(x); + }); + + usersResponse.UserRoutes + .Where(x => !apiCtx.UserRoutes.Any(ur => ur.Id == x.Id)) + .ToList() + .ForEach(x => + { + this.logger.Info($"Adding UserRoute {x.Id}"); + apiCtx.UserRoutes.Add(new MessengerApi.Db.Entities.UserRoute + { + Id = x.Id, + From = apiCtx.Users.Single(u => u.Id == x.FromId), + To = apiCtx.Users.Single(u => u.Id == x.ToId) + }); + }); + + broCtx.SaveChanges(); + apiCtx.SaveChanges(); + } + + async Task RemoveUser(Guid id, BrokerDbContext broCtx, MessengerDbContext apiCtx) + { + this.logger.Info($"Removing user {id}."); + + var broUser = await broCtx.Users.SingleOrDefaultAsync(x => x.Id == id); + + if (broUser != null) + { + broCtx.Users.Remove(broUser); + } + + var apiUser = await apiCtx.Users.SingleOrDefaultAsync(x => x.Id == id); + + if (apiUser != null) + { + apiCtx.Users.Remove(apiUser); + } + } + + async Task AddUser(Model.Http.Users.UsersResponse.User user, Guid brokerId, BrokerDbContext broCtx, MessengerDbContext apiCtx) + { + this.logger.Info($"Adding user {user.Id}."); + + if (broCtx.Users.Any(x => x.Id == user.Id) == false) + { + broCtx.Users.Add(new Db.Model.User + { + Id = user.Id, + BrokerId = brokerId + }); + } + + if (apiCtx.Users.Any(x => x.Id == user.Id)) + { + await UpdateUser(user, apiCtx); + } + else + { + await apiCtx.Users.AddAsync(new MessengerApi.Db.Entities.User + { + Id = user.Id, + ApiKey = user.ApiKey, + IsEnabled = user.IsEnabled, + Name = user.Name + }); + } + } + + async Task UpdateUser(Model.Http.Users.UsersResponse.User user, MessengerDbContext apiCtx) + { + this.logger.Info($"Updating user {user.Id}"); + + var apiUser = await apiCtx.Users.SingleAsync(x => x.Id == user.Id); + apiUser.ApiKey = user.ApiKey; + apiUser.IsEnabled = user.IsEnabled; + apiUser.Name = user.Name; + } + } + + private async Task PullMessagesFromMasterServer(MasterServer server, DateTime lastMessageCreatedUtc) + { + var messagesRequest = new HttpRequestMessage( + HttpMethod.Get, + $"{server.BrokerUrl}/messages?sinceUtc={lastMessageCreatedUtc:o}"); + + messagesRequest.Headers.Authorization = + new System.Net.Http.Headers.AuthenticationHeaderValue( + "Bearer", + this.brokerConfiguration.BrokerId.ToString()); + + var messagesResponseMessage = await this.httpClient.SendAsync(messagesRequest); + + if (!messagesResponseMessage.IsSuccessStatusCode) + { + this.logger.Error($"Can't pull message data from {server}."); + } + + var messagesResponse = JsonSerializer + .Deserialize( + await messagesResponseMessage.Content.ReadAsStringAsync()); + + this.logger.Debug($"Pulled {messagesResponse.Messages.Count()} messages."); + + using (var broCtx = this.brokerDbContextFactory.CreateDbContext()) + using (var apiCtx = this.messengerDbContextFactory.CreateDbContext()) + { + var addedMessages = messagesResponse.Messages + .Where(m => broCtx.Messages.Any(x => x.Id == m.Id) == false) + .ToList(); + + addedMessages.ForEach(x => + { + broCtx.Messages.Add(new Db.Model.Message + { + BrokerId = server.BrokerId, + Id = x.Id + }); + + apiCtx.Messages.Add(new MessengerApi.Db.Entities.Message + { + Id = x.Id, + CreatedUtc = x.CreatedUtc, + FromId = apiCtx.Users.Single(u => u.Id == x.FromId).Id, + ToId = apiCtx.Users.Single(u => u.Id == x.ToId).Id, + IsAcknowledged = x.IsAcknowledged, + IsDelivered = x.IsDelivered, + Payload = x.Payload, + TimeToLiveInSeconds = x.TimeToLiveInSeconds, + PayloadType = x.PayloadType, + }); + }); + + var existingMessages = messagesResponse.Messages + .Except(addedMessages) + .ToList(); + + existingMessages.ForEach(x => + { + var existing = apiCtx.Messages.SingleOrDefault(a => a.Id == x.Id); + + if (existing != null && (existing.IsDelivered != x.IsDelivered || existing.IsAcknowledged != x.IsAcknowledged)) + { + existing.IsDelivered = x.IsDelivered; + existing.IsAcknowledged = x.IsAcknowledged; + } + }); + + broCtx.SaveChanges(); + apiCtx.SaveChanges(); + } + + return messagesResponse?.Messages? + .OrderByDescending(x => x.CreatedUtc) + .FirstOrDefault()?.CreatedUtc; + } + } +} \ No newline at end of file diff --git a/code/MessengerBroker/Program.cs b/code/MessengerBroker/Program.cs index 0f54a30..30bbb2a 100644 --- a/code/MessengerBroker/Program.cs +++ b/code/MessengerBroker/Program.cs @@ -39,6 +39,8 @@ namespace MessengerBroker { return new MessengerApi.Factories.DbContextFactory(configuration.ApiPersistenceConfiguration); }); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); builder.Services.AddScoped(); builder.Services.AddScoped(); @@ -86,30 +88,24 @@ namespace MessengerBroker } } - //// Housekeeping. - //if (configuration.HousekeepingEnabled) - //{ - // _ = Task.Run(async () => - // { - // while (true) - // { - // await app.Services.GetService().RemoveOldMessages(); - // await Task.Delay(TimeSpan.FromMinutes(1)); - // } - // }); - //} + // Housekeeping. + if (configuration.HousekeepingEnabled) + { + _ = app.Services.GetRequiredService().BeginHousekeeping(); + } //// Run pull sync from masters. - //_ = Task.Run(async () => - //{ - // var cts = new CancellationTokenSource(); - // var handler = app.Services.GetService(); + if(configuration.MasterServers.Any()) + { + var handler = app.Services.GetRequiredService(); + var logger = app.Services.GetRequiredService(); - // foreach (var master in settings.Masters) - // { - // _ = handler.BeginSyncingWithMaster(master, cts.Token); - // } - //}); + foreach (var server in configuration.MasterServers) + { + logger.Info($"Starting sync task for {server}."); + _ = handler.BeginSyncing(server); + } + } app.UseStaticFiles(); app.UseRouting(); diff --git a/code/MessengerBroker/Properties/launchSettings.json b/code/MessengerBroker/Properties/launchSettings.json index 2132c2a..4264a3b 100644 --- a/code/MessengerBroker/Properties/launchSettings.json +++ b/code/MessengerBroker/Properties/launchSettings.json @@ -10,6 +10,8 @@ "BROKER_ID": "15D97720-F5B7-47AA-9C1A-71F98B0B9248", "MASTER_SERVERS": "F696442B-E8DC-4074-B34F-94BCECE8E74B,http://localhost:1234,test1;D1B65834-53BA-45F8-A1E1-66C91C7CFCA9,http://localhost:5678,test2", "SLAVE_SERVERS": "20E4B747-EC58-4071-8AD9-25521F3C4013,test3" + "HOUSEKEEPING_ENABLED": "true", + "HOUSEKEEPING_MESSAGE_AGE_IN_MINUTES": "180" }, "dotnetRunMessages": true, "applicationUrl": "http://localhost:5048"