# 前言
阅读本文需要
- 函数式编程
- 响应式编程
- Spring Boot、Project Reactor
Workflow,即为工作流。其需要对输入数据源(pull数据、push数据)通过串行和并行的工作链进行处理,最后输出结果(打印、储存……)
S(ource) -> workflow -> R(esult)
Workflow的基本工作单位WorkUnit
代表一个单独的处理单元,为工作流程中的最小工作单元。
# 特征
一条完整Workflow有以下要求。
- 并行处理正交WorkUnit
- 串行处理前后依赖的WorkUnit
- 处理数据储存在一个单独的上下文中
# 抽象
以抖音发布短视频为例子。当用户拍摄并编辑完毕短视频之后,点击发布按钮触发视频上传。但是,在视频可以被检索和推荐之前,需要有很多工作需要完成。例如,视频转码,视频内容识别,视频打标签,视频内容审核(政治、色情、),视频入库……
使用Workflow抽象该案例,上传视频后,即往视频处理链投入视频Source,然后创建工作流程上下文Context正式开始工作流处理。
定义基本工作单元
WorkUnit1
第一步为视频转码,需要在所有工作之前进行。WorkUnit2
第二步为视频内容识别,这一步为后续提供一些必要基本信息。WorkUnit3
第三步为视频打上标签,例如美女、舞蹈、搞笑、段子……ParallelWorkUnit4A
第四步为视频审核,这一步有很多但是互不相关的审核策略,完全可以并行化处理ParallelWorkUnit4B
并行策略BParallelWorkUnit4C
并行策略C
FinalContextConsumer
最后一步为视频入库,入库后的视频既可以被检索然后交由Feed流推荐系统投喂给其他用户
每日会有成千上万个视频流入该数据处理流,即Workflow是对数据流中的每一个数据进行处理。我们换一个视觉表述Workflow
----o---o---o----->
| | |
----------
---| workflow |----
----------
| | |
----x---x---x----->
很直观的,o为每一条上传的数据源,x为经过数据流处理完毕的结果,从左到右为时间线的发展。 最终细化如下。
----o-----o-----o------->
| | |
----c-----c-----c-------> map to Context
| | |
----c1----c1----c1------> WorkUnit1
| | |
----c2----c2----c2------> WorkUnit2
| | |
----c3----c3----c3------> WorkUnit3
| | |
----ccc---ccc---ccc-----> WorkUnit4A + WorkUnit4B + WorkUnit4C
| | |
----x-----x-----x------->
# 设计
于是,完整的工作流程抽象完毕,可以开始设计代码。
首先定义视频源的类型,例如定义为VideoRequest
。于是数据源流的类型为Publisher<VideoRequest>
,对于[n:m]
的视频源流来讲,会是Flux<VideoRequest>
工作流的结果应该是一个包含源Request
,并且有所有相关的处理完毕信息的单一Context
,于是工作流的返回值是[0:1]
的Publisher<Context>
,具体为Mono<Context>
伪代码如下
Mono<Context> contextMono = ((Flux<VideoRequest>)source)
.map(request -> Context.of("request", request))
.map(videoWorkflow::apply);
# 工作单元定义
每一个工作单元都是根据传入的上下文进行工作,并处理成新的上下文返回,于是它为一个UnaryOperator
,具体类型为UnaryOperator<Context>
,为其定义别名WorkUnit
@FunctionalInterface
public interface WorkUnit extends UnaryOperator<Context> {
}
# 工作流定义
以串行工作流中进行并行工作流的方式设计,于是工作流程类型为Flux<ParallelFlux<WorkUnit>>
于串行工作流中,每一个工作单元都依赖上一个工作单元的结果,最终返回最后一个工作单元处理的上下文。为典型的reduceOperator
于是代码可以如下实现
Flux<ParallelFlux<WorkUnit>> workflow;
Mono<Context> contextMono = ((Flux<VideoRequest>)source)
.map(request -> Context.of("request", request))
.flatmap(context -> (Mono<Context>) workflow
.reduce(
context,
parallelWorkflow::apply
));
而并行工作流中,所有工作单元可以根据输入的Context
并行处理,在最终聚合为一个Context
结果即可。
由于reduce
中的聚合函数为异步函数,所以reduce的State必须为一个Publisher
。不然需要在该异步函数的最后加上block()
语句。在响应式编程中,一般是不允许阻塞的。
于是代码可以做如下细化,其中toProcessor
为将数据流转换为hot stream,在响应式数据流中cold/hot stream的性质不同,由于这里reduce
操作的State是一个Publisher
,如果使用cold stream会使得上游数据流重复被订阅触发(见marked
注释处)。这一步使用cache()
也可,原理和结果应该是一样的(不保证底层)。
Flux<ParallelFlux<WorkUnit>> workflow;
Mono<Context> contextMono = ((Flux<VideoRequest>)source)
.map(request -> Context.of("request", request))
.flatmap(context -> (Mono<Context>) workflow
.reduce(
Mono.just(context),
(Mono<Context> contextMono, ParallelFlux<WorkUnit> parallelFlux) ->
(Mono<Context>) parallelWorkflow
.flatMap(contextMono::map) // marked
.reduce(Context::putAll)
.toProcessor() // .cache() can also
)
.flatMap(Function.identity())
);
reduce的过程如下面运算,在reduce
操作过程中,如果使用cold stream会导致重复计算之前的函数,这在Workflow中是不允许(没有必要,且性能浪费),尽管在纯函数中它是合法且不会出错的。于是,turn it hot,或者cache掉运算结果,能够保证程序往预想的工作流抽象运行。
state0 = F(x)
state1 = H(x) = h1(F(x)) + h2(F(x)) + ... + h?(F(x)) = h1(state0) + h2(state0) + ... + h?(state0)
state2 = I(x) = i1(H(x)) + i2(H(x)) + ... + i?(H(x)) = i1(state1) + i2(state1) + ... + i?(state1)
...
stateN = N(x) = ...
return stateN
到此,工作流程就基本实现完毕了。
# Spring Boot Starter支持
设计预想的配置方式为:业务开发者开发WorkUnit
并注册成Bean,然后通过properties配置具体的一条或多条工作流。
下面为配置两条不同的工作流样例。
blog:
lohoknang:
workflow:
reactive:
enabled: true
definitions:
workflow1:
- workUnit1
- workUnit3A
- workUnit3B
workflow2:
- workUnit2
- workUnit3A
- workUnit3C
# AutoConfiguration
根据配置的设计,开发自动配置类WorkflowDefinitionAutoConfiguration
。其中,需要获取ApplicationContext
以注册Bean实例,需要将配置读取到definitions
。
@Configuration
@ConditionalOnProperty(value = "blog.lohoknang.workflow.reactive.enabled", havingValue = "true")
@ConfigurationProperties("blog.lohoknang.workflow.reactive")
public class WorkflowDefinitionAutoConfiguration implements ApplicationContextAware {
private GenericApplicationContext applicationContext;
@Getter
@Setter
private Map<String, List<List<String>>> definitions;
@PostConstruct
public void postConstruct() {
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (GenericApplicationContext) applicationContext;
}
}
在postConstruct
注册工作流。注意创建Workflow Flux的时候,将stream collect掉后再创建Flux。因为如果使用Flux.fromStream()
操作符,将会导致该stream flux第一次使用完毕后即被关闭。这里不一定使用list flux,一般只要符合can be re-used就可。
public void postConstruct() {
Objects.requireNonNull(definitions, "Invalid workflow definition");
Map<String, WorkUnit> workUnitMap = applicationContext.getBeansOfType(WorkUnit.class);
definitions
.entrySet()
.stream()
.map(entry -> new WorkflowDefinition(
entry.getKey(),
Flux
.fromIterable(entry
.getValue()
.stream()
.map(list -> Flux
.fromIterable(list
.stream()
.map(workUnitMap::get)
.collect(Collectors.toList())
)
.parallel())
.collect(Collectors.toList()))
))
.forEach(workflowDefinition -> applicationContext.registerBean(
workflowDefinition.getName(),
WorkflowDefinition.class,
() -> workflowDefinition
));
}
# WorkflowDefinition
是一个包装类,对基本信息包装一下而已。
@Data
@AllArgsConstructor
public class WorkflowDefinition {
private String name;
private Flux<ParallelFlux<WorkUnit>> workflowDefinitionFlux;
}
# WorkUnit
如上文定义
@FunctionalInterface
public interface WorkUnit extends UnaryOperator<Context> {
}
# WorkflowFactory
为具体的完整的Workflow的实现,代码如上文。用于绑定数据源和Workflow。
public class WorkflowFactory {
public static final String REQUEST_KEY = "blog.lohoknang.workflow.reacitve.request";
public static <T> Flux<Context> create(Flux<T> source, WorkflowDefinition workFlowDefinition) {
return source
.map(request -> Context.of(REQUEST_KEY, request))
.flatMap(context -> workFlowDefinition
.getWorkflowDefinitionFlux()
.reduce(
MonoProcessor.just(context),
(contextMono, parallelFlux) -> parallelFlux
.flatMap(contextMono::map)
.reduce(Context::putAll)
.toProcessor()
)
.flatMap(Function.identity())
);
}
}
# 使用案例
在Spring Boot应用中如何使用该框架。
# 配置
先是完成,由开发者实现并注册的工作单元WorkUnit
@Configuration
static class TestConfig {
@Bean
public WorkUnit workUnit1() {
return context -> context.put("workUnit1", "workUnit1");
}
@Bean
public WorkUnit workUnit2() {
return context -> context.put("workUnit2", "workUnit2");
}
@Bean
public WorkUnit workUnit3A() {
return context -> context.put("workUnit3A", "workUnit3A");
}
@Bean
public WorkUnit workUnit3B() {
return context -> context.put("workUnit3B", "workUnit3B");
}
@Bean
public WorkUnit workUnit3C() {
return context -> context.put("workUnit3C", "workUnit3C");
}
}
接着配置Workflow Definition
blog:
lohoknang:
workflow:
reactive:
enabled: true
definitions:
workflow1:
- workUnit1
- workUnit3A
- workUnit3B
workflow2:
- workUnit2
- workUnit3A
- workUnit3C
# 使用
定义服务TestService
测试Workflow
@Service
class TestService {
@Resource
private WorkflowDefinition workflow1;
@Resource
private WorkflowDefinition workflow2;
public void test() {
WorkflowFactory.create(Flux.just("video1", "video2"), workflow1).log().subscribe();
WorkflowFactory.create(Flux.just("video1", "video2"), workflow2).log().subscribe();
}
}
# 测试
调用test
方法,测试结果如下
2020-02-20 19:29:58.435 INFO 8644 --- [ main] reactor.Flux.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2020-02-20 19:29:58.437 INFO 8644 --- [ main] reactor.Flux.FlatMap.1 : request(unbounded)
2020-02-20 19:29:58.487 INFO 8644 --- [ main] reactor.Flux.FlatMap.1 : onNext(ContextN{blog.lohoknang.workflow.reacitve.request=video1, workUnit1=workUnit1, workUnit3A=workUnit3A, workUnit3B=workUnit3B})
2020-02-20 19:29:58.489 INFO 8644 --- [ main] reactor.Flux.FlatMap.1 : onNext(ContextN{blog.lohoknang.workflow.reacitve.request=video2, workUnit1=workUnit1, workUnit3A=workUnit3A, workUnit3B=workUnit3B})
2020-02-20 19:29:58.489 INFO 8644 --- [ main] reactor.Flux.FlatMap.1 : onComplete()
2020-02-20 19:29:58.489 INFO 8644 --- [ main] reactor.Flux.FlatMap.2 : onSubscribe(FluxFlatMap.FlatMapMain)
2020-02-20 19:29:58.489 INFO 8644 --- [ main] reactor.Flux.FlatMap.2 : request(unbounded)
2020-02-20 19:29:58.490 INFO 8644 --- [ main] reactor.Flux.FlatMap.2 : onNext(ContextN{blog.lohoknang.workflow.reacitve.request=video1, workUnit2=workUnit2, workUnit3A=workUnit3A, workUnit3C=workUnit3C})
2020-02-20 19:29:58.491 INFO 8644 --- [ main] reactor.Flux.FlatMap.2 : onNext(ContextN{blog.lohoknang.workflow.reacitve.request=video2, workUnit2=workUnit2, workUnit3A=workUnit3A, workUnit3C=workUnit3C})
2020-02-20 19:29:58.491 INFO 8644 --- [ main] reactor.Flux.FlatMap.2 : onComplete()
# 代码
案例代码仓库位于Github reactive-workflow (opens new window)