Skip to content

chenggangpro/rsocket-micro-connect

Repository files navigation

rsocket-micro-connect Maven Central

This project enables services to maintain compatibility with HTTP-based RPC communication (using JSON) while incorporating RSocket to take advantage of its advanced features and capabilities. It serves as a compelling alternative to OpenFeign or WebClient, which are commonly employed for HTTP-based remote procedure calls (RPC) within Spring Boot or Spring Cloud frameworks.

The calls diagram between services is: call-diagram

Instructions

  • Autoconfiguration for both server RSocket and client RSocket.

  • Using RSocket connector similarly to HTTP client with annotations like @RequestParam, @RequestBody, etc.

  • Automatic logging for both server RSocket and client RSocket.

  • This repository MUST be used alongside spring-boot-starter-webflux

  • Client Side Dependency

<dependency>
    <groupId>pro.chenggang</groupId>
    <artifactId>rsocket-micro-connect-client-starter</artifactId>
    <version>${LATEST_VERSION}</version>
</dependency>
  • Server Side Dependency
<dependency>
    <groupId>pro.chenggang</groupId>
    <artifactId>rsocket-micro-connect-server-starter</artifactId>
    <version>${LATEST_VERSION}</version>
</dependency>

Normal Usage

Server Side
  • Configure the Port for an RSocket Server
    • By default, Spring uses TCP as the transport protocol for RSocket.
    • You can also configure any properties supported by spring-boot-starter-rsocket.
spring:
  rsocket:
    server:
      port: 23408
  • Configure logging exclusion for RSocket Server
rsocket-micro-connect:
  server:
    logging:
      exclude-headers:
        - some-header # if you don't want to log the specific header in the RSocket server logging content
      exclude-routes:
        - /server/data/** # if you don't want to log the specific RSocket route in the RSocket server logging content
  • Define an RSocket class or use an existing REST controller class, then add the annotation @RSocketMicroEndpoint to it.

  • Use @MessageMapping (org.springframework.messaging.handler.annotation.MessageMapping) on the method you want to expose as a RSocket endpoint.

    • You can define a RSocket path using @MessageMapping#value, similar to an HTTP path.
      @MessageMapping("/server/rsocket/path")
      public Mono<String> getData() {
          return Mono.just(extraHeader);
      }
    • If there is a @RequestMapping or any REST mapping like @PostMapping on the endpoint method, you can also expose a RSocket endpoint using the path of @RequestMapping.
      @MessageMapping
      @RequestMapping("/server/rsocket/path")
      // OR @PostMapping("/server/rsocket/path")
      public Mono<String> getData() {
          return Mono.just(extraHeader);
      }
    • If paths are defined in both @MessageMapping and @RequestMapping, the value of @MessageMapping will be used.
      @MessageMapping("/server/rsocket/path/will-be-used") // this path will be used as RSocket's endpoint.
      @RequestMapping("/server/rsocket/path/will-not-be-used")
      public Mono<String> getData() {
          return Mono.just(extraHeader);
      }
  • Then you can start the server application.

  • If you want to disable server side autoconfiguration, you can use following properties for configuration

rsocket-micro-connect:
  server:
    enabled: false
Client Side
  • Configure logging exclusion for RSocket Client
rsocket-micro-connect:
  client:
    logging:
      exclude-headers:
        - some-header # if you don't want to log the specific header in the RSocket server logging content
      exclude-routes:
        - /server/data/** # if you don't want to log the specific RSocket route in the RSocket server logging content
  • Define an interface with the annotation @RSocketMicroConnector.
  • Configure the address of the RSocket server in @RSocketMicroConnector#value, which should match the transport protocol of the RSocket server:
    • ws://server-host:server-port for WebSocket transport
    • wss://server-host:server-port for secure WebSocket transport
    • tcp://server-host:server-port for TCP transport
  • Use @MessageMapping or @RequestMapping to define the RSocket request path.
  • Supported annotations:
    • @PathVariable
    • @RequestParam
    • @RequestBody
    • @RequestHeader
    • @RequestPartName: used in file-uploading scenarios to propagate the uploading file's name to the server.
  • Full example of a connector:
@RSocketMicroConnector("tcp://127.0.0.1:23408")
public interface SimpleRSocketMicroConnector {

    @MessageMapping("/server/data/path-variable/{variable}")
    Mono<String> getSingleData(@PathVariable("variable") String variable);

    @RequestMapping("/server/data/query-variable")
    Mono<String> getMultiData(@RequestParam("var1") String variable1, @RequestParam("var2") String variable2);

    @MessageMapping("/server/data/body")
    Mono<String> sendBodyData(@RequestBody BodyValue bodyValue);

    @MessageMapping("/server/data/stream")
    Flux<String> getStreamData(@RequestHeader HttpHeaders headers);

    @MessageMapping("/server/data/stream/auto-header")
    Flux<String> getStreamDataAutoHeader();

    @MessageMapping("/server/data/extra-header")
    Mono<String> getExtraHeader(@RequestHeader(name = "x-extra-header") String extraHeader);

    @RequestMapping("/server/data/file-upload")
    Mono<String> uploadFile(@RequestPartName String name, Flux<DataBuffer> dataBufferFlux);
}
  • Then you can use the bean of the defined interface in your Spring application.

  • If you want to disable client side autoconfiguration, you can use following properties for configuration

rsocket-micro-connect:
  client:
    enabled: false

While Using RSocket with Discovery like Eureka or Nacos

Server Side

The server side configuration is the same as the configuration of simple-server-side.

Client Side
  • Configure load-balance support in your YAML configuration:

    • Enable RSocket load balancing.
      rsocket-micro-connect:
        client:
          enable-discover: true
    • Configure the refresh interval for the load balancer, with a default refresh interval of 10S.
      rsocket-micro-connect:
        client:
          refresh-discover-interval: PT10S
  • The connector configuration is the same as the configuration of simple-client-side, but the host of the RSocket server is different:

    • The host of the RSocket Server should be the service name registered in discovery:
      • tcp://some-service-a
      • ws://some-service-b
      • wss://some-service-c
  • The RSocket server port ought to be configured within the instance's metadata during the discovery registration process. In Nacos Discovery, the metadata can be configured through the web editor.

    metadata:
      "rsocket-micro-connect.port": 23408 # the port of RSocket instance
      "rsocket-micro-connect.enable": true # if this instance can be used in RSocket's load-balancer

    If either "rsocket-micro-connect.enable": true or "rsocket-micro-connect.port" is not configured within the instance's metadata, the client will be unable to utilize the server instance for RSocket load balancing.

  • Then you can use the bean of the defined interface in your Spring application as usual.

File Uploading Example

  • Server Side:
@RSocketMicroEndpoint
public class ServerController {

    @MessageMapping("/server/data/file-upload")
    public Flux<String> getData(@RequestPartName String partName, Flux<DataBuffer> dataBufferFlux) {
        String userDir = System.getProperty("user.dir");
        String savedFilePath = userDir + File.separator + partName;
        return DataBufferUtils.write(dataBufferFlux, Paths.get(savedFilePath), StandardOpenOption.CREATE_NEW)
                .thenMany(Mono.defer(() -> {
                    return Mono.fromCallable(() -> {
                                File savedFile = new File(savedFilePath);
                                long length = savedFile.length();
                                savedFile.delete();
                                return length;
                            })
                            .map(length -> partName + ":" + length);
                }));
    }
}
  • Client Side:
@RSocketMicroConnector("tcp://127.0.0.1:23408")
public interface SimpleRSocketMicroConnector {

    @RequestMapping("/server/data/file-upload")
    Mono<String> uploadFile(@RequestPartName String name, Flux<DataBuffer> dataBufferFlux);
}

Customizations

  • Customize client connector's execution:

Using RSocketMicroConnectorExecutionCustomizer as a bean.

This is useful for customizing connector execution globally, such as propagating HTTP headers from the HTTP context to the RSocket context.

@Configuration(proxyBeanMethods = false)
public class RSocketMicroConnectorConfiguration {

    @Bean
    public RSocketMicroConnectorExecutionCustomizer autoPropagateHeadersCustomizer() {
        // This will automatically propagate ServerWebExchange from WebFlux's Context to RSocket Connector.
        return connectorExecution -> {
            return Mono.deferContextual(contextView -> {
                        return Mono.justOrEmpty(contextView.getOrEmpty(ServerWebExchangeContextFilter.EXCHANGE_CONTEXT_ATTRIBUTE))
                                .ofType(ServerWebExchange.class)
                                .switchIfEmpty(Mono.error(new IllegalStateException("No ServerWebExchange found in context")));
                    })
                    .flatMap(serverWebExchange -> {
                        return Mono.fromRunnable(() -> {
                            HttpHeaders headers = serverWebExchange.getRequest().getHeaders();
                            connectorExecution.addHeaders(headers);
                        });
                    });
        };
    }
}

Advanced Usages

  • There are two interceptors in both rsocket-micro-connect-client-starter and rsocket-micro-connect-server-starter.
  • The interceptors are:

About

This project allows services to maintain compatibility with HTTP-based RPC calling (with JSON) mechanisms while integrating RSocket to take advantage of its advanced features and capabilities.

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages