当前位置:网站首页>Communication between client and server based on rsocket protocol
Communication between client and server based on rsocket protocol
2022-07-28 15:39:00 【pshdhx_ albert】
RSocket Basic development demo
package com.pshdhx.rsocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.DefaultPayload;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class MessageRSocketHandler implements RSocket {
@Override
public Mono<Void> fireAndForget(Payload payload) { // No response For log
String message = payload.getDataUtf8(); // get data
log.info("[fireAndForget] Receive request data :{}",message);
return Mono.empty();
}
@Override
public Mono<Payload> requestResponse(Payload payload) { // The traditional model There are requests and responses
String message = payload.getDataUtf8(); // get data
log.info("[RequestAndResponse] Receive request data :{}",message);
return Mono.just(DefaultPayload.create("[echo]"+message));
}
@Override
public Flux<Payload> requestStream(Payload payload) { // Processing stream data
String message = payload.getDataUtf8(); // get data
log.info("[RequestStream] Receive request data :{}",message);
return Flux.fromStream(message.chars() // Convert the received string to int Type data flow
.mapToObj(c->Character.toUpperCase(c)) // Encode each character inside in uppercase
.map(Object::toString)// Convert characters to String
.map(DefaultPayload::create)); // establish payload Additional data
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> publisher) { // Two-way flow
return Flux.from(publisher).map(Payload::getDataUtf8).map(msg->{
log.info("【RequestChannel】 Receive request data :{}",msg);
return msg;
}).map(DefaultPayload::create);
}
@Override
public Mono<Void> metadataPush(Payload payload) {
return null;
}
@Override
public Mono<Void> onClose() {
return null;
}
@Override
public void dispose() {
}
}
RSocket Server development
package com.pshdhx.controller;
import com.pshdhx.message.MessageService;
import com.pshdhx.vo.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
@Controller
// Don't use rest
@Slf4j
public class MessageFluxController {
@Autowired
private MessageService messageService;
@MessageMapping("message.echo")
public Mono<Message> echoMessage(Mono<Message> message){
return message.doOnNext(msg->this.messageService.echo(msg)) // Response processing
.doOnNext(msg->log.info(" Message reception {}",message));
}
@MessageMapping("message.delete")
public void deleteMessage(Mono<String> title){
title.doOnNext(msg->log.info(" Message deletion {}",msg)).subscribe();
}
@MessageMapping("message.list")
public Flux<Message> listMessage(){
return Flux.fromStream(this.messageService.list().stream());
}
@MessageMapping("message.get")
public Flux<Message> getMessage(Flux<String> title){
return title.doOnNext(t->log.info(" Message query {}",t))
.map(titleInfo->titleInfo.toLowerCase())
.map(this.messageService::get)
.delayElements(Duration.ofSeconds(1));
}
}
RSocket Client development
Configure policy and communication
package com.pshdhx.config;
import io.rsocket.RSocket;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
@Configuration
public class RSocketConfig {
/**
* Configure policy , Encoding and decoding
*/
@Bean
public RSocketStrategies getRSocketStrategies(){
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
}
/**
* To configure RSocket Connection strategy
*/
@Bean
public Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder){
return Mono.just(
builder.rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(
Retry.fixedDelay(2, Duration.ofSeconds(2))))
.dataMimeType(MediaType.APPLICATION_CBOR)
.transport(TcpClientTransport.create(6869))
);
}
}
Client impersonation call
import com.pshdhx.AppClient;
import com.pshdhx.vo.Message;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.web.WebAppConfiguration;
import reactor.core.publisher.Mono;
@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = AppClient.class)
public class TestRSocketClient {
@Autowired
private Mono<RSocketRequester> requesterMono; // To make a service call
@Test
public void testEchoMessage(){ // Test service response
this.requesterMono.map(r->r.route("message.echo")
.data(new Message("pshdhx","fighting")))
.flatMap(r->r.retrieveMono(Message.class))
.doOnNext(o-> System.out.println(o)).block();
}
@Test
public void testDeleteMessage(){
this.requesterMono.map(r->r.route("message.delete")
.data("pshdhx"))
.flatMap(RSocketRequester.RetrieveSpec::send).block();
}
@Test
public void testListMessage(){
this.requesterMono.map(r->r.route("message.list"))
.flatMapMany(r->r.retrieveFlux(Message.class))
.doOnNext(o->System.out.println(o)).blockLast();
}
}
RSocket File upload
RSocket The protocol itself is based on binary transmission , Therefore, it also provides processing support for convenient file upload . When uploading files , It is not to upload a file as a whole , Instead, file blocks are used (chunk) Cut the uploaded file in the form of , utilize Flux A set of file blocks to upload the package , After the server receives the file block , It will be saved through a dedicated channel , At the same time, it will also send the uploaded status to the client .

common modular
package com.pshdhx.type;
public enum UploadStatus {
CHUNK_COMPLETED, // File upload processing
COMPLETED, // File upload completed
FAILED;// Failure
}
package com.pshdhx.constants;
public class UploadConstants {
public static final String MINE_FILE_NAME = "message/x.upload.file.name";
public static final String MINE_FILE_EXTENSION = "message/x.upload.file.extension";
public static final String FILE_NAME = "file.name";
public static final String FILE_EXT = "file.ext";
}
Server side :
package com.pshdhx.config;
import com.pshdhx.constants.UploadConstants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeType;
@Configuration
public class RSocketServerConfig {
@Bean
public RSocketStrategies getRSocketStrategies(){
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.metadataExtractorRegistry(metadataExtractorRegistry -> {
metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(UploadConstants.MINE_FILE_NAME),String.class,UploadConstants.FILE_NAME);
metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(UploadConstants.MINE_FILE_EXTENSION),String.class,UploadConstants.MINE_FILE_EXTENSION);
})
.build();
}
}
@Controller
// Don't use rest
@Slf4j
public class MessageFluxController {
@Autowired
private MessageService messageService;
@Value("${output.file.path.upload}")
private Path outputPath;
@MessageMapping("message.upload")
public Flux<UploadStatus> upload(
@Headers Map<String,Object> metadata,
@Payload Flux<DataBuffer> content
) throws Exception{
log.info("【 Post upload path 】outputPaht={}",this.outputPath);
var fileName = metadata.get(UploadConstants.FILE_NAME);
var fileExt = metadata.get(UploadConstants.MINE_FILE_EXTENSION);
var path = Paths.get(fileName+"."+fileExt);
log.info("【 Upload files 】fileName={}、fileExt = {},path={}",fileName,fileExt,path);
AsynchronousFileChannel channel = AsynchronousFileChannel.open(
this.outputPath.resolve(path),
StandardOpenOption.CREATE, // File creation
StandardOpenOption.WRITE // File is written to
);// Asynchronous file channel
return Flux.concat(DataBufferUtils.write(content,channel)
.map(s->UploadStatus.CHUNK_COMPLETED),Mono.just(UploadStatus.COMPLETED))
.doOnComplete(()->{
try {
channel.close();
}catch(Exception e){
e.printStackTrace();
}
})
.onErrorReturn(UploadStatus.FAILED);
}client :
@Value("classpath:/images/pic.jpg")
private Resource resource;
@Test
public void testUpload(){
String fileName = "pshdhx"+ UUID.randomUUID();
String fileExt = this.resource.getFilename().substring(this.resource.getFilename().lastIndexOf(".")+1);
Flux<DataBuffer> resourceFlux = DataBufferUtils.read(this.resource,new DefaultDataBufferFactory(),1024)
.doOnNext(s-> System.out.println(" Upload files :"+s));
Flux<UploadStatus> uploadStatusFlux = this.requesterMono
.map(r->r.route("message.upload")
.metadata(metadataSpec -> {
System.out.println("【 Upload test :】 File name "+fileName+"."+fileExt);
metadataSpec.metadata(fileName, MimeType.valueOf(UploadConstants.MINE_FILE_NAME));
metadataSpec.metadata(fileExt, MimeType.valueOf(UploadConstants.MINE_FILE_EXTENSION));
}).data(resourceFlux)).flatMapMany(r->r.retrieveFlux(UploadStatus.class))
.doOnNext(o-> System.out.println(" Upload progress :"+o));
uploadStatusFlux.blockLast();
}边栏推荐
猜你喜欢
随机推荐
多线程
如何获取及嵌入Go二进制执行包信息
Baidu proposes a dynamic self distillation method to realize dense paragraph retrieval by combining interactive model and double tower model
Canoe tutorial
Matlab exports high-definition pictures without distortion in word compression and PDF conversion
monkey压力测试
Nftscan and nftplay have reached strategic cooperation in the field of NFT data
flowable工作流所有业务概念
samba服务器如何配置
使用Mock技术帮助提升测试效率的小tips,你知道几个?
字符数组和字符串的区别
[leetcode] binary search given an N-element ordered (ascending) integer array num and a target value target, write a function to search the target in num. if the target value exists, return the subscr
Summarize the knowledge points of the ten JVM modules. If you don't believe it, you still don't understand it
Daily news on July 28, 2022: Science: AI has made another breakthrough in protein design, and can design specific functional proteins
samba服务器搭建指南
详解.NET的求复杂类型集合的差集、交集、并集
Differences between two ways of QT creating folders
Stateflow逻辑系统建模
软件架构与设计(九)-----基于组件的架构
9、相关数据累积任务定义





![[jspwiki]jspwiki installation deployment and configuration](/img/3c/81a201bb80dcbb17d1c97b1a5bb215.png)

![[delete specified number leetcode]](/img/16/b40492d8414a363a3a24f00b4afd47.png)
