构建响应式Workflow应用

# 前言

阅读本文需要

  • 函数式编程
  • 响应式编程
  • Spring Boot、Project Reactor

Workflow,即为工作流。其需要对输入数据源(pull数据、push数据)通过串行和并行的工作链进行处理,最后输出结果(打印、储存……)

S(ource) -> workflow -> R(esult)

Workflow的基本工作单位WorkUnit代表一个单独的处理单元,为工作流程中的最小工作单元。

image-20200220213238550

# 特征

一条完整Workflow有以下要求。

  • 并行处理正交WorkUnit
  • 串行处理前后依赖的WorkUnit
  • 处理数据储存在一个单独的上下文中

# 抽象

以抖音发布短视频为例子。当用户拍摄并编辑完毕短视频之后,点击发布按钮触发视频上传。但是,在视频可以被检索和推荐之前,需要有很多工作需要完成。例如,视频转码,视频内容识别,视频打标签,视频内容审核(政治、色情、),视频入库……

使用Workflow抽象该案例,上传视频后,即往视频处理链投入视频Source,然后创建工作流程上下文Context正式开始工作流处理。

定义基本工作单元

  • WorkUnit1第一步为视频转码,需要在所有工作之前进行。
  • WorkUnit2第二步为视频内容识别,这一步为后续提供一些必要基本信息。
  • WorkUnit3第三步为视频打上标签,例如美女、舞蹈、搞笑、段子……
  • ParallelWorkUnit4A第四步为视频审核,这一步有很多但是互不相关的审核策略,完全可以并行化处理
  • ParallelWorkUnit4B并行策略B
  • ParallelWorkUnit4C并行策略C

FinalContextConsumer 最后一步为视频入库,入库后的视频既可以被检索然后交由Feed流推荐系统投喂给其他用户

image-20200220213354361

每日会有成千上万个视频流入该数据处理流,即Workflow是对数据流中的每一个数据进行处理。我们换一个视觉表述Workflow

----o---o---o----->
    |   |   |
    ----------
---| workflow |----
    ----------
    |   |   |
----x---x---x----->

很直观的,o为每一条上传的数据源,x为经过数据流处理完毕的结果,从左到右为时间线的发展。 最终细化如下。

----o-----o-----o------->
    |     |     |
----c-----c-----c-------> map to Context
    |     |     |
----c1----c1----c1------> WorkUnit1
    |     |     |
----c2----c2----c2------> WorkUnit2
    |     |     |
----c3----c3----c3------> WorkUnit3
    |     |     |
----ccc---ccc---ccc-----> WorkUnit4A + WorkUnit4B + WorkUnit4C
    |     |     |
----x-----x-----x------->

# 设计

于是,完整的工作流程抽象完毕,可以开始设计代码。

首先定义视频源的类型,例如定义为VideoRequest。于是数据源流的类型为Publisher<VideoRequest>,对于[n:m]的视频源流来讲,会是Flux<VideoRequest>

工作流的结果应该是一个包含源Request,并且有所有相关的处理完毕信息的单一Context,于是工作流的返回值是[0:1]Publisher<Context>,具体为Mono<Context>

伪代码如下

Mono<Context> contextMono = ((Flux<VideoRequest>)source)
    .map(request -> Context.of("request", request))
    .map(videoWorkflow::apply);

# 工作单元定义

每一个工作单元都是根据传入的上下文进行工作,并处理成新的上下文返回,于是它为一个UnaryOperator,具体类型为UnaryOperator<Context>,为其定义别名WorkUnit

@FunctionalInterface
public interface WorkUnit extends UnaryOperator<Context> {
}

# 工作流定义

以串行工作流中进行并行工作流的方式设计,于是工作流程类型为Flux<ParallelFlux<WorkUnit>>

于串行工作流中,每一个工作单元都依赖上一个工作单元的结果,最终返回最后一个工作单元处理的上下文。为典型的reduceOperator于是代码可以如下实现

Flux<ParallelFlux<WorkUnit>> workflow;

Mono<Context> contextMono = ((Flux<VideoRequest>)source)
    .map(request -> Context.of("request", request))
    .flatmap(context -> (Mono<Context>) workflow
             .reduce(
                 context,
                 parallelWorkflow::apply
             ));

而并行工作流中,所有工作单元可以根据输入的Context并行处理,在最终聚合为一个Context结果即可。

由于reduce中的聚合函数为异步函数,所以reduce的State必须为一个Publisher。不然需要在该异步函数的最后加上block()语句。在响应式编程中,一般是不允许阻塞的。

于是代码可以做如下细化,其中toProcessor为将数据流转换为hot stream,在响应式数据流中cold/hot stream的性质不同,由于这里reduce操作的State是一个Publisher,如果使用cold stream会使得上游数据流重复被订阅触发(见marked注释处)。这一步使用cache()也可,原理和结果应该是一样的(不保证底层)。

Flux<ParallelFlux<WorkUnit>> workflow;

Mono<Context> contextMono = ((Flux<VideoRequest>)source)
    .map(request -> Context.of("request", request))
    .flatmap(context -> (Mono<Context>) workflow
             .reduce(
                 Mono.just(context),
                 (Mono<Context> contextMono, ParallelFlux<WorkUnit> parallelFlux) ->
                     (Mono<Context>) parallelWorkflow
                         .flatMap(contextMono::map) // marked
                         .reduce(Context::putAll)
                         .toProcessor() // .cache() can also
             )
             .flatMap(Function.identity())
    );

reduce的过程如下面运算,在reduce操作过程中,如果使用cold stream会导致重复计算之前的函数,这在Workflow中是不允许(没有必要,且性能浪费),尽管在纯函数中它是合法且不会出错的。于是,turn it hot,或者cache掉运算结果,能够保证程序往预想的工作流抽象运行。

state0 = F(x)
state1 = H(x) = h1(F(x)) + h2(F(x)) + ... + h?(F(x)) = h1(state0) + h2(state0) + ... + h?(state0)
state2 = I(x) = i1(H(x)) + i2(H(x)) + ... + i?(H(x)) = i1(state1) + i2(state1) + ... + i?(state1)
...
stateN = N(x) = ...

return stateN

到此,工作流程就基本实现完毕了。

# Spring Boot Starter支持

设计预想的配置方式为:业务开发者开发WorkUnit并注册成Bean,然后通过properties配置具体的一条或多条工作流。

下面为配置两条不同的工作流样例。

blog:
  lohoknang:
    workflow:
      reactive:
        enabled: true
        definitions:
          workflow1:
            - workUnit1
            - workUnit3A
            - workUnit3B
          workflow2:
            - workUnit2
            - workUnit3A
            - workUnit3C

# AutoConfiguration

根据配置的设计,开发自动配置类WorkflowDefinitionAutoConfiguration。其中,需要获取ApplicationContext以注册Bean实例,需要将配置读取到definitions

@Configuration
@ConditionalOnProperty(value = "blog.lohoknang.workflow.reactive.enabled", havingValue = "true")
@ConfigurationProperties("blog.lohoknang.workflow.reactive")
public class WorkflowDefinitionAutoConfiguration implements ApplicationContextAware {
    private GenericApplicationContext applicationContext;
    
    @Getter
    @Setter
    private Map<String, List<List<String>>> definitions;
    
    @PostConstruct
    public void postConstruct() {
        
    }
    
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (GenericApplicationContext) applicationContext;
    }
}

postConstruct注册工作流。注意创建Workflow Flux的时候,将stream collect掉后再创建Flux。因为如果使用Flux.fromStream()操作符,将会导致该stream flux第一次使用完毕后即被关闭。这里不一定使用list flux,一般只要符合can be re-used就可。

public void postConstruct() {
    Objects.requireNonNull(definitions, "Invalid workflow definition");
    Map<String, WorkUnit> workUnitMap = applicationContext.getBeansOfType(WorkUnit.class);

    definitions
            .entrySet()
            .stream()
            .map(entry -> new WorkflowDefinition(
                    entry.getKey(),
                    Flux
                            .fromIterable(entry
                                    .getValue()
                                    .stream()
                                    .map(list -> Flux
                                            .fromIterable(list
                                                    .stream()
                                                    .map(workUnitMap::get)
                                                    .collect(Collectors.toList())
                                            )
                                            .parallel())
                                        .collect(Collectors.toList()))
            ))
            .forEach(workflowDefinition -> applicationContext.registerBean(
                    workflowDefinition.getName(),
                    WorkflowDefinition.class,
                    () -> workflowDefinition
            ));
}

# WorkflowDefinition

是一个包装类,对基本信息包装一下而已。

@Data
@AllArgsConstructor
public class WorkflowDefinition {
    private String name;
    private Flux<ParallelFlux<WorkUnit>> workflowDefinitionFlux;
}

# WorkUnit

如上文定义

@FunctionalInterface
public interface WorkUnit extends UnaryOperator<Context> {
}

# WorkflowFactory

为具体的完整的Workflow的实现,代码如上文。用于绑定数据源和Workflow。

public class WorkflowFactory {
    public static final String REQUEST_KEY = "blog.lohoknang.workflow.reacitve.request";

    public static <T> Flux<Context> create(Flux<T> source, WorkflowDefinition workFlowDefinition) {
        return source
                .map(request -> Context.of(REQUEST_KEY, request))
                .flatMap(context -> workFlowDefinition
                        .getWorkflowDefinitionFlux()
                        .reduce(
                                MonoProcessor.just(context),
                                (contextMono, parallelFlux) -> parallelFlux
                                        .flatMap(contextMono::map)
                                        .reduce(Context::putAll)
                                        .toProcessor()
                        )
                        .flatMap(Function.identity())
                );
    }
}

# 使用案例

在Spring Boot应用中如何使用该框架。

# 配置

先是完成,由开发者实现并注册的工作单元WorkUnit

@Configuration
static class TestConfig {
    @Bean
    public WorkUnit workUnit1() {
        return context -> context.put("workUnit1", "workUnit1");
    }

    @Bean
    public WorkUnit workUnit2() {
        return context -> context.put("workUnit2", "workUnit2");
    }

    @Bean
    public WorkUnit workUnit3A() {
        return context -> context.put("workUnit3A", "workUnit3A");
    }

    @Bean
    public WorkUnit workUnit3B() {
        return context -> context.put("workUnit3B", "workUnit3B");
    }

    @Bean
    public WorkUnit workUnit3C() {
        return context -> context.put("workUnit3C", "workUnit3C");
    }
}

接着配置Workflow Definition

blog:
  lohoknang:
    workflow:
      reactive:
        enabled: true
        definitions:
          workflow1:
            - workUnit1
            - workUnit3A
            - workUnit3B
          workflow2:
            - workUnit2
            - workUnit3A
            - workUnit3C

# 使用

定义服务TestService测试Workflow

@Service
class TestService {
    @Resource
    private WorkflowDefinition workflow1;
    
    @Resource
    private WorkflowDefinition workflow2;
    
    public void test() {
        WorkflowFactory.create(Flux.just("video1", "video2"), workflow1).log().subscribe();
        WorkflowFactory.create(Flux.just("video1", "video2"), workflow2).log().subscribe();
    }
}

# 测试

调用test方法,测试结果如下

2020-02-20 19:29:58.435  INFO 8644 --- [           main] reactor.Flux.FlatMap.1                   : onSubscribe(FluxFlatMap.FlatMapMain)
2020-02-20 19:29:58.437  INFO 8644 --- [           main] reactor.Flux.FlatMap.1                   : request(unbounded)
2020-02-20 19:29:58.487  INFO 8644 --- [           main] reactor.Flux.FlatMap.1                   : onNext(ContextN{blog.lohoknang.workflow.reacitve.request=video1, workUnit1=workUnit1, workUnit3A=workUnit3A, workUnit3B=workUnit3B})
2020-02-20 19:29:58.489  INFO 8644 --- [           main] reactor.Flux.FlatMap.1                   : onNext(ContextN{blog.lohoknang.workflow.reacitve.request=video2, workUnit1=workUnit1, workUnit3A=workUnit3A, workUnit3B=workUnit3B})
2020-02-20 19:29:58.489  INFO 8644 --- [           main] reactor.Flux.FlatMap.1                   : onComplete()
2020-02-20 19:29:58.489  INFO 8644 --- [           main] reactor.Flux.FlatMap.2                   : onSubscribe(FluxFlatMap.FlatMapMain)
2020-02-20 19:29:58.489  INFO 8644 --- [           main] reactor.Flux.FlatMap.2                   : request(unbounded)
2020-02-20 19:29:58.490  INFO 8644 --- [           main] reactor.Flux.FlatMap.2                   : onNext(ContextN{blog.lohoknang.workflow.reacitve.request=video1, workUnit2=workUnit2, workUnit3A=workUnit3A, workUnit3C=workUnit3C})
2020-02-20 19:29:58.491  INFO 8644 --- [           main] reactor.Flux.FlatMap.2                   : onNext(ContextN{blog.lohoknang.workflow.reacitve.request=video2, workUnit2=workUnit2, workUnit3A=workUnit3A, workUnit3C=workUnit3C})
2020-02-20 19:29:58.491  INFO 8644 --- [           main] reactor.Flux.FlatMap.2                   : onComplete()

# 代码

案例代码仓库位于Github reactive-workflow (opens new window)