
Spring Boot: Reactive Development with WebFlux, a Tutorial (Part 2)


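This article builds on part one of this Spring Boot WebFlux reactive development tutorial.
It has three parts:

Database operations
Web service
WebSocket
Create a project with artifactId = trading-service and groupId = io.spring.workshop. Select Reactive Web, Devtools, Thymeleaf and Reactive Mongo.
Web container
spring-boot-starter-webflux brings in spring-boot-starter-reactor-netty, so Reactor Netty is the default web server.
To use Tomcat instead, add this dependency to the pom: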

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
Undertow and Jetty are supported as well.

Reactive database operations

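This example uses MongoDB. In reactive mode the database driver is separate from the traditional (blocking) one. At the time of writing there was no reactive driver for MySQL, although one was reportedly in development. The example uses an embedded, in-memory MongoDB, which requires this dependency: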

<dependency>
    <groupId>de.flapdoodle.embed</groupId>
    <artifactId>de.flapdoodle.embed.mongo</artifactId>
</dependency>
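On first run the embedded MongoDB binaries are downloaded automatically. If the MongoDB download site is not directly reachable from your network (as is the case from mainland China), configure a proxy. JVM arguments are the recommended way: -DproxySet=true -Dhttps.proxyHost=127.0.0.1 -Dhttps.proxyPort=1080. Note that HTTP and HTTPS proxies are configured separately: for an HTTP proxy, change -Dhttps to -Dhttp.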
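The same properties can presumably also be set in code, provided that happens before the download starts (for example at the top of the application's main method). The following is a minimal JDK-only sketch, not part of the original project: the class name ProxySettings and its methods are hypothetical, and the proxy address 127.0.0.1:1080 is the one used in the JVM arguments above. It also shows that the https.* and http.* settings are independent of each other.

```java
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.URI;
import java.util.List;

// Hypothetical helper: sets the JVM proxy properties in code
// instead of passing them on the command line.
class ProxySettings {

    // Configures a proxy for one scheme only ("http" or "https").
    static void useProxy(String scheme, String host, int port) {
        System.setProperty(scheme + ".proxyHost", host);
        System.setProperty(scheme + ".proxyPort", String.valueOf(port));
    }

    // Asks the JVM's default ProxySelector which proxies it would use for a URL.
    static List<Proxy> proxiesFor(String url) {
        return ProxySelector.getDefault().select(URI.create(url));
    }

    public static void main(String[] args) {
        // Only HTTPS traffic is routed through the proxy here;
        // plain HTTP would need a separate useProxy("http", ...) call.
        useProxy("https", "127.0.0.1", 1080);
        System.out.println("https -> " + proxiesFor("https://example.org/"));
        System.out.println("http  -> " + proxiesFor("http://example.org/"));
    }
}
```

Running main prints the proxy selected for each scheme, which is a quick way to check the settings before starting the application.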
The persisted entity, TradingUser:
import java.util.Objects;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document
@Data
public class TradingUser {

    @Id
    private String id;

    private String userName;

    private String fullName;

    public TradingUser() {
    }

    public TradingUser(String id, String userName, String fullName) {
        this.id = id;
        this.userName = userName;
        this.fullName = fullName;
    }

    // Used for new users: the id stays null until MongoDB assigns one
    public TradingUser(String userName, String fullName) {
        this.userName = userName;
        this.fullName = fullName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TradingUser that = (TradingUser) o;

        // Objects.equals is null-safe, so users without an id can be compared
        return Objects.equals(id, that.id) && Objects.equals(userName, that.userName);
    }

    @Override
    public int hashCode() {
        // Objects.hash tolerates null fields as well
        return Objects.hash(id, userName);
    }
}

Create TradingUserRepository, extending ReactiveMongoRepository, and add a findByUserName method that returns a single entity.
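The repository itself is not listed in the text, so the following is a sketch of what it might look like. It assumes the id type is String (matching the @Id field of TradingUser) and relies on Spring Data deriving the query from the method name. It needs Spring Data MongoDB Reactive on the classpath, so it does not run on its own.

```java
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Mono;

// Entity type is TradingUser; String is the type of its @Id field (assumed from the entity above).
public interface TradingUserRepository extends ReactiveMongoRepository<TradingUser, String> {

    // Derived query on the userName field; Mono because at most one user is expected.
    Mono<TradingUser> findByUserName(String userName);
}
```

No implementation class is written by hand; Spring Data generates one at startup, and the inherited methods such as findAll() and insert(...) return Flux or Mono rather than blocking collections.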
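Some data has to be initialized when the application starts. To do that, create UsersCommandLineRunner, which implements CommandLineRunner, and override its run method to build the initial data and insert it into the database.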

@Component
public class UsersCommandLineRunner implements CommandLineRunner {

    private final TradingUserRepository repository;

    public UsersCommandLineRunner(TradingUserRepository repository) {
        this.repository = repository;
    }

    @Override
    public void run(String... strings) throws Exception {
        List<TradingUser> users = Arrays.asList(
                new TradingUser("sdeleuze", "Sebastien Deleuze"),
                new TradingUser("snicoll", "Stephane Nicoll"),
                new TradingUser("rstoyanchev", "Rossen Stoyanchev"),
                new TradingUser("poutsma", "Arjen Poutsma"),
                new TradingUser("smaldini", "Stephane Maldini"),
                new TradingUser("simonbasle", "Simon Basle"),
                new TradingUser("violetagg", "Violeta Georgieva"),
                new TradingUser("bclozel", "Brian Clozel")
        );
        this.repository.insert(users).blockLast(Duration.ofSeconds(3));
    }
}
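Because run returns void and is expected to block until it is done, the Flux returned by the repository insert has to be awaited with blockLast(Duration).
Alternatively, then().block(Duration) turns the Flux into a Mono<Void> and waits for it to complete.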

Create the web service: a UserController annotated with @RestController, with two handler methods
1. GET "/users", returns all TradingUser entries, content-type = "application/json"
2. GET "/users/{username}", returns a single TradingUser, content-type = "application/json"

@RestController
public class UserController {

    private final TradingUserRepository tradingUserRepository;

    public UserController(TradingUserRepository tradingUserRepository) {
        this.tradingUserRepository = tradingUserRepository;
    }
    @GetMapping(path = "/users", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<TradingUser> listUsers() {
        return this.tradingUserRepository.findAll();
    }

    @GetMapping(path = "/users/{username}", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<TradingUser> showUsers(@PathVariable String username) {
        return this.tradingUserRepository.findByUserName(username);
    }
}

Write the tests

@RunWith(SpringRunner.class)
@WebFluxTest(UserController.class)
public class UserControllerTests {

  @Autowired
  private WebTestClient webTestClient;

  @MockBean
  private TradingUserRepository repository;

  @Test
  public void listUsers() {
    TradingUser juergen = new TradingUser("1", "jhoeller", "Juergen Hoeller");
    TradingUser andy = new TradingUser("2", "wilkinsona", "Andy Wilkinson");

    BDDMockito.given(this.repository.findAll())
        .willReturn(Flux.just(juergen, andy));

    this.webTestClient.get().uri("/users").accept(MediaType.APPLICATION_JSON)
        .exchange()
        .expectBodyList(TradingUser.class)
        .hasSize(2)
        .contains(juergen, andy);

  }

  @Test
  public void showUser() {
    TradingUser juergen = new TradingUser("1", "jhoeller", "Juergen Hoeller");

    BDDMockito.given(this.repository.findByUserName("jhoeller"))
        .willReturn(Mono.just(juergen));

    this.webTestClient.get().uri("/users/jhoeller").accept(MediaType.APPLICATION_JSON)
        .exchange()
        .expectBody(TradingUser.class)
        .isEqualTo(juergen);
  }

}

Rendering pages with Thymeleaf
Add the front-end dependencies to the pom

<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>bootstrap</artifactId>
    <version>3.3.7</version>
</dependency>
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>highcharts</artifactId>
    <version>5.0.8</version>
</dependency>

Create HomeController

@Controller
public class HomeController {

    private final TradingUserRepository tradingUserRepository;

    public HomeController(TradingUserRepository tradingUserRepository) {
        this.tradingUserRepository = tradingUserRepository;
    }

    @GetMapping("/")
    public String home(Model model) {
        model.addAttribute("users", this.tradingUserRepository.findAll());
        return "index";
    }
}

Create the home page

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Workshop"/>
    <meta name="author" content="Violeta Georgieva and Brian Clozel"/>
    <title>Spring Trading application</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring Trading application</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li class="active"><a href="/">Home</a></li>
                <li><a href="/quotes">Quotes</a></li>
                <li><a href="/websocket">Websocket</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <h2>Trading users</h2>
    <table class="table table-striped">
        <thead>
        <tr>
            <th>#</th>
            <th>User name</th>
            <th>Full name</th>
        </tr>
        </thead>
        <tbody>
        <tr th:each="user: ${users}">
            <th scope="row" th:text="${user.id}">42</th>
            <td th:text="${user.userName}">janedoe</td>
            <td th:text="${user.fullName}">Jane Doe</td>
        </tr>
        </tbody>
    </table>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
</body>
</html>
Spring WebFlux resolves Publisher instances automatically before rendering the view, so the controller needs no blocking code.

Streaming JSON to the browser with WebClient

This part uses the example application from part one of this tutorial and calls that service remotely. Then create the view

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Workshop"/>
    <meta name="author" content="Violeta Georgieva and Brian Clozel"/>
    <title>Spring Trading application</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
    <link rel="stylesheet" href="/webjars/highcharts/5.0.8/css/highcharts.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring Trading application</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li><a href="/">Home</a></li>
                <li class="active"><a href="/quotes">Quotes</a></li>
                <li><a href="/websocket">Websocket</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <div id="chart" style="height: 400px; min-width: 310px"></div>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/highcharts/5.0.8/highcharts.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
<script type="text/javascript">

    // Setting up the chart
    var chart = new Highcharts.chart('chart', {
        title: {
            text: 'My Stock Portfolio'
        },
        yAxis: {
            title: {
                text: 'Stock Price'
            }
        },
        legend: {
            layout: 'vertical',
            align: 'right',
            verticalAlign: 'middle'
        },
        xAxis: {
            type: 'datetime'
        },
        series: [{
            name: 'CTXS',
            data: []
        }, {
            name: 'MSFT',
            data: []
        }, {
            name: 'ORCL',
            data: []
        }, {
            name: 'RHT',
            data: []
        }, {
            name: 'VMW',
            data: []
        }, {
            name: 'DELL',
            data: []
        }]
    });

    // This function adds the given data point to the chart
    var appendStockData = function (quote) {
        chart.series
            .filter(function (serie) {
                return serie.name == quote.ticker
            })
            .forEach(function (serie) {
                var shift = serie.data.length > 40;
                serie.addPoint([new Date(quote.instant), quote.price], true, shift);
            });
    };

    // The browser connects to the server and receives quotes using ServerSentEvents
    // those quotes are appended to the chart as they're received
    var stockEventSource = new EventSource("/quotes/feed");
    stockEventSource.onmessage = function (e) {
        appendStockData(JSON.parse(e.data));
    };
</script>
</body>
</html>
The page requests quotes from the server via Server-Sent Events (SSE).
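To make the wire format concrete: in a text/event-stream response each event is a group of "data:" lines terminated by a blank line, and the browser's EventSource hands the joined data to onmessage as e.data. The following JDK-only sketch parses that format. It is a simplified illustration, not code from the project: the class name SseFrames is hypothetical, the sample payloads are made up (only the ticker and price field names come from the page script above), and "event:", "id:", "retry:" and comment lines are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified parser for the text/event-stream format.
// Only "data:" fields are handled; all other lines are ignored.
class SseFrames {

    // Returns the data payload of every complete event in the given stream text.
    static List<String> parse(String stream) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : stream.split("\r\n|\n|\r", -1)) {
            if (line.isEmpty()) {
                // A blank line dispatches the event collected so far.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                // A single leading space after the colon is not part of the value.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                // Several data lines within one event are joined with a newline.
                if (hasData) {
                    data.append('\n');
                }
                data.append(value);
                hasData = true;
            }
        }
        // Data not yet followed by a blank line is an incomplete event and is dropped.
        return events;
    }

    public static void main(String[] args) {
        String stream = "data: {\"ticker\":\"MSFT\",\"price\":75.1}\n\n"
                + "data: {\"ticker\":\"ORCL\",\"price\":50.3}\n\n";
        parse(stream).forEach(System.out::println);
    }
}
```

In the page above this parsing is done by the browser, so the script only needs JSON.parse(e.data); the sketch is meant to show what the server has to emit for each Quote.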

Create a QuotesController with the following two handler methods

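import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;

@Controller
public class QuotesController {

    // Built once: share() only multicasts to subscribers of the same Flux
    // instance, so assembling the pipeline per request would open one
    // upstream connection per browser.
    private final Flux<Quote> quoteStream = WebClient.create("http://localhost:8081")
            .get()
            .uri("/quotes")
            .accept(APPLICATION_STREAM_JSON)
            .retrieve()
            .bodyToFlux(Quote.class)
            .share()
            .log("io.spring.workshop.tradingservice");

    @GetMapping("/quotes")
    public String quotes() {
        return "quotes";
    }

    @GetMapping(path = "/quotes/feed", produces = TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<Quote> quotesStream() {
        return this.quoteStream;
    }
}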
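The quotesStream method responds with content-type "text/event-stream" and uses a Flux<Quote> as the response body. The data is served by the stock-quotes application; WebClient is used here to request and retrieve it.
Requests from individual browsers should not each trigger a separate request to the data provider. Flux.share() multicasts one upstream subscription to all current subscribers, provided they all subscribe to the same Flux instance.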

Next, open the page to see the result

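Create the WebSocket handler
WebFlux includes functional, reactive WebSocket support for both client and server.
The server side has two main parts: WebSocketHandlerAdapter processes the request and delegates to the WebSocketService and the WebSocketHandler, which produce the response and carry out the session.
See the official Spring Framework reference documentation on reactive WebSocket support for details.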

First create EchoWebSocketHandler, which implements the WebSocketHandler interface

public class EchoWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(session.receive()
                .doOnNext(WebSocketMessage::retain)
                .delayElements(Duration.ofSeconds(1)).log());
    }
}

The handle method receives the incoming messages and sends each one back after a one-second delay.
To map requests to the handler, create a WebSocketRouter

@Configuration
public class WebSocketRouter {

    @Bean
    public HandlerMapping handlerMapping() {

        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/websocket/echo", new EchoWebSocketHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(10);
        mapping.setUrlMap(map);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

Then create WebSocketController

@Controller
public class WebSocketController {

    @GetMapping("/websocket")
    public String websocket() {
        return "websocket";
    }
}

It returns the following view, where the result can be checked in the browser

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Workshop"/>
    <meta name="author" content="Violeta Georgieva and Brian Clozel"/>
    <title>Spring Trading application</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring Trading application</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li><a href="/">Home</a></li>
                <li><a href="/quotes">Quotes</a></li>
                <li class="active"><a href="/websocket">Websocket</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <h2>Websocket Echo</h2>
    <form class="form-inline">
        <div class="form-group">
            <input class="form-control" type="text" id="input" value="type something">
            <input class="btn btn-default" type="submit" id="button" value="Send"/>
        </div>
    </form>
    <div id="output"></div>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
<script type="text/javascript">
    $(document).ready(function () {
        if (!("WebSocket" in window)) WebSocket = MozWebSocket;
        var socket = new WebSocket("ws://localhost:8080/websocket/echo");

        socket.onopen = function (event) {
            var newMessage = document.createElement('p');
            newMessage.textContent = "-- CONNECTED";
            document.getElementById('output').appendChild(newMessage);

            socket.onmessage = function (e) {
                var newMessage = document.createElement('p');
                newMessage.textContent = "<< SERVER: " + e.data;
                document.getElementById('output').appendChild(newMessage);
            }

            $("#button").click(function (e) {
                e.preventDefault();
                var message = $("#input").val();
                socket.send(message);
                var newMessage = document.createElement('p');
                newMessage.textContent = ">> CLIENT: " + message;
                document.getElementById('output').appendChild(newMessage);
            });
        }
    });
</script>
</body>
</html>

A test can also be written with WebSocketClient

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class EchoWebSocketHandlerTests {

    @LocalServerPort
    private String port;

    @Test
    public void echo() throws Exception {
        int count = 4;
        Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
        ReplayProcessor<Object> output = ReplayProcessor.create(count);

        WebSocketClient client = new StandardWebSocketClient();
        client.execute(getUrl("/websocket/echo"),
                session -> session
                        .send(input.map(session::textMessage))
                        .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                        .subscribeWith(output)
                        .then())
                .block(Duration.ofMillis(5000));

        assertEquals(input.collectList().block(Duration.ofMillis(5000)), output.collectList().block(Duration.ofMillis(5000)));
    }

    protected URI getUrl(String path) throws URISyntaxException {
        return new URI("ws://localhost:" + this.port + path);
    }
}

Source code on GitHub
