# 函数式编程
在函数式编程里面, 函数为第一等公民, 或者 纯函数是唯一的第一等公民(Pure function is the one and only first-class citizen). 函数式是无副作用的, 在函数是编程里面, 只能够是无状态的纯函数, 同时变量也应该是常量化的. 借此可以得到更加可预测和更可靠的编程代码.
# 反应式编程
反应式编程是一种编写异步数据流的编程方式. 利用函数式编程组合数据流的验算, 整个程序将成为事件和行为的组合. 它可以提高代码的抽象级别, 因而可以专注于定义业务逻辑的事件的相互依赖性.
# Project Reactor
PR 是一个实现以上思想的基于JVM的反应式编程库.
(以下机翻官网) 特点
REACTIVE CORE
Reactor是一个具有高效的需求管理的完全无阻塞的基础. 它直接直接与Java Functional API, Completable Future, Stream和Duration进行交互
TYPED [0|1|N] SEQUENCES
Reactor提供2个反应式组合
Publisher
,Flux
[0:N]和Mono
[0:1] , 它们广泛地实现了反应性扩展NON BLOCKING IO
reactor适用于微服务体系结构, 为HTTP(包括WebSockets), TCP和UDP提供背压式网络引擎.
# 对比传统
# Future
CountDownLatch countDownLatch = new CountDownLatch(1);
FutureTask<String> futureTask = new FutureTask<>(() -> {
try {
Thread.sleep(2 * 1000);
log.info("doing something");
return "done";
} finally {
countDownLatch.countDown();
}
});
executor.execute(futureTask);
log.info("task is done? {}", futureTask.isDone());
countDownLatch.await();
log.info("task is done? {}", futureTask.isDone());
log.info("task end. {}", futureTask.get());
# Callback
CountDownLatch countDownLatch = new CountDownLatch(1);
Callback<String, Void> callback = param -> {
log.info("task end. {}", param);
countDownLatch.countDown();
return null;
};
FutureTask<Void> futureTask = new FutureTask<>(() -> {
try {
Thread.sleep(2 * 1000);
log.info("doing something");
callback.call("done");
return null;
} finally {
countDownLatch.countDown();
}
});
executor.execute(futureTask);
log.info("task is done? {}", futureTask.isDone());
countDownLatch.await();
log.info("task is done? {}", futureTask.isDone());
log.info("task end. {}", futureTask.get());
# CompletableFuture
CountDownLatch countDownLatch = new CountDownLatch(1);
CompletableFuture completableFuture = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("doing something");
return "done";
}, executor)
.thenAccept(str -> {
log.info("task end. {}", str);
countDownLatch.countDown();
});
log.info("task is done? {}", completableFuture.isDone());
countDownLatch.await();
log.info("task is done? {}", completableFuture.isDone());
# Reactor
CountDownLatch countDownLatch = new CountDownLatch(1);
Disposable disposable = Mono
.fromSupplier(() -> {
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("doing something");
return "done";
})
.cast(String.class)
.subscribe(str -> {
log.info("task end. {}", str);
countDownLatch.countDown();
});
log.info("task is done? {}", disposable.isDisposed());
countDownLatch.await();
log.info("task is done? {}", disposable.isDisposed());
Reactor 和 CompletableFuture 在这里很相似, 这也印证了上文特点之一. 你可以很方便的通过大部分方式创建你的Publisher
Mono.just
单纯创建Mono.create
由一个Consumer<MonoSink<T>>
Callback 创建defer
延迟创建Publisher
delay
通过Duration
空延迟empty
空from
从另一个Publisher
fromCallable
从一个Callable
fromFuture
从一个CompletableFuture
fromRunnable
,fromSupplier
,using
- ......
Mono.using(
() -> Mono.just(1),
param -> Mono.just(String.valueOf(param)),
integer -> log.info("cleanup"),
true
);