Spring 5:使用 Spring Webflux 開發 Reactive 應用

開源中國OSC-協作翻譯2017-06-01 18:00:16



OSC 協作翻譯

原文:Spring 5 Reactive Web

鏈接:https://dzone.com/articles/spring-5-reactive-web-services

譯者:Tocy, imqipan, chloe900, 無若, tnjin, sniff_hu, 總長


Spring 5 - Spring webflux 是一個新的非堵塞函數式 Reactive Web 框架,可以用來建立異步的,非阻塞,事件驅動的服務,並且擴展性非常好。


把阻塞(不可避免的)風格的代碼遷移到函數式的非阻塞 Reactive 風格代碼,需要把商業邏輯作為異步函數來調用。這可以參考 Java 8 的方法或者 lambda 表達式。由於線程是非阻塞的,處理能力能被最大化使用。


在發佈這篇文章的時候,Spring 5 還處於一個里程碑版本中(5.0.0 M5)。



創建一個 Spring Boost 項目

可以通過 Spring initializer 創建一個Spring Boot項目。將如下的依賴添加到 pom.xml 中

<dependencies>

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter</artifactId>

    </dependency>

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-webflux</artifactId>

    </dependency>

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-test</artifactId>

        <scope>test</scope>

    </dependency>

</dependencies>


Spring-boot-starter-webflux 包中帶了 spring-webflux, netty。其他的依賴需要自行添加。


建立一個簡單的用户數據表和從 list 中獲取到 user 數據的 DTO 類。這僅僅是一個虛擬的數據 bean,但是這可以實時從其它的數據源像 Rdbms,MongoDb,或者 RestClient 加載數據。由於 JDBC 天生不是響應式的,所以任何對數據庫的調用都會阻塞這個線程。MongoDB 有一個響應式的客户端驅動。在測試響應式 Web 服務時的進一步渲染時,REST 風格的調用不會導致任何的阻塞。

public class User {

public User(){}


public User(Long id, String user) {

this.id = id;

this.user = user;

}


private Long id;

private String user;


public Long getId() { return id; }

public void setId(Long id) { this.id = id; }

public String getUser() { return user; }

public void setUser(String user) { this.user = user; }

}


@Repository

public class UserRepository {

private final List<User> users = Arrays.asList(new User(1L, "User1"), new User(2L, "User2"));


public Mono<User> getUserById(String id) {

return Mono.justOrEmpty(users.stream().filter(user -> {

return user.getId().equals(Long.valueOf(id));

}).findFirst().orElse(null));

}


public Flux<User> getUsers() {

return Flux.fromIterable(users);

}

}


Mono 和 Flux 是由目標反應器提供的響應類型。Springs 還提供其他的響應流的實現,例如 RXJava。


Mono 和 Flux 是 Reactive streams 的發佈者實現。Mono 是 0 或者任意單個值的發佈,Flux 是 0 到任意值的發佈。他們和 RXJava 中的 Flowable 和 Observable 類似。他們代替流向這些訂閲者發佈信息。


GetUserById() 返回一個 Mono<User> 對象,這個對象不論何時都會返回 0~1 個用户對象,GetUsers() 返回一連串變動的用户對象,不論何時都包含 0~n 個用户對象。


相比命令式編程風格,我們並不返回可用前阻塞線程的 User/List<User> 對象,而只是返回一個流的引用,流可以在後面訪問 User/List<User>。


創建帶有處理 HTTP 請求函數的 Handler 類


@Service

public class UserHandler {

@Autowired

private UserRepository userRepository;


public Mono<ServerResponse> handleGetUsers(ServerRequest request) {

return ServerResponse.ok().body(userRepository.getUsers(), User.class);

}


public Mono<ServerResponse> handleGetUserById(ServerRequest request) {

return userRepository.getUserById(request.pathVariable("id"))

.flatMap(user -> ServerResponse.ok().body(Mono.just(user), User.class))

.switchIfEmpty(ServerResponse.notFound().build());

}

}

handler 類就像 Spring Web 中的 Service beans 一樣,我們需要編寫該服務的大部分業務功能。ServerResponse 就像 Spring Web 中的 ResponseEntity 類一樣,我們可以在 ServerResponse 對象中打包 Response 的數據、狀態碼、頭信息等。 ServerResponse 有很多有用的默認方法,如 notFound(), ok(), accepted(), created()等,可用於創建不同類型的反饋。


UserHandler 有不同的方法,都返回 Mono<ServerResponse>; UserRepository.getUsers() 返回Flux<User>; 和 ServerResponse.ok().body(UserRepository.getUsers(), User.class) 可將此 Flux <User> 轉換為 Mono<ServerResponse>,這表明只要可用時均可發起 ServerResponse 的流。UserRepository.getUserById()返回一個Mono<User>,ServerResponse.ok().body(Mono.just(user), User.class) 將此 Mono<User> 轉換為Mono<ServerResponse>,這説明隨時都可以發起 ServerResponse 的流。


在給定的路徑變量(pathVariable)中沒有找到用户時,ServerResponse.notFound().build() 返回一個 Mono<ServerResponse>,表名是一個返回 404 服務響應的流。


在命令式編程風格中,數據接收前線程會一直阻塞,這樣使得其線程在數據到來前無法運行。而響應式編程中,我們定義一個獲取數據的流,然後定義一個在數據到來後的回調函數操作。這樣不會使線程堵塞,在數據被返回時,可用線程就用於執行。


創建一個定義應用程序路由的路由類


import static org.springframework.web.reactive.function.server.RequestPredicates.GET;

import static org.springframework.web.reactive.function.server.RequestPredicates.accept;

import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration

public class Routes {

private UserHandler userHandler;


    public Routes(UserHandler userHandler) {

this.userHandler = userHandler;

}


@Bean

public RouterFunction<?> routerFunction() {

return route(GET("/api/user").and(accept(MediaType.APPLICATION_JSON)), userHandler::handleGetUsers)

.and(route(GET("/api/user/{id}").and(accept(MediaType.APPLICATION_JSON)), userHandler::handleGetUserById));

}

}

RouterFunction就像Spring Web中的@RequestMapping類一樣。 RouterFunction用於定義Spring5應用程序的路由。 


RouterFunctions幫助器類有一個有用的方法,類似路由,可用於定義路由並構建RouterFunction對象。 RequestPredicates有許多有用的方法,如GET,POST,path,queryParam,accept,headers,contentType等,來定義路由並構建RouterFunction。 每個路由映射到一個處理程序方法,當接收到適當的HttpRequest時,該方法必須被調用。


Spring5還支持定義應用程序處理程序映射的@RequestMapping類型的控制器。 我們可以編寫如下所示的控制器方法,以在@RequestMapping樣式中創建類似的API。

@GetMapping("/user") public Mono<ServerResponse> handleGetUsers() {}


控制器方法返回Mono<ServerResponse>。

RouterFunction為應用程序提供了DSL類型的路由功能。 到目前為止,Springs不支持混合這兩種類型。


創建HttpServerConfig類,用於創建HttpServer類


import org.springframework.http.server.reactive.HttpHandler;

import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;

import reactor.ipc.netty.http.server.HttpServer;

@Configuration

public class HttpServerConfig {

@Autowired

private Environment environment;


@Bean

public HttpServer httpServer(RouterFunction<?> routerFunction) {

HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);

ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);

HttpServer server = HttpServer.create("localhost", Integer.valueOf(environment.getProperty("server.port")));

server.newHandler(adapter);

return server;

}

}

這將使用應用程序屬性中定義的端口創建一個 netty HttpServer。Spring 支持的其他服務器也跟 Tomcat 和 undertow 一樣。由於 netty 是異步的,而且天生基於事件驅動,因此更適合響應式的應用程序。Tomcat 使用 Java NIO 來實現 servlet 規範。Netty 是 NIO 的一個實現,它針對異步、事件驅動的非阻塞 IO 應用程序進行了優化。


Tomcat 服務器也可以按照如下代碼所示的用法使用:

Tomcat tomcatServer = new Tomcat();

tomcatServer.setHostname("localhost");

tomcatServer.setPort(Integer.valueOf(environment.getProperty("server.port")));

Context rootContext = tomcatServer.addContext("", System.getProperty("java.io.tmpdir"));

ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(httpHandler);

Tomcat.addServlet(rootContext, "httpHandlerServlet", servlet);

rootContext.addServletMapping("/", "httpHandlerServlet");

tomcatServer.start();


創建用於啟動應用的Spring啟動主類

@SpringBootApplication

public class Spring5ReactiveApplication {

public static void main(String[] args) throws IOException {

SpringApplication.run(Spring5ReactiveApplication.class, args);

}

}


測試應用

你可以使用任意諸如Postman、CURL等的HTTP測試工具測試該應用。


Spring測試也支持為響應式服務編寫集成測試的功能。

@RunWith(SpringRunner.class)

@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)

public class UserTest {

@Autowired

private WebTestClient webTestClient;


@Test

public void test() throws IOException {

FluxExchangeResult<User> result = webTestClient.get().uri("/api/user").accept(MediaType.APPLICATION_JSON)

.exchange().returnResult(User.class);

assert result.getStatus().value() == 200;

List<User> users = result.getResponseBody().collectList().block();

assert users.size() == 2;

assert users.iterator().next().getUser().equals("User1");

}


@Test

public void test1() throws IOException {

User user = webTestClient.get().uri("/api/user/1")

.accept(MediaType.APPLICATION_JSON).exchange().returnResult(User.class).getResponseBody().blockFirst();

assert user.getId() == 1;

assert user.getUser().equals("User1");

}


@Test

public void test2() throws IOException {

webTestClient.get().uri("/api/user/10").accept(MediaType.APPLICATION_JSON).exchange().expectStatus()

.isNotFound();

}

}


WebTestClient 和 TestRestTemplate 類似, 他們都有調用 Spring 啟動應用的 rest 方法,並能夠驗證響應結果。在 test 的配置中,Spring 測試創建了一個 TestRestTemplate 的 bean。這裏面有一個 WebClient,就跟 Spring Web 中的 RestTemplate 類似。這可用於處理響應式和非阻塞的 rest 調用。


WebClient.create("http://localhost:9000").get().uri("/api/user/1")

        .accept(MediaType.APPLICATION_JSON).exchange().flatMap(resp -> resp.bodyToMono(User.class)).block();


exchange()返回Mono<ClientResponse>,它在Emits clientResponse可用時表示一個流。


    block()阻塞線程執行,直到Mono返回User/List<User>,因為這是我們需要數據來驗證響應的測試用例。

Spring Web 因其易於開發/調試而是必要的。使用Spring5響應式或Spring Web命令式服務的決定必須根據用例明智地做出。在許多情況下,只有命令式的可能會很好,但是在高可擴展性是關鍵因素時,響應式非阻塞將更適合。





推薦閲讀

盤點那些評分最高的項目管理工具,不服來戰!

Redis 單例、主從模式、sentinel 以及集羣的配置方式及優缺點對比

Spring 思維導圖,讓 Spring 不再難懂(ioc 篇)

一名 40 歲“老”程序員的反思

“放碼過來”邀您亮“項”,一不小心就火了!

點擊“閲讀原文”查看更多精彩內容


閲讀原文

TAGS:響應式應用程序線程阻塞