1. 程式人生 > >【Netty4 簡單專案實踐】十一、用Netty分發mpegts到websocket介面

【Netty4 簡單專案實踐】十一、用Netty分發mpegts到websocket介面

【前言】

推視訊流的時候,rtmp會有3秒的延遲。目前有一種解決方案是用mpegts的格式解決。如果考慮用ffmpeg來推流的話,可以使用http格式和udp格式來推流。現在要做的事情是用Netty來轉發rtmp到websocket介面上,然後用H5來播放。播放的外掛使用jsmpeg這個專案來實現。

【ffmpeg推mpegts】

ffmpeg推流支援http和udp兩種協議,目前還不支援websocket的方式。所以就打算用Netty做協議轉發。假定本地接收流地址是 http://localhost:9090 在Mac上推螢幕上的畫面可以用下面的命令

ffmpeg -f avfoundation -i "1" -vcodec libx264-f mpegts -codec:v mpeg1video -b 0

http://localhost:9090


如果是推UDP的話,假定也是推到localhost,埠9094,可以使用下面的命令

ffmpeg -f avfoundation -i "1" -vcodec libx264-f mpegts -codec:v mpeg1video -b 0 udp://localhost:9094

上面這兩種方式都沒有加入音訊編碼,如果要包含音訊的話,需要指定音訊方式

ffmpeg -f avfoundation -i "1" -vcodec libx264-f mpegts -codec:v mpeg1video -acodec libfaac -b 0 udp://localhost

:9094


【嘗試HTTP推流】

按jsmpeg的教程,他把ffmpeg流推給nginx,讓nginx轉發到websocket上,而且並沒有修改nginx的模組,所以我想如果把ffmpeg推的資料buff直接轉發給websocket應該是可行的。至少http是可行的。那麼下來要做的事情就是把Http的報文去掉頭,只轉發response body就可以了。真的是這樣麼?

我用Netty建了一個bootstrap,包含監聽的EventLoopGroup和傳輸的EventLoopGroup,就是典型的一個服務bootstrap型別

ServerBootstrap,然後配置TCP模式設定一下TCP的nodelay,收發緩衝區大小等引數

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_SNDBUF, 1024*256)
                .option(ChannelOption.SO_RCVBUF, 1024*256)
                .option(ChannelOption.TCP_NODELAY, true);

之後沒有載入任何的編解碼器,直接把處理Handler加上去。
bootstrap.group(bossLoop, workerLoop)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast("logging", new LoggingHandler(LogLevel.DEBUG));
                    if (sslCtx != null) {
                        p.addLast(sslCtx.newHandler(ch.alloc()));
                    }
                    p.addLast(new MpegTsHandler());
                }
             });

這個處理Handler也超級簡單,就是把訊息分發到ws的連線組裡就完事了。注意Handler用的是ByteBuf,並不需要解析成Http協議。
public class MpegTsHandler extends SimpleChannelInboundHandler<ByteBuf>{   
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
       // 轉發位元組流
        PlayerGroup.broadCast(msg);
    }
}
而其中的PlayerGroup,是一個channelGroup。注意其中在廣播的時候retain了一下。
public class PlayerGroup {
    static private ChannelGroup channelGroup
        = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
    
    static public void addChannel(Channel channel) {
        channelGroup.add(channel);
    }

    static public void removeChannel(Channel channel) {
        channelGroup.remove(channel);
    }

    static public void broadCast(ByteBuf message) {
        if (channelGroup == null || channelGroup.isEmpty()) {
            return;
        }
        BinaryWebSocketFrame frame = new BinaryWebSocketFrame(message);
        message.retain();

        channelGroup.writeAndFlush(frame);
    }

    static public void destory() {
        if (channelGroup == null || channelGroup.isEmpty()) {
            return;
        }
        channelGroup.close();
    }
}
這樣做是可以播放的。只是裡面含有很多HTTP頭部位元組。至於把HTTP頭部都去掉能不能行,沒有嘗試。因為無論轉發時是否去掉,在接收的時候同樣是多了很多的開銷,所以後面我轉到UDP方式的推流。

【UDP推流處理】

UDP方式的server要用NioDatagramChannel,也不需要編解碼模組,直接配上處理Handler

        EventLoopGroup bossLoop = null;
        try {
            bossLoop = new NioEventLoopGroup();

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioDatagramChannel.class);
            
            bootstrap
                .group(bossLoop)
                .option(ChannelOption.SO_BROADCAST, true) // 支援廣播
                .option(ChannelOption.SO_SNDBUF, 1024 * 256)
                .option(ChannelOption.SO_RCVBUF, 1024 * 256);

            bootstrap
                .handler(new UdpMpegTsHandler());

            ChannelFuture future = bootstrap.bind(port).sync();

            if (future.isSuccess()) {
                System.out.println("UDP stream server start at port: " + port + ".");
            }
            future.channel().closeFuture().await();
        } catch (Exception e) {
        } finally {
            if (bossLoop != null) {
                bossLoop.shutdownGracefully();
            }
        }
    

Handler也超級簡單,唯一需要注意的是Handler接收的資料型別不再是ByteBuf,而是DatagramPacket
public class UdpMpegTsHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
        PlayerGroup.broadCast(msg.content());
    }
}
同樣的是轉發到group裡面去廣播。唯一需要注意的是要取出DatagramPacket中的ByteBuf


【websocket】

websocket階段和之前寫的ws稍微一點不一樣。這裡是用ws傳輸二進位制,所以ws的資料格式是BinaryWebSocketFrame。Server載入的編解碼器看起來像這樣:

 bootstrap.localAddress(new InetSocketAddress(port))
            .childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast("readTimeout", new ReadTimeoutHandler(45)); // 長時間不寫會斷
                    ch.pipeline().addLast("HttpServerCodec", new HttpServerCodec());
                    ch.pipeline().addLast("ChunkedWriter", new ChunkedWriteHandler());
                    ch.pipeline().addLast("HttpAggregator", new HttpObjectAggregator(65535));
                    ch.pipeline().addLast("WsProtocolHandler",
                        new WebSocketServerProtocolHandler(WEBSOCKET_PATH, "haofei", true));
                    ch.pipeline().addLast("WsBinaryDecoder", new WebSocketFrameDecoder()); // ws解碼成位元組
                    ch.pipeline().addLast("WsEncoder", new WebSocketFramePrepender()); // 位元組編碼成ws
                    ch.pipeline().addLast(new VideoPlayerHandler());
                }
            });

WebSocketFrameDecoder是自定義的解碼器。WebSocketFramePrepender是自定義的編碼器。VideoPlayerHandler是自定義處理Handler。

先看解碼器WebSocketFrameDecoder,這裡我用直接記憶體了。
public class WebSocketFrameDecoder extends MessageToMessageDecoder<WebSocketFrame> {
    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf buff = msg.content();
        byte[] messageBytes = new byte[buff.readableBytes()];
        buff.readBytes(messageBytes);
        // TODO: 直接記憶體小心
        ByteBuf bytebuf = PooledByteBufAllocator.DEFAULT.buffer(); // 直接記憶體
        bytebuf.writeBytes(messageBytes);
        out.add(bytebuf.retain());
    }
}
再看編碼器,按之前說的編碼成二進位制流,而不是字元流
public class WebSocketFramePrepender extends MessageToMessageEncoder<ByteBuf> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        WebSocketFrame webSocketFrame = new BinaryWebSocketFrame(msg);
        out.add(webSocketFrame);
    }
}
最後的Handler也超級簡單,就是把channel加到group裡面
public class VideoPlayerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ServerLogger.log("ws 連線 ctx:" + ctx);
        PlayerGroup.addChannel(ctx.channel());
    }
}
這樣就通過ChannelGroup打通了UDP到ws的通道。

【結束語】

UDP的效果比較好,推HTTP的時候經常花屏,最後附帶說一下jsmpeg的選項,如果播放效果不好,需要調整jsmpeg的選項。

<!DOCTYPE html>
<html>
<head>
    <title>JSMpeg Stream Client</title>
    <style type="text/css">
        html, body {                                                                                                              
            background-color: #111;
            text-align: center;
        }   
    </style>
    
</head>
<body>
    <canvas id="video-canvas"></canvas>
    <script type="text/javascript" src="js/jsmpeg.min.js"></script>
    <script type="text/javascript">
        var canvas = document.getElementById('video-canvas');
        var url = 'ws://'+'127.0.0.1:9092/wsEntry';
        var config = {
            canvas: canvas,
            autoplay: true,
            audio: false,
            video: true,
            protocols: 'haofei',
        };
        var player = new JSMpeg.Player(url, config);
    </script>
</body>
</html>

注意config裡面的protocols,記得和ws的server裡面配置一致
ch.pipeline().addLast("WsProtocolHandler",
                        new WebSocketServerProtocolHandler(WEBSOCKET_PATH, "haofei", true));
這個子協議可以用來做鑑權神馬的,不過記得不能填"" 即空字串,因為在Netty裡不把空字串當成子協議。這裡不知道算不算Netty的bug。