Subscribing Event Processors


Last updated 2 years ago

The SubscribingEventProcessor, or Subscribing Processor for short, is a type of Event Processor. Like any Event Processor, it is the technical component that handles events by invoking the event handlers written in an Axon application.

The Subscribing Processor is defined by the fact that it receives its events from a SubscribableMessageSource. The SubscribableMessageSource is the infrastructure component a Subscribing Processor registers itself with.

After registration with the SubscribableMessageSource, the message source hands events to the SubscribingEventProcessor in the order they are received. Examples of a SubscribableMessageSource are the EventBus and the AMQP Extension. Both the EventBus and the AMQP Extension are simple message bus solutions for events.

Because these are simple bus solutions, the SubscribableMessageSource, and thus the Subscribing Processor, only receives current events. Operations like replaying are therefore not an option for any Subscribing Processor as long as the SubscribableMessageSource follows this paradigm.
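
The "current events only" behavior can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Axon's API: `SimpleSource` is a hypothetical stand-in for a bus-style message source that keeps no history, and the event names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class CurrentEventsOnlySketch {

    /** Hypothetical stand-in for a simple bus-style message source; not Axon's API. */
    static class SimpleSource {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> processor) {
            subscribers.add(processor);
        }

        void publish(String event) {
            // Events go to whoever is subscribed right now and are not retained.
            subscribers.forEach(subscriber -> subscriber.accept(event));
        }
    }

    static List<String> run() {
        SimpleSource source = new SimpleSource();
        List<String> received = new ArrayList<>();

        source.publish("event-1");        // nobody is subscribed yet, so this event is lost
        source.subscribe(received::add);  // the "processor" registers afterwards
        source.publish("event-2");
        source.publish("event-3");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [event-2, event-3]
    }
}
```

Since the source in this model never stores "event-1", there is nothing a late subscriber could replay from, which mirrors the limitation described above.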

Furthermore, the message source uses the same thread that receives the events to invoke the registered Subscribing Processors. When the EventBus is used as the message source, for example, the thread that publishes the event is also the one that handles it in the Subscribing Processor.
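
The threading behavior can be sketched in plain Java as well. Again, this is only an illustrative model and not Axon code: `SimpleSource`, the thread name "publisher", and the event name are assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class SameThreadSketch {

    /** Hypothetical stand-in for a subscribable message source; not Axon's API. */
    static class SimpleSource {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> processor) {
            subscribers.add(processor);
        }

        void publish(String event) {
            // Subscribers are invoked directly on the calling (publishing) thread.
            subscribers.forEach(subscriber -> subscriber.accept(event));
        }
    }

    static boolean handledOnPublishingThread() throws InterruptedException {
        SimpleSource source = new SimpleSource();
        AtomicReference<Thread> handlingThread = new AtomicReference<>();

        // The "processor" records which thread ran its handler.
        source.subscribe(event -> handlingThread.set(Thread.currentThread()));

        Thread publisher = new Thread(() -> source.publish("order-placed"), "publisher");
        publisher.start();
        publisher.join();

        return handlingThread.get() == publisher;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handledOnPublishingThread()); // prints true
    }
}
```

In this model the publisher cannot continue until every subscriber has finished, which is the coupling the text refers to.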

Although this approach deserves a spot within the framework, most scenarios require further decoupling of components by separating the threads as well. When an application needs to process events in parallel for higher performance, for example, the shared thread can be a blocker. This predicament is why the SubscribingEventProcessor is not the default in Axon Framework.

Instead, the "Tracking Event Processor" (a Streaming Processor implementation) takes up that role. It gives developers greater flexibility to configure the event processor in detail.

Subscribing Processor Use Cases

Although the SubscribingEventProcessor does not support easy parallelization or replays, there are still scenarios where it is beneficial.

When a model should be updated within the same thread that published the event, for example, the Subscribing Processor becomes a reasonable solution. In combination with Axon's AMQP or Kafka extension, some of these concerns are alleviated too, making it a viable option.

Configuring

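Other than configuring that an application uses a Subscribing Event Processor, everything is covered in the general Event Processor documentation. Firstly, to specify that new Event Processors should default to a SubscribingEventProcessor, you can use the usingSubscribingEventProcessors method.

Using Axon's Configuration API:

import org.axonframework.config.EventProcessingConfigurer;

public class AxonConfig {
    // ...
    public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
        // Make new Event Processors subscribing unless configured otherwise
        processingConfigurer.usingSubscribingEventProcessors();
    }
}

Using Spring Boot:

import org.axonframework.config.EventProcessingConfigurer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AxonConfig {
    // ...
    @Autowired
    public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
        // Make new Event Processors subscribing unless configured otherwise
        processingConfigurer.usingSubscribingEventProcessors();
    }
}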

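To make a specific Event Processor a Subscribing instance, use registerSubscribingEventProcessor. In the samples below, the EventBus from the Axon Configuration is returned as the message source purely as an illustration; any SubscribableMessageSource can be returned instead.

Using Axon's Configuration API:

import org.axonframework.config.EventProcessingConfigurer;

public class AxonConfig {
    // ...
    public void configureSubscribingProcessors(EventProcessingConfigurer processingConfigurer) {
        // To configure a processor to be subscribing ...
        processingConfigurer.registerSubscribingEventProcessor("my-processor")
                            // ... or, to also define a specific SubscribableMessageSource
                            // (the EventBus is only an example choice here) ...
                            .registerSubscribingEventProcessor("my-processor", conf -> conf.eventBus());
    }
}

Using Spring Boot:

import org.axonframework.config.EventProcessingConfigurer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AxonConfig {
    // ...
    @Autowired
    public void configureSubscribingProcessors(EventProcessingConfigurer processingConfigurer) {
        // To configure a processor to be subscribing ...
        processingConfigurer.registerSubscribingEventProcessor("my-processor")
                            // ... or, to also define a specific SubscribableMessageSource
                            // (the EventBus is only an example choice here) ...
                            .registerSubscribingEventProcessor("my-processor", conf -> conf.eventBus());
    }
}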

A properties file allows the configuration of some fields on an Event Processor. Do note that the Java configuration provides more degrees of freedom.

axon.eventhandling.processors.my-processor.mode=subscribing
axon.eventhandling.processors.my-processor.source=eventBus

If the name of an event processor contains periods (.), use the map notation:

axon.eventhandling.processors[my.processor].mode=subscribing
axon.eventhandling.processors[my.processor].source=eventBus

Error Mode

Whenever the error handler rethrows an exception, the SubscribingEventProcessor lets it bubble up to the component that published the event. Providing the exception to the event publisher allows the publishing component to deal with it accordingly.
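
A plain-Java model of this propagation is sketched below. It is not Axon's implementation: `SimpleSource` is a hypothetical stand-in for a message source that invokes subscribers on the publishing thread, and the handler that always throws plays the role of an error handler that rethrows.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class ErrorPropagationSketch {

    /** Hypothetical stand-in for a subscribable message source; not Axon's API. */
    static class SimpleSource {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> processor) {
            subscribers.add(processor);
        }

        void publish(String event) {
            // No try/catch here: an exception thrown by a subscriber
            // travels up the call stack to whoever called publish().
            subscribers.forEach(subscriber -> subscriber.accept(event));
        }
    }

    static String publishAndReport() {
        SimpleSource source = new SimpleSource();
        source.subscribe(event -> {
            throw new IllegalStateException("cannot handle " + event);
        });

        try {
            source.publish("order-placed");
            return "published";
        } catch (IllegalStateException e) {
            // The publishing component decides how to react, for example by rolling back.
            return "publisher saw: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(publishAndReport()); // prints publisher saw: cannot handle order-placed
    }
}
```

Because handling happens on the publisher's call stack in this model, the publisher is the natural place to decide what a failed handler means for the operation that produced the event.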
