Axon 参考指南
  • 介绍
  • 架构概览
    • DDD & CQRS 概念
    • 事件溯源
    • 事件驱动的微服务
  • Axon Server
  • 发行说明
    • Axon Framework
      • Major Releases
      • Minor Releases
    • Axon Server
      • Major Releases
      • Minor Releases Standard Edition
      • Minor Releases Enterprise Edition
    • Axon Framework Extensions
      • AMQP
        • Major Releases
      • CDI
        • Major Releases
      • JGroups
        • Major Releases
      • Kafka
        • Major Releases
        • Minor Releases
      • Kotlin
        • Experimental Releases
      • Mongo
        • Major Releases
        • Minor Releases
      • Reactor
        • Major Releases
        • Minor Releases
      • Spring Cloud
        • Major Releases
        • Minor Releases
      • Tracing
        • Major Releases
        • Minor Releases
  • Getting Started
    • 快速开始
  • Axon Framework
    • 介绍
    • 消息传递概念
      • 消息剖析
      • 消息关联
      • 消息拦截
      • 支持带注解的处理程序
      • 异常处理
      • 工作单元
    • 命令
      • 建模
        • 聚合
        • 多实体聚合
        • 聚合状态存储
        • 从另一个聚合创建聚合
        • 聚合多态性
        • 解决冲突
      • 命令调度器
      • 命令处理程序
      • 基础设施
      • 配置
    • 事件
      • 事件调度器
      • 事件处理程序
      • 事件处理器
        • 订阅事件处理器
        • 流式事件处理器
      • 事件总线和事件存储
      • 事件版本控制
    • 查询
      • 查询处理
      • 查询调度器
      • 查询处理程序
      • 实现
      • 配置
    • 长时处理过程(Sagas)
      • 实现
      • 关联
      • 基础设施
    • Deadlines
      • Deadline Managers
      • Event Schedulers
    • 测试
      • 命令 / 事件
      • 长时处理过程(Sagas)
    • 序列化
    • 调整
      • 事件快照
      • 事件处理
      • 命令处理
    • 监控和指标
    • Spring Boot 集成
    • 模块
  • Axon Server
    • 介绍
    • 安装
      • 本地安装
        • Axon Server SE
        • Axon Server EE
      • Docker / K8s
        • Axon Server SE
        • Axon Server EE
    • 管理
      • 配置
        • System Properties
        • Command Line Interface
        • REST API
        • GRPC API
      • Monitoring
        • Actuator Endpoints
        • gRPC Metrics
        • Heartbeat Monitoring
      • Clusters
      • Replication Groups
      • Multi-Context
      • Tagging
      • Backup and Messaging-only Nodes
      • Backups
      • Recovery
      • Plugins
      • Error Codes
    • 安全
      • SSL
      • 访问控制
      • 访问控制 - 标准版
      • 访问控制 - 企业版
      • 访问控制 - 客户端应用程序
      • 访问控制 - 命令行
      • 访问控制 - REST API
      • 访问控制 - LDAP
      • 访问控制 - OAuth 2.0
    • 性能
      • 事件段
      • 流量控制
    • 迁移
      • Standard to Enterprise Edition
      • Non-Axon Server to Axon Server
  • Extensions
    • Spring AMQP
    • JGroups
    • Kafka
    • Kotlin
    • Mongo
    • Reactor
      • Reactor Gateways
    • Spring Cloud
    • Tracing
  • Appendices
    • A. RDBMS Tuning
    • B. Message Handler Tuning
      • 参数解析器
      • 处理程序增强
    • C. 元数据注解
    • D. 标识符生成
    • E. Axon Server Query Language
由 GitBook 提供支持
在本页
  • Configuration in Spring Boot
  • Reactor Command Gateway
  • Reactor Query Gateway
  • Subscription queries
  • Reactor Event Gateway
  • Interceptors
  • Reactor Dispatch Interceptors
  • Reactor Result Handler Interceptors
  1. Extensions
  2. Reactor

Reactor Gateways

上一页Reactor下一页Spring Cloud

最后更新于2年前

The "Reactive Gateways" offer a reactive API wrapper around the command, query and event bus. Most of the operations are similar to those from non-reactive gateways, simply replacing the CompletableFuture with either a Mono or Flux. In some cases, the API is expended to ease use of common reactive patterns.

Reactor doesn't allow null values in streams. Any null value returned from the handler will be mapped to .

Retrying operations

All operations support Reactor's retry mechanism:

reactiveQueryGateway.query(query, ResponseType.class).retry(5);

This call will retry sending the query a maximum of five times when it fails.

Configuration in Spring Boot

This extension can be added as a Spring Boot starter dependency to your project using group id org.axonframework.extensions.reactor and artifact id axon-reactor-spring-boot-starter. The implementation of the extension can be found .

Reactor Command Gateway

This section describes the methods on the ReactorCommandGateway.

send - Sends the given command once the caller subscribes to the command result. Returns immediately.

A common pattern is using the REST API to send a command. In this case it is recommend to for example use , and return the command result Mono directly to the controller:

class SpringCommandController {

    private final ReactorCommandGateway reactiveCommandGateway; 
    
    @PostMapping
    public Mono<CommandHandlerResponseBody> sendCommand(@RequestBody CommandBody command) {
        return reactiveCommandGateway.send(command);
    }
}

Sending a command from a Spring WebFlux Controller.

If the command handling function returns type void, Mono<CommandHandlerResponseBody> should be replaced with Mono<Void>

Another common pattern is "send and forget":

class CommandDispatcher {

    private final ReactorCommandGateway reactiveCommandGateway;
    
    public void sendAndForget(MyCommand command) {
         reactiveCommandGateway.send(command)
                               .subscribe();
    }
}

Function that sends a command and returns immediately without waiting for the result.

sendAll - This method uses the given Publisher of commands to dispatch incoming commands.

This operation is available only in the Reactor extension. Use it to connect 3rd party streams that delivers commands.

class CommandPublisher {

    private final ReactorCommandGateway reactiveCommandGateway;
    
    @PostConstruct
    public void startReceivingCommands(Flux<CommandBody> inputStream) {
        reactiveCommandGateway.sendAll(inputStream)
                              .subscribe();
    }
}

Connects an external input stream directly to the Reactor Command Gateway.

The sendAll operation will keep sending commands until the input stream is canceled.

Reactor Query Gateway

query - Sends the given query, expecting a response in the form of responseType from a single source.

class SpringQueryController {
    
    private final ReactorQueryGateway reactiveQueryGateway;

    // The query's Mono is returned to the Spring controller. Subscribe control is given to Spring Framework.
    @GetMapping
    public Mono<SomeResponseType> findAll(FindAllQuery query, Class<SomeResponseType> responseType) {
        return reactiveQueryGateway.query(query, responseType);
    }
}

Recommended way of using the Reactor query gateway within a Spring REST controller.

scatterGather - Sends the given query, expecting a response in the form of responseType from several sources within a specified duration.

class SpringQueryController {
    
    private final ReactorQueryGateway reactiveQueryGateway;

    @GetMapping
    public Flux<SomeResponseType> findMany(FindManyQuery query) {
        return reactiveQueryGateway.scatterGather(query, SomeResponseType.class, Duration.ofSeconds(5)).take(3);
    }
}

Sends a given query that stops after receiving three results, or after 5 seconds.

Subscription queries

Firstly, the Reactor API for subscription queries in Axon is not new.

However, we noticed several patterns which are often used, such as:

  • Concatenating initial results with query updates in a single stream, or

  • skipping the initial result all together.

As such, the Reactor Extension provides several methods to ease usage of these common patterns.

subscriptionQuery - Sends the given query, returns the initial result and keeps streaming incremental updates until a subscriber unsubscribes from the Flux.

Note that this method should be used when the response type of the initial result and incremental update match.

Flux<ResultType> resultFlux = reactiveQueryGateway.subscriptionQuery("criteriaQuery", ResultType.class);

The above invocation through the ReactorQueryGateway is equivalent to:

class SubscriptionQuerySender {
    
    private final ReactorQueryGateway reactiveQueryGateway;
    
    public Flux<SomeResponseType> sendSubscriptionQuery(SomeQuery query, Class<SomeResponseType> responseType) {
        return reactiveQueryGateway.subscriptionQuery(query, responseType, responseType)
                                   .flatMapMany(result -> result.initialResult()
                                                                .concatWith(result.updates())
                                                                .doFinally(signal -> result.close()));
    }   
}

subscriptionQueryMany - Sends the given query, returns the initial result and keeps streaming incremental updates until a subscriber unsubscribes from the Flux.

This operation should be used when the initial result contains multiple instances of the response type which needs to be flattened. Additionally, the response type of the initial response and incremental updates need to match.

Flux<ResultType> resultFlux = reactiveQueryGateway.subscriptionQueryMany("criteriaQuery", ResultType.class);

The above invocation through the ReactorQueryGateway is equivalent to:

class SubscriptionQuerySender {
    
    private final ReactorQueryGateway reactiveQueryGateway;
    
    public Flux<SomeResponseType> sendSubscriptionQuery(SomeQuery query, Class<SomeResponseType> responseType) {
        return reactiveQueryGateway.subscriptionQuery(query,
                                                      ResponseTypes.multipleInstancesOf(responseType),
                                                      ResponseTypes.instanceOf(responseType))
                                   .flatMapMany(result -> result.initialResult()
                                                                .flatMapMany(Flux::fromIterable)
                                                                .concatWith(result.updates())
                                                                .doFinally(signal -> result.close()));
    }
}

queryUpdates - Sends the given query and streams incremental updates until a subscriber unsubscribes from the Flux.

This method could be used when subscriber is only interested in updates.

Flux<ResultType> updatesOnly = reactiveQueryGateway.queryUpdates("criteriaQuery", ResultType.class);

The above invocation through the ReactorQueryGateway is equivalent to:

class SubscriptionQuerySender {
    
    private final ReactorQueryGateway reactiveQueryGateway;
    
    public Flux<SomeResponseType> sendSubscriptionQuery(SomeQuery query, Class<SomeResponseType> responseType) {
        return reactiveQueryGateway.subscriptionQuery(query, ResponseTypes.instanceOf(Void.class), responseType)
                                   .flatMapMany(result -> result.updates()
                                                                .doFinally(signal -> result.close()));
    }
}

In the above shown methods, the subscription query is closed automatically after a subscriber has unsubscribed from the Flux. When using the regular QueryGateway, the subscription query needs to be closed manually however.

Reactor Event Gateway

Reactive variation of the EventGateway. Provides support for reactive return types such as Flux.

publish - Publishes the given events once the caller subscribes to the resulting Flux.

class EventPublisher {
    
    private final ReactorEventGateway reactiveEventGateway;
    
    // Register a dispatch interceptor to modify the event messages
    public EventPublisher() {
        reactiveEventGateway.registerDispatchInterceptor(
            eventMono -> eventMono.map(event -> GenericEventMessage.asEventMessage("intercepted" + event.getPayload()))
        );
    }
    
    public void publishEvent() {
        Flux<Object> result = reactiveEventGateway.publish("event");
    }   
}

Example of dispatcher modified events, returned to user as the result Flux.

Interceptors

These interceptors allow us to centrally define rules and filters that will be applied to a message stream.

Interceptors will be applied in order they have been registered to the given component.

Reactor Dispatch Interceptors

The ReactorMessageDispatchInterceptor should be used to centrally apply rules and validations for outgoing messages. Note that a ReactorMessageDispatchInterceptor is an implementation of the default MessageDispatchInterceptor interface used throughout the framework. The implementation of this interface is described as follows:

@FunctionalInterface
public interface ReactorMessageDispatchInterceptor<M extends Message<?>> extends MessageDispatchInterceptor<M> {

    Mono<M> intercept(Mono<M> message);

    @Override
    default BiFunction<Integer, M, M> handle(List<? extends M> messages) {
        return (position, message) -> intercept(Mono.just(message)).block();
    }
}

It thus defaults the MessageDispatchInterceptor#handle(List<? extends M> method to utilize the ReactorMessageDispatchInterceptor#intercept(Mono<M>) method. As such, a ReactorMessageDispatchInterceptor could thus be configured on a plain Axon gateway too. Here are a couple of examples how a message dispatch interceptor could be used:

class ReactorConfiguration {

    public void registerDispatchInterceptor(ReactorCommandGateway reactiveGateway) {
        reactiveGateway.registerDispatchInterceptor(
            msgMono -> msgMono.map(msg -> msg.andMetaData(Collections.singletonMap("key1", "value1")))
        );
    }
}

Dispatch interceptor that adds key-value pairs to the message's MetaData.

class ReactorConfiguration {

    public void registerDispatchInterceptor(ReactorEventGateway reactiveGateway) {
        reactiveGateway.registerDispatchInterceptor(
            msgMono -> msgMono.filterWhen(v -> Mono.subscriberContext()
                              .filter(ctx-> ctx.hasKey("security"))
                              .map(ctx->ctx.get("security")))
        );
    }
}

Dispatch interceptor that discards the message, based on a security flag in the Reactor Context.

Reactor Result Handler Interceptors

The ReactorResultHandlerInterceptor should be used to centrally apply rules and validations for incoming messages, a.k.a. results. The implementation of this interface is described as follows:

@FunctionalInterface
public interface ReactorResultHandlerInterceptor<M extends Message<?>, R extends ResultMessage<?>> {
    
    Flux<R> intercept(M message, Flux<R> results);
}

The parameters are the message that has been sent, and a Flux of results from that message, which is going to be intercepted. The message parameter can be useful if you want to apply a given result rule only for specific messages. Here are a couple of examples how a message result interceptor could be used:

This type of interceptor is available only in the Reactor Extension.

class ReactorConfiguration {

    public void registerResultInterceptor(ReactorQueryGateway reactiveGateway) {
        reactiveGateway.registerResultHandlerInterceptor(
            (msg, results) -> results.filter(r -> !r.getPayload().equals("blockedPayload"))
        );
    }
}

Result interceptor which discards all results that have a payload matching blockedPayload

class ReactorConfiguration {

    public void registerResultInterceptor(ReactorQueryGateway reactiveGateway) {
        reactiveQueryGateway.registerResultHandlerInterceptor(
            (query, results) -> results.flatMap(r -> {
                if (r.getPayload().equals("")) {
                    return Flux.<ResultMessage<?>>error(new RuntimeException("no empty strings allowed"));
                } else {
                    return Flux.just(r);
                }
            })
        );
    }
}

Result interceptor which validates that the query result does not contain an empty String.

class ReactorConfiguration {

    public void registerResultInterceptor(ReactorQueryGateway reactiveGateway) {
        reactiveQueryGateway.registerResultHandlerInterceptor(
            (q, results) -> results.filter(it -> !((boolean) q.getQueryName().equals("myBlockedQuery")))
        );
    }
}

Result interceptor which discards all results where the queryName matches myBlockedQuery.

class ReactorConfiguration {

    public void registerResultInterceptor(ReactorCommandGateway reactiveGateway) {
        reactiveGateway.registerResultHandlerInterceptor(
            (msg,results) -> results.timeout(Duration.ofSeconds(30))
        );
    }
}

Result interceptor which limits the result waiting time to thirty seconds per message.

class ReactorConfiguration {

    public void registerResultInterceptor(ReactorCommandGateway reactiveGateway) {
        reactiveGateway.registerResultHandlerInterceptor(
            (msg,results) -> results.log().take(5)
        );
    }
}

Result interceptor which limits the number of results to five entries, and logs all results.

send and sendAll do not offer any backpressure, yet. The only backpressure mechanism in place is that commands will be sent sequentially; thus once the result of a previous command arrives. The number of commands is prefetched from an incoming stream and stored in a buffer for sending (see ). This slows down sending, but does not guarantee that the Subscriber will not be overwhelmed with commands if they are sent too fast.

This method returns events that were published. Note that the returned events may be different from those the user has published, granted an has been registered which modifies events.

Axon provides a notion of . The Reactor gateways allow for similar interceptor logic, namely the ReactorMessageDispatchInterceptor and ReactorResultHandlerInterceptor.

Mono#empty()
here
WebFlux
Flux#concatMap
interceptors
interceptor