Provides services for messaging, a.k.a. request/response service layer. Typically used areas and classes/interfaces/services: - IMessageProcessor, IMessage, MessageHandlerBase. - Distributed: IMessageBroker, IBrokeredMessage, MessageRouterBase. - Behaviors: IMessagingBehavior, MessagingBehaviorBase. - Events: IEvent. Kephas Framework ("stone" in aramaic) aims to deliver a solid infrastructure for applications and application ecosystems.
$ dotnet add package Kephas.MessagingApplication components and services are just pieces in a big puzzle, requiring in most cases to interact and communicate. The simplest interaction is when inside a component the counterpart is identified, either directly or by the means of [[dependency injection|Composition and Dependency Injection]], and the required API method is called. This implies that the consumer knows which provider handles the message and that both components, consumer and provider, live in the same process.
The messaging infrastructure Kephas provides addresses these key issues:
To anticipate a little bit, using the messaging infrastructure is as simple as that:
var message = new PingMessage();
var pingBack = await messageProcessor.ProcessAsync(message).PreserveThreadContext();
var serverTime = pingBack.ServerTime;
So, there is no information whatsoever about who processes the message, only the message processor taking the responsibility of carrying this to a good end.
The message is the content of communication. A message:
IMessage. This is only a marker interface so that messages may be identified as such and no other kind of objects are communicated.The message type plays an important role, as it is used to filter message handlers (more about this in the [[message handlers|Architecture-of-messaging#Message handlers]] section. However, there are so called weakly typed messages, which hold actually a category of messages that for some reason cannot be strongly typed, typically used in distributed scenarios and coming from external sources. These weakly typed messages will provide a name, used for discriminating the message handlers, retrieved as:
Events are a special case of messages targeting the Publisher/Subscriber pattern. An event:
IEvent. This is also a marker interface inheriting from IMessage.A message handler is an [[application service|Application Services]] processing a message of a given type and, optionally, name.
IMessageHandler<TMessage> application service contract.ProcessAsync method where the message handling takes place.
ProcessAsync(message: TMessage, context: IMessageProcessingContext, token: CancellationToken): Task<IMessage>[MessageName] attribute, to indicate the name of the handled message in case of weakly typed messages.Typically, a message handler specializes the MessageHandlerBase<TMessage, TResponseMessage> base class, which takes care of basic message type checks and provides an overridable typed ProcessAsync method.
Note: message handlers are NOT singleton application services. This is by design, so that any resources hold during the message processing can be disposed freely at the end. Therefore they can be safely implemented as statefull.
It is the sole responsibility of the message processor to make sure that the handler receives the appropriate messages which it is registered for. The message handler assumes it receives only messages of the declared type and name.
The service taking care of message processing is the message processor. It is a [[singleton application service|Application-Services#shared-scope-shared-or-instance-based-services]] which is in charge of selecting the appropriate handler (or handlers) for a particular message, ensuring that that (or those) handlers handle the message, and returning a response. It provides a single method:
ProcessAsync(message: IMessage, [context: IMessageProcessingContext], [token: CancellationToken]): Task<IMessage>
Kephas provides a default implementation, see the [[default message processor|In-Process-messaging#the-defaultmessageprocessor]] for more information about it.
The typical message bus implementations do not filter the message handlers, leaving them the responsibility of handling or not handling a particular message. In Kephas, this responsibility is delegated to the handler selectors, which decide if and how the handlers are filtered before letting them handle a message. They provide two methods for interaction with the message processor:
CanHandle(messageType: Type, messageName: string): boolean: Indicates whether the selector can handle the indicated message type. This is the method by which the selectors are requested to indicate whether they are in charge of providing the handlers for a specific message type and name.GetHandlersFactory(messageType: Type, messageName: string): Func<IEnumerable<IMessageHandler>>: Gets a factory which retrieves the components handling messages of the given type.EventMessageHandlerSelectorThis is a handler selector for [[events|Architecture-of-messaging#Events]]. It returns all the handlers processing the specific event type and name, ordered by their processing priority.
Note: it has the
Lowprocessing priority, so that custom code can modify easily the strategy of selecting event handlers.
DefaultMessageHandlerSelectorThis is the fallback handler selector for generic messages. It returns a single handler processing the specific message type and name, in the order of override and processing priority. If two or more handlers have the same override and processing priority, an AmbiguousMetchException occurs.
Note: this selector has the lowest processing priority, being the fallback for messages not handled by any other selector.
The processing of a message may be intercepted be message processing behaviors, before and after the actual handler invocation. They can do various things, like auditing, authorization checking, or whatever other functionality may be needed. Behaviors are [[singleton application services|Application-Services#shared-scope-shared-or-instance-based-services]], so they should no store any processing information, but use exclusively the processing context for such scenarios. They implement the contract IMessageProcessingBehavior which provides two methods:
BeforeProcessAsync(context: IMessageProcessingContext, token: CancellationToken): Task: Interception called before invoking the handler to process the message.AfterProcessAsync(context: IMessageProcessingContext, token: CancellationToken): Task: Interception called after invoking the handler to process the message.
DefaultMessageProcessorKephas provides the DefaultMessageProcessor class, a [[low override priority|Application-Services#override-priority]] message processor. It provides a basic, overridable functionality for processing messages, which should be enough for most cases.
ProcessAsync call.Note that the message processor is an in-process service. However, if the handlers themselves go beyond the process boundary it is their sole responsibility and, at the same time, freedom.
The flow of the ProcessAsync implementation is as follows:
true is delegated to provide the handlers, the rest are ignored.Note: if the handler throws an exception, the processing filters are called in the after method. with the exception information set in the context. However, if a filter throws an exception during the processing, it interrupts the flow and the exception is thrown to the caller. This is by design, so that, for example, authorization filters are able to interrupt the processing flow.
The responsibility of the message processor during execution is to ensure that the provided message gets handled by one or more handlers, and that the behaviors are properly called; however it remains agnostic to the semantics of the message itself. The authorization check is ensured by behaviors:
EnsureAuthorizedMessageProcessingBehavior takes the job of ensuring that the handling of the message is authorized.
Messages may be secured by decorating them with the [RequiresPermission] attribute.
Example:
/// <summary>
/// Message for importing a hierarchy node.
/// </summary>
[RequiresPermission(typeof(IExportImportPermission))]
public class ImportHierarchyMessage : EntityActionMessage
{
// ...
}