Message Correlation



In messaging systems it is common to group messages together, or correlate them. In Axon Framework, a Command message might result in one or several Event messages, and a Query message might result in one or several QueryResponse messages. Correlation is usually implemented through a dedicated message property, the so-called correlation identifier.

Correlation Data Provider

Messages in Axon Framework use the MetaData property to transport meta information about the message. The MetaData object is of type Map<String, Object> and is passed around with the message. To fill the MetaData of a new message produced within a Unit of Work, a so-called CorrelationDataProvider can be used. The Unit of Work is in charge of populating the MetaData of a new message based on this CorrelationDataProvider. Axon Framework currently provides a couple of implementations of this functional interface:

MessageOriginProvider

By default, the MessageOriginProvider is registered as the correlation data provider. It is responsible for passing two values from one Message to the next: the correlationId and the traceId. The correlationId of a message always references the identifier of the message it originates from (i.e. the parent message). The traceId, on the other hand, references the identifier of the message which started the chain of messages (i.e. the root message). When neither of these fields is present in the parent message at the time a new message is created, the parent message's identifier is used for both. As an example, if you handle a Command message which in turn publishes an Event message, the Event message's MetaData will be populated based on:

  • The Command message's identifier for the correlationId.

  • The traceId found in the Command message's MetaData, or otherwise the Command message's identifier, for the traceId.
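
The rule above can be sketched in plain Java. This is a simplified stand-in that uses ordinary maps instead of Axon's Message and MetaData types. The class name OriginSketch and the method originDataFor are hypothetical and exist only for illustration; the sketch mirrors the described rule and is not the framework's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the rule described above; not Axon's actual code.
class OriginSketch {

    // parentId: identifier of the message being handled (the parent message).
    // parentMetaData: the meta data carried by that parent message.
    static Map<String, Object> originDataFor(String parentId, Map<String, Object> parentMetaData) {
        Map<String, Object> result = new HashMap<>();
        // The correlationId always points at the direct parent.
        result.put("correlationId", parentId);
        // The traceId is inherited when present; otherwise the parent is treated as the root.
        result.put("traceId", parentMetaData.getOrDefault("traceId", parentId));
        return result;
    }

    public static void main(String[] args) {
        // A command without trace information becomes the root of the chain.
        Map<String, Object> eventMetaData = originDataFor("command-1", new HashMap<>());
        // A message caused by that event gets a new correlationId but keeps the traceId.
        Map<String, Object> nextMetaData = originDataFor("event-1", eventMetaData);
        System.out.println("correlationId=" + nextMetaData.get("correlationId")
                + ", traceId=" + nextMetaData.get("traceId"));
    }
}
```

In this sketch the traceId stays fixed along the whole chain while the correlationId moves one step at a time, which is what allows a complete message flow to be reconstructed from the root message.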

SimpleCorrelationDataProvider

The SimpleCorrelationDataProvider is configured to unconditionally copy the values of specified keys from the MetaData of one message to the MetaData of the next. To do so, the constructor of the SimpleCorrelationDataProvider must be called with the keys which should be copied. Here is an example of how to configure it to copy the myId and myId2 values.

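import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.correlation.SimpleCorrelationDataProvider;

public class Configuration {

    // Copies the "myId" and "myId2" meta data entries to every new message.
    public CorrelationDataProvider customCorrelationDataProvider() {
        return new SimpleCorrelationDataProvider("myId", "myId2");
    }
}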
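
The copying behavior can be illustrated with a small plain-Java sketch. It uses ordinary maps rather than Axon types, and the names KeyCopySketch and copyKeys are hypothetical. It also assumes that a configured key which is absent from the source meta data is simply skipped; treat that as an assumption of this sketch rather than a statement about the framework.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the SimpleCorrelationDataProvider source.
class KeyCopySketch {

    // Copies the listed keys from the source meta data into a new map.
    // Assumption: keys missing from the source are skipped.
    static Map<String, Object> copyKeys(Map<String, Object> sourceMetaData, String... keys) {
        Map<String, Object> copied = new HashMap<>();
        for (String key : keys) {
            if (sourceMetaData.containsKey(key)) {
                copied.put(key, sourceMetaData.get(key));
            }
        }
        return copied;
    }

    public static void main(String[] args) {
        Map<String, Object> commandMetaData = new HashMap<>();
        commandMetaData.put("myId", "42");
        commandMetaData.put("unrelated", "ignored");
        // Only "myId" is carried over; "myId2" is absent and "unrelated" is not configured.
        System.out.println(copyKeys(commandMetaData, "myId", "myId2").keySet());
    }
}
```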

MultiCorrelationDataProvider

A MultiCorrelationDataProvider is capable of combining the effect of multiple correlation data providers. To do so, the constructor of the MultiCorrelationDataProvider must be called with a list of providers, as shown in the following sample:

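import java.util.Arrays;

import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.correlation.MessageOriginProvider;
import org.axonframework.messaging.correlation.MultiCorrelationDataProvider;
import org.axonframework.messaging.correlation.SimpleCorrelationDataProvider;

public class Configuration {

    // Combines the "someKey" copy with the default correlationId/traceId behavior.
    public CorrelationDataProvider customCorrelationDataProviders() {
        return new MultiCorrelationDataProvider<CommandMessage<?>>(
            Arrays.asList(
                new SimpleCorrelationDataProvider("someKey"),
                new MessageOriginProvider()
            )
        );
    }
}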
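
Combining providers can be pictured as merging the maps each provider returns. The plain-Java sketch below models this with a hypothetical Provider interface and a hypothetical combine method. It assumes the providers are applied in list order and that a later provider overwrites an earlier one when both supply the same key; that ordering is an assumption of the sketch, not a documented guarantee.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; not the MultiCorrelationDataProvider source.
class CombineSketch {

    // Hypothetical stand-in for a correlation data provider working on plain maps.
    @FunctionalInterface
    interface Provider {
        Map<String, Object> dataFor(Map<String, Object> parentMetaData);
    }

    // Merges the output of all providers.
    // Assumption: on a key clash, the provider later in the list wins.
    static Map<String, Object> combine(Map<String, Object> parentMetaData, List<Provider> providers) {
        Map<String, Object> combined = new HashMap<>();
        for (Provider provider : providers) {
            combined.putAll(provider.dataFor(parentMetaData));
        }
        return combined;
    }

    public static void main(String[] args) {
        Provider copySomeKey = meta -> Map.of("someKey", "value");
        Provider origin = meta -> Map.of("correlationId", "command-1", "traceId", "command-1");
        // The combined map holds the entries of both providers.
        System.out.println(combine(new HashMap<>(), List.of(copySomeKey, origin)).size());
    }
}
```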

Implementing a custom correlation data provider

If the predefined providers don't fulfill your requirements, you can implement your own CorrelationDataProvider. The class must implement the CorrelationDataProvider interface, as shown in the following example:

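import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.correlation.CorrelationDataProvider;

public class AuthCorrelationDataProvider implements CorrelationDataProvider {

    // Resolves a username from an authorization token.
    private final Function<String, String> usernameProvider;

    public AuthCorrelationDataProvider(Function<String, String> userProvider) {
        this.usernameProvider = userProvider;
    }

    @Override
    public Map<String, ?> correlationDataFor(Message<?> message) {
        Map<String, Object> correlationData = new HashMap<>();
        // Only commands carrying an "authorization" entry contribute a username.
        if (message instanceof CommandMessage<?>) {
            if (message.getMetaData().containsKey("authorization")) {
                String token = (String) message.getMetaData().get("authorization");
                correlationData.put("username", usernameProvider.apply(token));
            }
        }
        return correlationData;
    }
}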

Configuration

When the default MessageOriginProvider isn't sufficient for your use case, the (custom) correlation data providers need to be registered inside your application. If you are using the Axon Configuration API, call the Configurer#configureCorrelationDataProviders method to register the correlation data providers. If you are relying on Spring Boot autoconfiguration, provide a factory method exposing the provider as a Spring Bean, or several factory methods if you need more than one provider. The following snippets show both approaches:

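Axon Configuration API:

import java.util.Arrays;

import org.axonframework.config.Configurer;
import org.axonframework.config.DefaultConfigurer;
import org.axonframework.messaging.correlation.MessageOriginProvider;
import org.axonframework.messaging.correlation.SimpleCorrelationDataProvider;

public class Configuration {

    public void configuring() {
        Configurer configurer =
            DefaultConfigurer.defaultConfiguration()
                             .configureCorrelationDataProviders(config -> Arrays.asList(
                                new SimpleCorrelationDataProvider("someKey"),
                                new MessageOriginProvider()
                             ));
    }
}

Spring Boot autoconfiguration:

import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.correlation.MessageOriginProvider;
import org.axonframework.messaging.correlation.SimpleCorrelationDataProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CorrelationDataProviderConfiguration {

    // Configuring a single CorrelationDataProvider will automatically override the default MessageOriginProvider
    @Bean
    public CorrelationDataProvider someKeyCorrelationProvider() {
        return new SimpleCorrelationDataProvider("someKey");
    }

    @Bean
    public CorrelationDataProvider messageOriginProvider() {
        return new MessageOriginProvider();
    }
}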