一个基于reactor-netty
和jProtobuf
开发的, 遵循 Protobuf RPC协议 实现的 简单响应式RPC框架
reactive-rpc4j-spring-boot-starter (opens new window)
# 引入
<dependency>
<groupId>com.github.759434091</groupId>
<artifactId>reactive-rpc4j-spring-boot-starter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
# 定义
# request
@Data
@Builder
@ProtobufClass
@NoArgsConstructor
@AllArgsConstructor
public class TestRequest {
private int id;
}
# response
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ProtobufClass
public class TestResponse {
private int id;
private String value;
}
# service
@ProtobufService("testService")
public interface TestService {
@ProtobufMethod
Mono<TestResponse> testMethod(Mono<TestRequest> testRequestMono);
}
# 服务端
这里并没有选择暴露和发现分开注解操作, 因为注册接口的时候就已经注定这是暴露还是发现.
# 实现
public class TestServiceImpl implements TestService {
@Override
public Mono<TestResponse> testMethod(Mono<TestRequest> testRequestMono) {
return testRequestMono
.map(testRequest -> TestResponse.builder()
.id(testRequest.getId())
.value("hello")
.build())
.log();
}
}
# 注册
或者@Component
、@Service
@Configuration
public class ServerConfiguration {
@Bean
TestServiceImpl testService() {
return new TestServiceImpl();
}
}
# 配置
application.properties
reactive-rpc4j.port=8080
# 客户端
怎么简单怎么来..
# 注册
TestService testService = RpcServiceClientRegister.register(
TestService.class,
"127.0.0.1",
8000
);
# 测试
服务端启动服务.
# Client Test
纯异步操作, 使用CountDownLatch
避免提前退出.
CountDownLatch countDownLatch = new CountDownLatch(1);
TestRequest testRequest = TestRequest.builder()
.id(1)
.build();
testService.testMethod(Mono.just(testRequest))
.subscribe(testResponse -> {
log.info(JSON.toJSONString(testResponse));
countDownLatch.countDown();
});
countDownLatch.await();
# 日志
Server
2019-04-18 15:59:15.746 INFO 39478 --- [ main] c.g.r.server.ServerTestApplication : Starting ServerTestApplication on BDSHYF000105534 with PID 39478 (/Users/luxueneng/IdeaProjects/reactive-rpc4j-spring-boot-starter/target/test-classes started by luxueneng in /Users/luxueneng/IdeaProjects/reactive-rpc4j-spring-boot-starter) 2019-04-18 15:59:15.750 INFO 39478 --- [ main] c.g.r.server.ServerTestApplication : No active profile set, falling back to default profiles: default 2019-04-18 15:59:19.394 INFO 39478 --- [ main] c.g.r.server.RpcServer : start server at port=8000 2019-04-18 15:59:36.981 INFO 39478 --- [ loop-nio-2] reactor.Mono.Map.1 : onSubscribe(FluxMap.MapSubscriber) 2019-04-18 15:59:36.982 INFO 39478 --- [ loop-nio-2] reactor.Mono.Map.1 : request(32) 2019-04-18 15:59:36.984 INFO 39478 --- [ loop-nio-2] reactor.Mono.Map.1 : onNext(TestResponse(id=1, value=hello)) 2019-04-18 15:59:37.002 INFO 39478 --- [ loop-nio-2] reactor.Mono.Map.1 : onComplete()
Client
15:59:36.886 [main] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@6dbcf214 15:59:36.902 [reactor-tcp-nio-4] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0x08d21b4c, L:/127.0.0.1:51998 - R:/127.0.0.1:8000] Writing object PooledUnsafeHeapByteBuf(ridx: 0, widx: 47, cap: 47) 15:59:37.027 [reactor-tcp-nio-4] DEBUG com.github.reactiverpc4jspringbootstarter.handler.DataPackageDecoder - [profiling] decode cost=3ms 15:59:37.229 [reactor-tcp-nio-4] INFO com.github.reactiverpc4jspringbootstarter.client.ClientApplication - {"id":1,"value":"hello"}