Broker implementation updated.

This commit is contained in:
2025-07-05 08:48:14 +02:00
parent 8735510dfc
commit 91a1f7aa61
22 changed files with 587 additions and 510 deletions

View File

@ -1,28 +0,0 @@
namespace MessengerBroker.Handlers
{
public class AuthHandler
{
private readonly Settings settings;
public AuthHandler(Settings settings)
{
this.settings = settings;
}
public Guid? Auth(HttpContext context)
{
var authHeader = context.Request.Headers["Authorization"].ToString();
if (!string.IsNullOrEmpty(authHeader) && authHeader.StartsWith("Bearer "))
{
var token = authHeader.Substring("Bearer ".Length).Trim();
if (Guid.TryParse(token, out Guid brokerId) && this.settings.Slaves.Any(x => x.BrokerId == brokerId))
{
return brokerId;
}
}
return null;
}
}
}

View File

@ -0,0 +1,78 @@
using MessengerBroker.Configuration.Model;
using MessengerBroker.Models;
using MessengerBroker.Models.Scoped;
using Microsoft.AspNetCore.Authentication;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Options;
using System.Security.Claims;
using System.Text.Encodings.Web;
namespace MessengerBroker.Handlers
{
/// <summary>
/// Validates our permananet API keys sent over as Bearer tokens.
/// </summary>
public class CustomBearerAuthenticationHandler : AuthenticationHandler<AuthenticationSchemeOptions>
{
private readonly IMemoryCache memoryCache;
private readonly BrokerConfiguration configuration;
public CustomBearerAuthenticationHandler(
IOptionsMonitor<AuthenticationSchemeOptions> options,
ILoggerFactory loggerFactory,
UrlEncoder encoder,
IMemoryCache memoryCache,
BrokerConfiguration configuration)
: base(options, loggerFactory, encoder)
{
this.memoryCache = memoryCache;
this.configuration = configuration;
}
protected override Task<AuthenticateResult> HandleAuthenticateAsync()
{
const string HEADER = "Authorization";
const string PREFIX = "Bearer ";
if (!Request.Headers.TryGetValue(HEADER, out var authHeader) ||
!authHeader.ToString().StartsWith(PREFIX))
{
return Task.FromResult(AuthenticateResult.NoResult());
}
var token = authHeader.ToString().Substring(PREFIX.Length).Trim();
if(memoryCache.TryGetValue(token, out CachedIdentity oldCache))
{
var identity = Context.RequestServices.GetRequiredService<Identity>();
identity.Server = oldCache.Server;
return Task.FromResult(AuthenticateResult.Success(new AuthenticationTicket(oldCache.ClaimsPrincipal, Scheme.Name)));
}
else
{
var brokerId = Guid.Parse(token);
var server = configuration.SlaveServers.SingleOrDefault(x => x.BrokerId == brokerId);
var principal = new ClaimsPrincipal(
new ClaimsIdentity(
new List<Claim>
{
new Claim(ClaimTypes.NameIdentifier, token),
new Claim(ClaimTypes.Name, token)
}, Scheme.Name));
var cache = new CachedIdentity
{
ClaimsPrincipal = principal,
Server = server
};
memoryCache.Set(token, cache, TimeSpan.FromMinutes(5));
var identity = Context.RequestServices.GetRequiredService<Identity>();
identity.Server = server;
return Task.FromResult(AuthenticateResult.Success(new AuthenticationTicket(cache.ClaimsPrincipal, Scheme.Name)));
}
}
}
}

View File

@ -1,81 +0,0 @@
using MessengerApi.Db;
using MessengerBroker.Db;
using Microsoft.EntityFrameworkCore;
namespace MessengerBroker.Handlers
{
public class DataHandler
{
private readonly Settings _settings;
public Task<Tuple<MessengerApi.Db.Entities.User[], MessengerApi.Db.Entities.UserRoute[]>> GetLocalUsersAndRoutes()
{
var foreignUserIds = (Guid[])null;
using(var broCtx = new BrokerDbContext(this._settings.MessengerBrokerDbConnectionString))
{
foreignUserIds = broCtx.Users.Select(x => x.Id).ToArray();
}
using (var apiCtx = new MessengerDbContext(this._settings.MessengerApiDbConnectionString))
{
var localUsers = apiCtx.Users
.Where(x => !foreignUserIds.Any(f => f == x.Id))
.ToArray();
var localRoutes = apiCtx.UserRoutes
.Include(x => x.From)
.Include(x => x.To)
.Where(x => localUsers.Any(l => l.Id == x.From.Id) && localUsers.Any(l => l.Id == x.To.Id))
.ToArray();
return Task.FromResult(new Tuple<MessengerApi.Db.Entities.User[], MessengerApi.Db.Entities.UserRoute[]>(
localUsers,
localRoutes));
}
}
private Task<MessengerApi.Db.Entities.User[]> GetForeignUsers(Guid brokerId)
{
var foreignUserIds = (Guid[])null;
using (var broCtx = new BrokerDbContext(this._settings.MessengerBrokerDbConnectionString))
{
foreignUserIds = broCtx.Users.Where(x=>x.BrokerId == brokerId).Select(x => x.Id).ToArray();
}
using (var apiCtx = new MessengerDbContext(this._settings.MessengerApiDbConnectionString))
{
var localUsers = apiCtx.Users
.Where(x => !foreignUserIds.Any(f => f == x.Id))
.ToArray();
return Task.FromResult(localUsers);
}
}
public async Task<MessengerApi.Db.Entities.Message[]> GetMessages(Guid brokerId, DateTime sinceUtc)
{
var userIds = (Guid[])null;
if(brokerId == this._settings.BrokerId)
{
// Our messages.
var users = await this.GetLocalUsersAndRoutes();
userIds = users.Item1.Select(x => x.Id).ToArray();
}
else
{
var users = await this.GetForeignUsers(brokerId);
userIds = users.Select(x => x.Id).ToArray();
}
using (var apiCtx = new MessengerDbContext(this._settings.MessengerApiDbConnectionString))
{
var messages = apiCtx.Messages
.Include(x => x.From).Include(x => x.To)
.Where(x => x.CreatedUtc >= sinceUtc && userIds.Contains(x.From.Id))
.ToArray();
return messages;
}
}
}
}

View File

@ -0,0 +1,49 @@
using MessengerApi.Factories;
using MessengerBroker.Configuration.Model;
using MessengerBroker.Factories;
using MessengerBroker.Model.Http;
using Microsoft.EntityFrameworkCore;
namespace MessengerBroker.Handlers.Endpoint
{
public class MessagesEndpointHandler
{
private readonly BrokerConfiguration configuration;
private readonly BrokerDbContextFactory brokerDbContextFactory;
private readonly DbContextFactory messengerDbContextFactory;
public MessagesEndpointHandler(
BrokerConfiguration configuration,
BrokerDbContextFactory brokerDbContextFactory,
DbContextFactory messengerDbContextFactory)
{
this.configuration = configuration;
this.brokerDbContextFactory = brokerDbContextFactory;
this.messengerDbContextFactory = messengerDbContextFactory;
}
public async Task<Messages.MessagesResponse> GetMessages(Messages.MessagesRequest request)
{
using var broCtx = this.brokerDbContextFactory.CreateDbContext();
var brokerMessageIdCollection = broCtx.Messages
.Where(x => x.BrokerId == request.OwnerBrokerId)
.Select(x => x.Id)
.Take(1000)
.ToArray();
using var apiCtx = this.messengerDbContextFactory.CreateDbContext();
var messages = apiCtx.Messages
.Where(x => x.CreatedUtc >= request.SinceUtc && brokerMessageIdCollection.Contains(x.Id))
.ToArray();
var response = new Messages.MessagesResponse
{
Messages = messages
};
return response;
}
}
}

View File

@ -0,0 +1,65 @@
using MessengerApi.Factories;
using MessengerBroker.Configuration.Model;
using MessengerBroker.Factories;
using MessengerBroker.Model.Http;
using Microsoft.EntityFrameworkCore;
namespace MessengerBroker.Handlers.Endpoint
{
public class UsersEndpointHandler
{
private readonly BrokerConfiguration configuration;
private readonly BrokerDbContextFactory brokerDbContextFactory;
private readonly DbContextFactory messengerDbContextFactory;
public UsersEndpointHandler(
BrokerConfiguration configuration,
BrokerDbContextFactory brokerDbContextFactory,
DbContextFactory messengerDbContextFactory)
{
this.configuration = configuration;
this.brokerDbContextFactory = brokerDbContextFactory;
this.messengerDbContextFactory = messengerDbContextFactory;
}
public Task<Users.UsersResponse> GetUsers()
{
var foreignUserIds = (Guid[])null;
using(var broCtx = this.brokerDbContextFactory.CreateDbContext())
{
foreignUserIds = broCtx.Users.Select(x => x.Id).ToArray();
}
using (var apiCtx = this.messengerDbContextFactory.CreateDbContext())
{
var localUsers = apiCtx.Users
.Where(x => !foreignUserIds.Any(f => f == x.Id))
.ToArray();
var localRoutes = apiCtx.UserRoutes
.Include(x => x.From)
.Include(x => x.To)
.Where(x => localUsers.Any(l => l.Id == x.From.Id) && localUsers.Any(l => l.Id == x.To.Id))
.ToArray();
return Task.FromResult(new Users.UsersResponse
{
Users = localUsers.Select(x => new Users.UsersResponse.User
{
Id = x.Id,
ApiKey = x.ApiKey,
IsEnabled = x.IsEnabled,
Name = x.Name
}).ToArray(),
UserRoutes = localRoutes.Select(x => new Users.UsersResponse.UserRoute
{
Id = x.Id,
FromId = x.From.Id,
ToId = x.To.Id
}).ToArray()
});
}
}
}
}

View File

@ -1,196 +1,196 @@
using MessengerApi.Db;
using MessengerBroker.Db;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
//using MessengerApi.Db;
//using MessengerBroker.Db;
//using Microsoft.EntityFrameworkCore;
//using System.Text.Json;
namespace MessengerBroker.Handlers
{
public class MasterHandler
{
private readonly Settings settings;
//namespace MessengerBroker.Handlers
//{
// public class MasterHandler
// {
// private readonly Settings settings;
private readonly HttpClient httpClient = new HttpClient();
// private readonly HttpClient httpClient = new HttpClient();
public async Task BeginSyncingWithMaster(Settings.MasterServer masterServer, CancellationToken ct = default)
{
while (!ct.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1));
// public async Task BeginSyncingWithMaster(Settings.MasterServer masterServer, CancellationToken ct = default)
// {
// while (!ct.IsCancellationRequested)
// {
// await Task.Delay(TimeSpan.FromSeconds(1));
var usersRequest = new HttpRequestMessage(HttpMethod.Get, $"{masterServer.BrokerApiUrl}/users");
usersRequest.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", this.settings.BrokerId.ToString());
// var usersRequest = new HttpRequestMessage(HttpMethod.Get, $"{masterServer.BrokerApiUrl}/users");
// usersRequest.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", this.settings.BrokerId.ToString());
var usersResponseMessage = await this.httpClient.SendAsync(usersRequest, ct);
// var usersResponseMessage = await this.httpClient.SendAsync(usersRequest, ct);
if (!usersResponseMessage.IsSuccessStatusCode)
{
continue;
}
// if (!usersResponseMessage.IsSuccessStatusCode)
// {
// continue;
// }
var usersResponse = JsonSerializer.Deserialize<Model.Http.Users.UsersResponse>(await usersResponseMessage.Content.ReadAsStringAsync());
// var usersResponse = JsonSerializer.Deserialize<Model.Http.Users.UsersResponse>(await usersResponseMessage.Content.ReadAsStringAsync());
using (var broCtx = new BrokerDbContext(this.settings.MessengerBrokerDbConnectionString))
using (var apiCtx = new MessengerDbContext(this.settings.MessengerApiDbConnectionString))
{
var updatedUsers = usersResponse.Users.Where(x => broCtx.Users.GetUserExists(x.Id)).ToList();
updatedUsers.ForEach(async x => await this.UpdateUser(x, apiCtx));
// using (var broCtx = new BrokerDbContext(this.settings.MessengerBrokerDbConnectionString))
// using (var apiCtx = new MessengerDbContext(this.settings.MessengerApiDbConnectionString))
// {
// var updatedUsers = usersResponse.Users.Where(x => broCtx.Users.GetUserExists(x.Id)).ToList();
// updatedUsers.ForEach(async x => await this.UpdateUser(x, apiCtx));
var deletedUsers = broCtx.Users.GetUsersExcept(usersResponse.Users.Select(x => x.Id)).ToList();
deletedUsers.ForEach(async x => await this.RemoveUser(x.Id, broCtx, apiCtx));
// var deletedUsers = broCtx.Users.GetUsersExcept(usersResponse.Users.Select(x => x.Id)).ToList();
// deletedUsers.ForEach(async x => await this.RemoveUser(x.Id, broCtx, apiCtx));
var addedUsers = usersResponse.Users.Where(x => !broCtx.Users.GetUserExists(x.Id)).ToList();
addedUsers.ForEach(async x => await this.AddUser(x, masterServer.BrokerId, broCtx, apiCtx));
// var addedUsers = usersResponse.Users.Where(x => !broCtx.Users.GetUserExists(x.Id)).ToList();
// addedUsers.ForEach(async x => await this.AddUser(x, masterServer.BrokerId, broCtx, apiCtx));
foreach (var route in usersResponse.UserRoutes
.Where(r => apiCtx.UserRoutes
.Include(x => x.From)
.Include(x => x.To)
.Any(apir => apir.Id == r.Id && (apir.From.Id != r.FromId || apir.To.Id != r.ToId))))
{
var existing = apiCtx.UserRoutes.Include(x => x.From).Include(x => x.To).Single(x => x.Id == route.Id);
existing.From = apiCtx.Users.Single(x => x.Id == route.FromId);
existing.To = apiCtx.Users.Single(x => x.Id == route.ToId);
}
// foreach (var route in usersResponse.UserRoutes
// .Where(r => apiCtx.UserRoutes
// .Include(x => x.From)
// .Include(x => x.To)
// .Any(apir => apir.Id == r.Id && (apir.From.Id != r.FromId || apir.To.Id != r.ToId))))
// {
// var existing = apiCtx.UserRoutes.Include(x => x.From).Include(x => x.To).Single(x => x.Id == route.Id);
// existing.From = apiCtx.Users.Single(x => x.Id == route.FromId);
// existing.To = apiCtx.Users.Single(x => x.Id == route.ToId);
// }
foreach (var deletedRoute in apiCtx.UserRoutes
.Where(x => !usersResponse.UserRoutes.Any(r => r.Id == x.Id)))
{
apiCtx.UserRoutes.Remove(deletedRoute);
}
// foreach (var deletedRoute in apiCtx.UserRoutes
// .Where(x => !usersResponse.UserRoutes.Any(r => r.Id == x.Id)))
// {
// apiCtx.UserRoutes.Remove(deletedRoute);
// }
foreach (var addedRoute in usersResponse.UserRoutes
.Where(r => !apiCtx.UserRoutes.Any(x => x.Id == r.Id)))
{
apiCtx.UserRoutes.Add(new MessengerApi.Db.Entities.UserRoute
{
Id = addedRoute.Id,
From = apiCtx.Users.Single(x => x.Id == addedRoute.FromId),
To = apiCtx.Users.Single(x => x.Id == addedRoute.ToId)
});
}
// foreach (var addedRoute in usersResponse.UserRoutes
// .Where(r => !apiCtx.UserRoutes.Any(x => x.Id == r.Id)))
// {
// apiCtx.UserRoutes.Add(new MessengerApi.Db.Entities.UserRoute
// {
// Id = addedRoute.Id,
// From = apiCtx.Users.Single(x => x.Id == addedRoute.FromId),
// To = apiCtx.Users.Single(x => x.Id == addedRoute.ToId)
// });
// }
broCtx.SaveChanges();
apiCtx.SaveChanges();
}
// broCtx.SaveChanges();
// apiCtx.SaveChanges();
// }
var messagesRequest = new HttpRequestMessage(HttpMethod.Get, $"{masterServer.BrokerApiUrl}/messages");
messagesRequest.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", this.settings.BrokerId.ToString());
// var messagesRequest = new HttpRequestMessage(HttpMethod.Get, $"{masterServer.BrokerApiUrl}/messages");
// messagesRequest.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", this.settings.BrokerId.ToString());
var messagesResponseMessage = await this.httpClient.SendAsync(messagesRequest, ct);
// var messagesResponseMessage = await this.httpClient.SendAsync(messagesRequest, ct);
if (!messagesResponseMessage.IsSuccessStatusCode)
{
continue;
}
// if (!messagesResponseMessage.IsSuccessStatusCode)
// {
// continue;
// }
var messagesResponse = JsonSerializer.Deserialize<Model.Http.Sync.SyncResponse>(await messagesResponseMessage.Content.ReadAsStringAsync());
// var messagesResponse = JsonSerializer.Deserialize<Model.Http.Messages.MessagesResponse>(await messagesResponseMessage.Content.ReadAsStringAsync());
using (var broCtx = new BrokerDbContext(this.settings.MessengerBrokerDbConnectionString))
using (var apiCtx = new MessengerDbContext(this.settings.MessengerApiDbConnectionString))
{
var addedMessages = messagesResponse.Messages
.Where(m => broCtx.Messages.Any(x => x.BrokerId == masterServer.BrokerId && x.Id == m.Id) == false)
.ToList();
// using (var broCtx = new BrokerDbContext(this.settings.MessengerBrokerDbConnectionString))
// using (var apiCtx = new MessengerDbContext(this.settings.MessengerApiDbConnectionString))
// {
// var addedMessages = messagesResponse.Messages
// .Where(m => broCtx.Messages.Any(x => x.BrokerId == masterServer.BrokerId && x.Id == m.Id) == false)
// .ToList();
addedMessages.ForEach(x =>
{
broCtx.Messages.Add(new Db.Model.Message
{
BrokerId = masterServer.BrokerId,
Id = x.Id
});
// addedMessages.ForEach(x =>
// {
// broCtx.Messages.Add(new Db.Model.Message
// {
// BrokerId = masterServer.BrokerId,
// Id = x.Id
// });
apiCtx.Messages.Add(new MessengerApi.Db.Entities.Message
{
Id = x.Id,
CreatedUtc = x.CreatedUtc,
From = x.From != null ? (apiCtx.Users.SingleOrDefault(u => u.Id == x.From.Value)) ?? null : null,
To = x.To != null ? (apiCtx.Users.SingleOrDefault(u => u.Id == x.To.Value)) ?? null : null,
IsAcknowledged = x.IsAcknowledged,
IsDelivered = x.IsDelivered,
Payload = x.Payload,
PayloadId = x.PayloadId,
PayloadLifespanInSeconds = x.PayloadLifespanInSeconds,
PayloadTimestamp = x.PayloadTimestamp,
PayloadType = x.PayloadType,
});
});
// apiCtx.Messages.Add(new MessengerApi.Db.Entities.Message
// {
// Id = x.Id,
// CreatedUtc = x.CreatedUtc,
// From = x.From != null ? (apiCtx.Users.SingleOrDefault(u => u.Id == x.From.Value)) ?? null : null,
// To = x.To != null ? (apiCtx.Users.SingleOrDefault(u => u.Id == x.To.Value)) ?? null : null,
// IsAcknowledged = x.IsAcknowledged,
// IsDelivered = x.IsDelivered,
// Payload = x.Payload,
// PayloadId = x.PayloadId,
// PayloadLifespanInSeconds = x.PayloadLifespanInSeconds,
// PayloadTimestamp = x.PayloadTimestamp,
// PayloadType = x.PayloadType,
// });
// });
var existingMessages = messagesResponse.Messages
.Except(addedMessages)
.ToList();
// var existingMessages = messagesResponse.Messages
// .Except(addedMessages)
// .ToList();
existingMessages.ForEach(x =>
{
var existing = apiCtx.Messages.SingleOrDefault(a => a.Id == x.Id);
// existingMessages.ForEach(x =>
// {
// var existing = apiCtx.Messages.SingleOrDefault(a => a.Id == x.Id);
if(existing != null && (existing.IsDelivered != x.IsDelivered || existing.IsAcknowledged != x.IsAcknowledged))
{
existing.IsDelivered = x.IsDelivered;
existing.IsAcknowledged = x.IsAcknowledged;
}
});
// if(existing != null && (existing.IsDelivered != x.IsDelivered || existing.IsAcknowledged != x.IsAcknowledged))
// {
// existing.IsDelivered = x.IsDelivered;
// existing.IsAcknowledged = x.IsAcknowledged;
// }
// });
broCtx.SaveChanges();
apiCtx.SaveChanges();
}
}
}
// broCtx.SaveChanges();
// apiCtx.SaveChanges();
// }
// }
// }
private async Task RemoveUser(Guid id, BrokerDbContext broCtx, MessengerDbContext apiCtx)
{
var broUser = await broCtx.Users.SingleOrDefaultAsync(x => x.Id == id);
// private async Task RemoveUser(Guid id, BrokerDbContext broCtx, MessengerDbContext apiCtx)
// {
// var broUser = await broCtx.Users.SingleOrDefaultAsync(x => x.Id == id);
if (broUser != null)
{
broCtx.Users.Remove(broUser);
}
// if (broUser != null)
// {
// broCtx.Users.Remove(broUser);
// }
var apiUser = await apiCtx.Users.SingleOrDefaultAsync(x => x.Id == id);
// var apiUser = await apiCtx.Users.SingleOrDefaultAsync(x => x.Id == id);
if (apiUser != null)
{
apiCtx.Users.Remove(apiUser);
}
}
// if (apiUser != null)
// {
// apiCtx.Users.Remove(apiUser);
// }
// }
private async Task AddUser(Model.Http.Users.UsersResponse.User user, Guid brokerId, BrokerDbContext broCtx, MessengerDbContext apiCtx)
{
if (broCtx.Users.GetUserExists(user.Id) == false)
{
broCtx.Users.Add(new Db.Model.User
{
Id = user.Id,
BrokerId = brokerId
});
}
// private async Task AddUser(Model.Http.Users.UsersResponse.User user, Guid brokerId, BrokerDbContext broCtx, MessengerDbContext apiCtx)
// {
// if (broCtx.Users.GetUserExists(user.Id) == false)
// {
// broCtx.Users.Add(new Db.Model.User
// {
// Id = user.Id,
// BrokerId = brokerId
// });
// }
if (apiCtx.Users.Any(x => x.Id == user.Id))
{
await this.UpdateUser(user, apiCtx);
}
else
{
await apiCtx.Users.AddAsync(new MessengerApi.Db.Entities.User
{
Id = user.Id,
ApiKey = user.ApiKey,
CanReceive = user.CanReceive,
CanSend = user.CanSend,
IsEnabled = user.IsEnabled,
Name = user.Name
});
}
}
// if (apiCtx.Users.Any(x => x.Id == user.Id))
// {
// await this.UpdateUser(user, apiCtx);
// }
// else
// {
// await apiCtx.Users.AddAsync(new MessengerApi.Db.Entities.User
// {
// Id = user.Id,
// ApiKey = user.ApiKey,
// CanReceive = user.CanReceive,
// CanSend = user.CanSend,
// IsEnabled = user.IsEnabled,
// Name = user.Name
// });
// }
// }
private async Task UpdateUser(Model.Http.Users.UsersResponse.User user, MessengerDbContext apiCtx)
{
var apiUser = await apiCtx.Users.SingleAsync(x => x.Id == user.Id);
apiUser.ApiKey = user.ApiKey;
apiUser.CanReceive = user.CanReceive;
apiUser.CanSend = user.CanSend;
apiUser.IsEnabled = user.IsEnabled;
apiUser.Name = user.Name;
}
}
}
// private async Task UpdateUser(Model.Http.Users.UsersResponse.User user, MessengerDbContext apiCtx)
// {
// var apiUser = await apiCtx.Users.SingleAsync(x => x.Id == user.Id);
// apiUser.ApiKey = user.ApiKey;
// apiUser.CanReceive = user.CanReceive;
// apiUser.CanSend = user.CanSend;
// apiUser.IsEnabled = user.IsEnabled;
// apiUser.Name = user.Name;
// }
// }
//}