Event Processors

Last updated 2 years ago

Event handlers define the business logic to be performed when an event is received. Event Processors are the components that take care of the technical aspects of that processing. They start a unit of work and possibly a transaction. They also ensure that correlation data can be correctly attached to all messages created during event processing, among other non-functional requirements.

The image below depicts a representation of the organization of Event Processors and Event Handlers:

[Figure: Organization of Event Processors and Event Handlers]

Axon has a layered approach towards organizing event handlers. First, an event handler is positioned in a Processing Group. Each event handler, or "Event Handling Component," will only ever belong to a single Processing Group. The Processing Group provides a level of configurable non-functional requirements, like error handling and the sequencing policy.

Subscribing Event Processors subscribe to a source of events and are invoked by the thread managed by the publishing mechanism. Streaming Event Processors, on the other hand, pull their messages from a source using a thread that they manage themselves.

Assigning handlers to processors

All processors have a name, which identifies a processor instance across JVM instances. Two processors with the same name are considered as two instances of the same processor.

Now let us consider that the following event handlers have been registered:

  • org.axonframework.example.eventhandling.MyHandler

  • org.axonframework.example.eventhandling.MyOtherHandler

  • org.axonframework.example.eventhandling.module.ModuleHandler

Without any intervention, this will trigger the creation of two processors, namely:

  1. org.axonframework.example.eventhandling with two handlers called MyHandler and MyOtherHandler

  2. org.axonframework.example.eventhandling.module with the single handler ModuleHandler
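The default naming rule can be illustrated with a few lines of plain Java. This is a minimal sketch, not Axon's implementation: the `DefaultProcessorNames` class and its methods are hypothetical helpers that only mimic the package-based grouping described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Axon: mimics the package-based default naming.
class DefaultProcessorNames {

    // Derives the default processor name (the package) from a fully qualified class name.
    static String processorNameFor(String fullyQualifiedClassName) {
        int lastDot = fullyQualifiedClassName.lastIndexOf('.');
        return lastDot < 0 ? "" : fullyQualifiedClassName.substring(0, lastDot);
    }

    // Groups simple handler names by the processor name derived from their package.
    static Map<String, List<String>> groupByProcessor(List<String> handlerClassNames) {
        Map<String, List<String>> processors = new LinkedHashMap<>();
        for (String className : handlerClassNames) {
            String simpleName = className.substring(className.lastIndexOf('.') + 1);
            processors.computeIfAbsent(processorNameFor(className), key -> new ArrayList<>())
                      .add(simpleName);
        }
        return processors;
    }

    public static void main(String[] args) {
        List<String> handlers = List.of(
                "org.axonframework.example.eventhandling.MyHandler",
                "org.axonframework.example.eventhandling.MyOtherHandler",
                "org.axonframework.example.eventhandling.module.ModuleHandler");
        // prints {org.axonframework.example.eventhandling=[MyHandler, MyOtherHandler], org.axonframework.example.eventhandling.module=[ModuleHandler]}
        System.out.println(groupByProcessor(handlers));
    }
}
```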

The ProcessingGroup annotation requires a name and can only be placed on the class. Let us adjust the previous sample by using this annotation instead of the package names for grouping handlers:

@ProcessingGroup("my-handlers")
class MyHandler {
    // left out event handling functions...
}

@ProcessingGroup("my-handlers")
class MyOtherHandler {
    // ...
}

@ProcessingGroup("module-handlers")
class ModuleHandler {
    // ...
}

Using the ProcessingGroup annotation as depicted, we again construct two processors:

  1. my-handlers with two handlers called MyHandler and MyOtherHandler

  2. module-handlers with the single handler ModuleHandler

Event Handler Assignment Rules

The Configuration API allows you to configure other strategies for assigning event handling classes to processors or assigning specific handler instances to particular processors. We can separate these assignment rules into roughly two groups: Event Handler to Processing Group and Processing Group to Event Processor. Below is an exhaustive list of all the assignment rules the EventProcessingConfigurer exposes:

Event Handler to Processing Group

  • byDefaultAssignTo(String) - defines the default Processing Group name to assign an event handler to. It will only be taken into account if there are no more specific rules and if the ProcessingGroup annotation is not present.

  • byDefaultAssignHandlerInstancesTo(Function<Object, String>) - defines a lambda invoked to assign an event handling instance to a desired Processing Group by returning that group's name. It will only be taken into account if there are no more specific rules and if the ProcessingGroup annotation is not present.

  • byDefaultAssignHandlerTypesTo(Function<Class<?>, String>) - defines a lambda invoked to assign an event handler type to a desired Processing Group by returning that group's name. It will only be taken into account if there are no more specific rules and if the ProcessingGroup annotation is not present.

  • assignHandlerInstancesMatching(String, Predicate<Object>) - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handling instance. The operation uses a natural priority of zero. If an instance matches several criteria, the outcome is undefined.

  • assignHandlerTypesMatching(String, Predicate<Class<?>>) - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handler type. The operation uses a natural priority of zero. If an instance matches several criteria, the outcome is undefined.

  • assignHandlerInstancesMatching(String, int, Predicate<Object>) - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handling instance. Uses the given priority to decide on rule-ordering. The higher the priority value, the more important the rule is. If an instance matches several criteria, the outcome is undefined.

  • assignHandlerTypesMatching(String, int, Predicate<Class<?>>) - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handler type. Uses the given priority to decide on rule-ordering. The higher the priority, the more important the rule is. If an instance matches several criteria, the outcome is undefined.

Processing Group to Event Processor

  • assignProcessingGroup(String, String) - defines a given Processing Group name that belongs to the given Event Processor's name.

  • assignProcessingGroup(Function<String, String>) - defines a lambda invoked to assign a Processing Group name to the desired Event Processor by returning that processor's name.
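As a rough illustration of how the rules listed above might be combined, consider the sketch below. The group names, processor names, and predicates are invented for this example; only the EventProcessingConfigurer methods come from the list above.

```java
import org.axonframework.config.EventProcessingConfigurer;

class AssignmentRulesConfig {

    void configureAssignmentRules(EventProcessingConfigurer processingConfigurer) {
        // Fallback group for handlers that match no other rule and carry no @ProcessingGroup.
        processingConfigurer.byDefaultAssignTo("default-group");

        // Type-based rule with the natural priority of zero (the predicate is a made-up example).
        processingConfigurer.assignHandlerTypesMatching(
                "module-handlers", type -> type.getName().contains(".module."));

        // Type-based rule with an explicit, higher priority (hypothetical naming convention).
        processingConfigurer.assignHandlerTypesMatching(
                "audit-handlers", 10, type -> type.getSimpleName().startsWith("Audit"));

        // Map one Processing Group to a specifically named Event Processor...
        processingConfigurer.assignProcessingGroup("module-handlers", "module-processor");
        // ... or derive the processor name from the group name for all groups.
        processingConfigurer.assignProcessingGroup(group -> group + "-processor");
    }
}
```

Since the outcome is undefined when a handler matches several criteria, it is probably safest to keep the predicates mutually exclusive.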

Ordering Event Handlers within a processor

If we use Spring as the mechanism for wiring everything, we can explicitly specify the event handler component ordering by adding the @Order annotation. This annotation is placed on the event handler class and takes an integer value that specifies the ordering.

Note that it is not possible to order event handlers belonging to different Event Processors. Each Event Processor acts as an isolated component without any intervention from other Event Processors.
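A minimal sketch of Spring-based ordering could look as follows. The handler classes and the `OrderPlacedEvent` type are hypothetical. Spring treats lower @Order values as higher precedence, so under that assumption `FirstHandler` would be invoked before `SecondHandler`.

```java
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

// Hypothetical event type, used only for this sketch.
class OrderPlacedEvent {
}

@Component
@Order(1) // lower value: expected to be invoked first
@ProcessingGroup("my-handlers")
class FirstHandler {

    @EventHandler
    public void on(OrderPlacedEvent event) {
        // first handling step...
    }
}

@Component
@Order(2) // higher value: expected to be invoked after FirstHandler
@ProcessingGroup("my-handlers")
class SecondHandler {

    @EventHandler
    public void on(OrderPlacedEvent event) {
        // second handling step...
    }
}
```

Both handlers share one Processing Group here, since ordering only applies within a single Event Processor.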

Ordering Considerations

Although we can place an order among event handlers within an Event Processor, separation of event handlers is recommended.

Placing an overall ordering on event handlers means those components are inclined to interact with one another, introducing a form of coupling. Due to this, the event handling process will become complex to manage (e.g., for new team members). Furthermore, embracing an ordering approach might lead to placing all event handlers in a global ordering, decreasing processing speeds in general.

In all, you are free to use an ordering, but we recommend using it sparingly.

Error Handling

Errors are inevitable in any application. Depending on where they happen, you may want to respond differently.

To change the default behavior, both the Processing Group and Event Processor levels allow customization of how exceptions are dealt with:

Processing Group - Listener Invocation Error Handler

The component dealing with exceptions thrown from an event handling method is called the ListenerInvocationErrorHandler. By default, these exceptions are logged (with the LoggingErrorHandler implementation), and processing continues with the next handler or message.

The default ListenerInvocationErrorHandler used by each processing group can be customized. Furthermore, we can configure the error handling behavior per processing group:

Using the Configuration API:

public class AxonConfig {
    // ...
    public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) {
        // To configure a default ...
        processingConfigurer.registerDefaultListenerInvocationErrorHandler(conf -> /* create listener error handler */)
                            // ... or for a specific processing group:
                            .registerListenerInvocationErrorHandler("my-processing-group", conf -> /* create listener error handler */);
    }
}

Using Spring Boot:

@Configuration
public class AxonConfig {
    // ...
    @Autowired
    public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) {
        // To configure a default ...
        processingConfigurer.registerDefaultListenerInvocationErrorHandler(conf -> /* create listener error handler */)
                            // ... or for a specific processing group:
                            .registerListenerInvocationErrorHandler("my-processing-group", conf -> /* create listener error handler */);
    }
}

It is easy to implement custom error handling behavior. The error handling method to implement provides the exception, the event that was handled, and a reference to the handler that was handling the message:

public interface ListenerInvocationErrorHandler {

    void onError(Exception exception, 
                 EventMessage<?> event, 
                 EventMessageHandler eventHandler) throws Exception;
}
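As an illustration, a custom ListenerInvocationErrorHandler might log most exceptions and rethrow only a selected type. The class name and the choice of IllegalStateException as the "fatal" type are assumptions made for this sketch.

```java
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventMessageHandler;
import org.axonframework.eventhandling.ListenerInvocationErrorHandler;

import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical handler: rethrows IllegalStateException, logs and ignores everything else.
class SelectiveListenerErrorHandler implements ListenerInvocationErrorHandler {

    private static final Logger logger = Logger.getLogger(SelectiveListenerErrorHandler.class.getName());

    @Override
    public void onError(Exception exception,
                        EventMessage<?> event,
                        EventMessageHandler eventHandler) throws Exception {
        if (exception instanceof IllegalStateException) {
            // Rethrowing hands the exception over to the Event Processor level.
            throw exception;
        }
        // Any other exception is logged, after which processing continues.
        logger.log(Level.WARNING,
                   "Handler [" + eventHandler.getClass().getSimpleName() + "] failed on event ["
                           + event.getIdentifier() + "]",
                   exception);
    }
}
```

Such a handler could then be registered through registerListenerInvocationErrorHandler("my-processing-group", conf -> new SelectiveListenerErrorHandler()).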

Event Processor - Error Handler

Exceptions that occur outside an event handler's scope, or that have bubbled up from there, are handled by the ErrorHandler. The default error handler is the PropagatingErrorHandler, which will rethrow any exceptions it catches.

We can configure a default ErrorHandler for all Event Processors or an ErrorHandler for specific processors:

Using the Configuration API:

public class AxonConfig {
    // ...
    public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) {
        // To configure a default ...
        processingConfigurer.registerDefaultErrorHandler(conf -> /* create error handler */)
                            // ... or for a specific processor:
                            .registerErrorHandler("my-processor", conf -> /* create error handler */);
    }
}

Using Spring Boot:

@Configuration
public class AxonConfig {
    // ...
    @Autowired
    public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) {
        // To configure a default ...
        processingConfigurer.registerDefaultErrorHandler(conf -> /* create error handler */)
                            // ... or for a specific processor:
                            .registerErrorHandler("my-processor", conf -> /* create error handler */);
    }
}

For providing a custom solution, the ErrorHandler's single method needs to be implemented:

public interface ErrorHandler {

    void handleError(ErrorContext errorContext) throws Exception;
}

Based on the provided ErrorContext object, you can decide to ignore the error, schedule retries, perform dead-letter-queue delivery, or rethrow the exception.
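One possible shape for a custom ErrorHandler is sketched below: it logs some details from the ErrorContext and then rethrows the original error. The class name is hypothetical, and the logging format is just one option.

```java
import org.axonframework.eventhandling.ErrorContext;
import org.axonframework.eventhandling.ErrorHandler;

import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical handler: logs the failure details, then rethrows the original error.
class LoggingThenRethrowingErrorHandler implements ErrorHandler {

    private static final Logger logger = Logger.getLogger(LoggingThenRethrowingErrorHandler.class.getName());

    @Override
    public void handleError(ErrorContext errorContext) throws Exception {
        Throwable error = errorContext.error();
        logger.log(Level.SEVERE,
                   "Event Processor [" + errorContext.eventProcessor() + "] failed while handling "
                           + errorContext.failedEvents().size() + " event(s)",
                   error);
        // Rethrow so the Event Processor can apply its own failure behavior.
        if (error instanceof Error) {
            throw (Error) error;
        }
        if (error instanceof Exception) {
            throw (Exception) error;
        }
        throw new IllegalStateException("Unexpected throwable type", error);
    }
}
```

Returning normally from handleError instead of rethrowing would amount to ignoring the error.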

General processor configuration

Event Processor Builders

The EventProcessingConfigurer provides access to a lot of configurable components for Event Processors. Sometimes it is easier or preferable to provide an entire function to construct an Event Processor, however. To that end, we can configure a custom EventProcessorBuilder:

@FunctionalInterface
interface EventProcessorBuilder {

    // Note: the `EventHandlerInvoker` is the component which holds the event handling functions.
    EventProcessor build(String name, 
                         Configuration configuration, 
                         EventHandlerInvoker eventHandlerInvoker);
}

The EventProcessorBuilder functional interface provides the event processor's name, the Configuration and the EventHandlerInvoker, and requires returning an EventProcessor instance. Note that any Axon component that an Event Processor requires (e.g., an EventStore) is retrievable from the Configuration.

The EventProcessingConfigurer provides two methods to configure an EventProcessorBuilder:

  1. registerEventProcessorFactory(EventProcessorBuilder) - allows you to define a default factory method that creates event processors for which no explicit factories are defined

  2. registerEventProcessor(String, EventProcessorBuilder) - defines the factory method to use to create a processor with given name
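A small sketch of a custom builder, assuming the processor named "my-processor" (a made-up name) should be a Subscribing Event Processor that reads from the configured event bus:

```java
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.SubscribingEventProcessor;

class ProcessorBuilderConfig {

    void configureProcessorBuilder(EventProcessingConfigurer processingConfigurer) {
        processingConfigurer.registerEventProcessor(
                "my-processor", // hypothetical processor name
                (name, configuration, eventHandlerInvoker) ->
                        SubscribingEventProcessor.builder()
                                                 .name(name)
                                                 .eventHandlerInvoker(eventHandlerInvoker)
                                                 // The event bus is retrieved from the Configuration.
                                                 .messageSource(configuration.eventBus())
                                                 .build()
        );
    }
}
```

The builder leaves every other setting of the processor at its default; a real configuration would likely set more of them.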

Event Handler Interceptors

The EventProcessingConfigurer provides two methods to configure MessageHandlerInterceptor instances:

  • registerDefaultHandlerInterceptor(BiFunction<Configuration, String, MessageHandlerInterceptor<? super EventMessage<?>>>) - registers a default MessageHandlerInterceptor that will be configured on every Event Processor instance

  • registerHandlerInterceptor(String, Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>>) - registers a MessageHandlerInterceptor that will be configured for the Event Processor matching the given String
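The following sketch shows what registering an interceptor might look like. The `LoggingEventInterceptor` class and the processor name "my-processor" are assumptions for illustration; the interceptor simply logs the payload type before letting the chain proceed.

```java
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.InterceptorChain;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.unitofwork.UnitOfWork;

import java.util.logging.Logger;

// Hypothetical interceptor: logs the payload type of every event before it is handled.
class LoggingEventInterceptor implements MessageHandlerInterceptor<EventMessage<?>> {

    private static final Logger logger = Logger.getLogger(LoggingEventInterceptor.class.getName());

    @Override
    public Object handle(UnitOfWork<? extends EventMessage<?>> unitOfWork,
                         InterceptorChain interceptorChain) throws Exception {
        logger.info("Handling event of type " + unitOfWork.getMessage().getPayloadType().getName());
        // Continue with the next interceptor or the event handler itself.
        return interceptorChain.proceed();
    }
}

class InterceptorConfig {

    void configureInterceptors(EventProcessingConfigurer processingConfigurer) {
        // Applies to every Event Processor...
        processingConfigurer.registerDefaultHandlerInterceptor(
                (configuration, processorName) -> new LoggingEventInterceptor());
        // ... or only to the processor with the given (hypothetical) name.
        processingConfigurer.registerHandlerInterceptor(
                "my-processor", configuration -> new LoggingEventInterceptor());
    }
}
```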

Message Monitors

The EventProcessingConfigurer provides two approaches towards configuring a MessageMonitor:

  • registerMessageMonitor(String, Function<Configuration, MessageMonitor<Message<?>>>) - registers the given MessageMonitor to the Event Processor matching the given String

  • registerMessageMonitorFactory(String, MessageMonitorFactory) - registers the given MessageMonitorFactory to construct a MessageMonitor for the Event Processor matching the given String

The MessageMonitorFactory provides a more fine-grained approach, used throughout the Configuration API, to construct a MessageMonitor:

@FunctionalInterface
public interface MessageMonitorFactory {
    
    MessageMonitor<Message<?>> create(Configuration configuration, 
                                      Class<?> componentType, 
                                      String componentName);
}

We can use the Configuration to retrieve the required dependencies to construct the MessageMonitor. The type and name reflect which infrastructure component the factory constructs a monitor for. Whenever you use the MessageMonitorFactory to construct a MessageMonitor for an Event Processor, the factory expects the componentType to be an EventProcessor implementation. The componentName, on the other hand, would resemble the name of the Event Processor.
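As a hedged example, a very simple MessageMonitor could count the outcomes reported for each event. The `CountingMessageMonitor` class and the processor name are hypothetical; a production setup would more likely rely on an existing metrics integration.

```java
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.messaging.Message;
import org.axonframework.monitoring.MessageMonitor;

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical monitor: counts successful, failed, and ignored messages.
class CountingMessageMonitor implements MessageMonitor<Message<?>> {

    final AtomicLong succeeded = new AtomicLong();
    final AtomicLong failed = new AtomicLong();
    final AtomicLong ignored = new AtomicLong();

    @Override
    public MessageMonitor.MonitorCallback onMessageIngested(Message<?> message) {
        return new MessageMonitor.MonitorCallback() {
            @Override
            public void reportSuccess() {
                succeeded.incrementAndGet();
            }

            @Override
            public void reportFailure(Throwable cause) {
                failed.incrementAndGet();
            }

            @Override
            public void reportIgnored() {
                ignored.incrementAndGet();
            }
        };
    }
}

class MonitorConfig {

    void configureMonitor(EventProcessingConfigurer processingConfigurer) {
        // "my-processor" is a made-up processor name.
        processingConfigurer.registerMessageMonitor("my-processor", configuration -> new CountingMessageMonitor());
    }
}
```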

Transaction Management

As the component that deals with event handling, the Event Processor is a logical place to provide transaction configuration options. Note that in the majority of scenarios, the defaults will suffice. This section simply shows these options to allow adjustment if the application requires it.

The Event Processor, in turn, is in charge of the Processing Group. An Event Processor will control 1 to N Processing Groups, although there will be a one-to-one mapping in most cases. Similar to the Event Handling Component, a Processing Group will belong to a single Event Processor. This last layer allows the definition of the type of Event Processor used and concepts like the threading model and a more fine-grained degree of error handling.

Event Processors come in roughly two forms: Subscribing and Streaming.

For more specifics on either type, consult the Subscribing Event Processor and Streaming Event Processor sections. The rest of this page dedicates itself to describing the Event Processor's common concepts and configuration options. Note that throughout, the EventProcessingConfigurer is used. The EventProcessingConfigurer is part of Axon's Configuration API, dedicated to configuring Event Processors.

All event handlers are attached to a processor whose name by default is the package name of the event handler's class. Furthermore, the default processor implementation used by Axon is the Tracking Event Processor. The (default) event processor used can be adjusted, as is shown in the subscribing and streaming sections.

Event handlers, or Event Handling Components, come in roughly two flavors: "regular" (singleton, stateless) event handlers and sagas. The Registering Event Handlers section describes the process to register an event handler, whereas the sagas page describes the saga registration process.

Using the package name serves as a suitable default, but using dedicated names for an Event Processor and/or the Processing Group is recommended. The most straightforward approach to reaching a transparent naming scheme for your event handlers is to use the ProcessingGroup annotation. This annotation resembles the Processing Group level discussed in the introduction.

If more control is required to group Event Handling Components, we recommend consulting the assignment rules section.

To order event handlers within an Event Processor, the order in which event handlers are registered (as described in the Registering Event Handlers section) is guiding. Thus, the order in which an Event Processor will call event handlers is the same as their insertion order in the Configuration API.

By default, exceptions raised by event handlers are caught in the Processing Group layer, logged, and processing continues with the following events. When an exception is thrown while a processor is trying to commit a transaction, update a token, or in any other part of the process, the exception will be propagated.

In the case of a Streaming Event Processor, this means the processor will go into error mode, releasing any tokens and retrying at an incremental interval (starting at 1 second, up to max 60 seconds). A Subscribing Event Processor will report a publication error to the component that provided the event.
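The retry interval can be pictured with a small plain-Java sketch. It assumes the wait time doubles after every failed attempt, which the text above does not state explicitly; only the 1 second start and the 60 second cap are taken from it. The `RetryBackoff` class is hypothetical and not part of Axon.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of an incremental retry interval capped at 60 seconds.
class RetryBackoff {

    static final long INITIAL_WAIT_SECONDS = 1;
    static final long MAX_WAIT_SECONDS = 60;

    // Assumption: the wait time doubles after each failed attempt, up to the cap.
    static long nextWait(long currentWaitSeconds) {
        return Math.min(currentWaitSeconds * 2, MAX_WAIT_SECONDS);
    }

    // Lists the wait times (in seconds) for the given number of consecutive failures.
    static List<Long> waitTimes(int attempts) {
        List<Long> waits = new ArrayList<>();
        long wait = INITIAL_WAIT_SECONDS;
        for (int i = 0; i < attempts; i++) {
            waits.add(wait);
            wait = nextWait(wait);
        }
        return waits;
    }

    public static void main(String[] args) {
        System.out.println(waitTimes(8)); // prints [1, 2, 4, 8, 16, 32, 60, 60]
    }
}
```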

In a ListenerInvocationErrorHandler, you can choose to retry, ignore, or rethrow the exception. When rethrown, the exception bubbles up to the Event Processor level.

How the Event Processor deals with a rethrown exception differs per implementation. The behavior of the Subscribing and Streaming Event Processors is described in their respective sections.

Alongside handler assignment and error handling, Event Processors allow configuration for other components too. For options specific to the Subscribing and Streaming Event Processors, check their respective sections. The remainder of this page covers the generic configuration options for each Event Processor.

Since the Event Processor is the invoker of event handling methods, it is also the place to configure Message Handler Interceptors. Since Event Processors are dedicated to event handling, the MessageHandlerInterceptor is required to deal with an EventMessage. Put differently, an EventHandlerInterceptor can be registered to Event Processors.

Any Event Processor instance provides the means to contain a Message Monitor. Message Monitors (discussed in more detail elsewhere in this guide) allow for monitoring the flow of messages throughout an Axon application. For Event Processors, the message monitor deals explicitly with the events flowing through the Event Processor towards the event handling functions.

The first transaction configuration option is the TransactionManager. Axon uses the TransactionManager to attach a transaction to every Unit of Work. Within a Spring environment, the TransactionManager defaults to a SpringTransactionManager, which uses Spring's PlatformTransactionManager under the hood. In non-Spring environments, it is wise to build a TransactionManager implementation if transaction management is required. Such an implementation only requires the definition of the TransactionManager#startTransaction() method. To adjust the transaction manager for an Event Processor, use registerTransactionManager(String, Function<Configuration, TransactionManager>) on the EventProcessingConfigurer.

Secondly, you can adjust the desired RollbackConfiguration per Event Processor. The RollbackConfiguration decides when a Unit of Work should roll back the transaction. The default RollbackConfiguration is to roll back on any type of Throwable; the Unit of Work page describes the other options you can choose. To adjust the default behavior, invoke registerRollbackConfiguration(String, Function<Configuration, RollbackConfiguration>) on the EventProcessingConfigurer.
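A sketch of adjusting both transaction options for a single processor might look like the following. The processor name "my-processor" is made up, and the chosen values (no transactions, rollback on runtime exceptions only) are arbitrary examples rather than recommendations.

```java
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;

class TransactionConfig {

    void configureTransactions(EventProcessingConfigurer processingConfigurer) {
        // Example choice: run the (hypothetical) "my-processor" without transactions.
        processingConfigurer.registerTransactionManager(
                "my-processor", configuration -> NoTransactionManager.instance());

        // Example choice: only roll back on runtime exceptions instead of on any Throwable.
        processingConfigurer.registerRollbackConfiguration(
                "my-processor", configuration -> RollbackConfigurationType.RUNTIME_EXCEPTIONS);
    }
}
```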
