【Netty4 簡單專案實踐】十一、用Netty分發mpegts到websocket介面
【前言】
推視訊流的時候,rtmp會有3秒的延遲。目前有一種解決方案是用mpegts的格式解決。如果考慮用ffmpeg來推流的話,可以使用http格式和udp格式來推流。現在要做的事情是用Netty來轉發rtmp到websocket介面上,然後用H5來播放。播放的外掛使用jsmpeg這個專案來實現。
【ffmpeg推mpegts】
ffmpeg推流支援http和udp兩種協議,目前還不支援websocket的方式。所以就打算用Netty做協議轉發。假定本地接收流地址是 http://localhost:9090 在Mac上推螢幕上的畫面可以用下面的命令
ffmpeg -f avfoundation -i "1" -vcodec libx264-f mpegts -codec:v mpeg1video -b 0
如果是推UDP的話,假定也是推到localhost,埠9094,可以使用下面的命令
ffmpeg -f avfoundation -i "1" -vcodec libx264-f mpegts -codec:v mpeg1video -b 0 udp://localhost:9094
上面這兩種方式都沒有加入音訊編碼,如果要包含音訊的話,需要指定音訊方式
ffmpeg -f avfoundation -i "1" -vcodec libx264-f mpegts -codec:v mpeg1video -acodec libfaac -b 0 udp://localhost
【嘗試HTTP推流】
按jsmpeg的教程,他把ffmpeg流推給nginx,讓nginx轉發到websocket上,而且並沒有修改nginx的模組,所以我想如果把ffmpeg推的資料buff直接轉發給websocket應該是可行的。至少http是可行的。那麼下來要做的事情就是把Http的報文去掉頭,只轉發response body就可以了。真的是這樣麼?
我用Netty建了一個bootstrap,包含監聽的EventLoopGroup和傳輸的EventLoopGroup,就是典型的一個服務bootstrap型別
ServerBootstrap,然後配置TCP模式設定一下TCP的nodelay,收發緩衝區大小等引數
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_SNDBUF, 1024*256)
.option(ChannelOption.SO_RCVBUF, 1024*256)
.option(ChannelOption.TCP_NODELAY, true);
之後沒有載入任何的編解碼器,直接把處理Handler加上去。
bootstrap.group(bossLoop, workerLoop)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("logging", new LoggingHandler(LogLevel.DEBUG));
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new MpegTsHandler());
}
});
這個處理Handler也超級簡單,就是把訊息分發到ws的連線組裡就完事了。注意Handler用的是ByteBuf,並不需要解析成Http協議。
public class MpegTsHandler extends SimpleChannelInboundHandler<ByteBuf>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// 轉發位元組流
PlayerGroup.broadCast(msg);
}
}
而其中的PlayerGroup,是一個channelGroup。注意其中在廣播的時候retain了一下。
public class PlayerGroup {
static private ChannelGroup channelGroup
= new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
static public void addChannel(Channel channel) {
channelGroup.add(channel);
}
static public void removeChannel(Channel channel) {
channelGroup.remove(channel);
}
static public void broadCast(ByteBuf message) {
if (channelGroup == null || channelGroup.isEmpty()) {
return;
}
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(message);
message.retain();
channelGroup.writeAndFlush(frame);
}
static public void destory() {
if (channelGroup == null || channelGroup.isEmpty()) {
return;
}
channelGroup.close();
}
}
這樣做是可以播放的。只是裡面含有很多HTTP頭部位元組。至於把HTTP頭部都去掉能不能行,沒有嘗試。因為無論轉發時是否去掉,在接收的時候同樣是多了很多的開銷,所以後面我轉到UDP方式的推流。
【UDP推流處理】
UDP方式的server要用NioDatagramChannel,也不需要編解碼模組,直接配上處理Handler
EventLoopGroup bossLoop = null;
try {
bossLoop = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioDatagramChannel.class);
bootstrap
.group(bossLoop)
.option(ChannelOption.SO_BROADCAST, true) // 支援廣播
.option(ChannelOption.SO_SNDBUF, 1024 * 256)
.option(ChannelOption.SO_RCVBUF, 1024 * 256);
bootstrap
.handler(new UdpMpegTsHandler());
ChannelFuture future = bootstrap.bind(port).sync();
if (future.isSuccess()) {
System.out.println("UDP stream server start at port: " + port + ".");
}
future.channel().closeFuture().await();
} catch (Exception e) {
} finally {
if (bossLoop != null) {
bossLoop.shutdownGracefully();
}
}
Handler也超級簡單,唯一需要注意的是Handler接收的資料型別不再是ByteBuf,而是DatagramPacket
public class UdpMpegTsHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
PlayerGroup.broadCast(msg.content());
}
}
同樣的是轉發到group裡面去廣播。唯一需要注意的是要取出DatagramPacket中的ByteBuf
【websocket】
websocket階段和之前寫的ws稍微一點不一樣。這裡是用ws傳輸二進位制,所以ws的資料格式是BinaryWebSocketFrame。Server載入的編解碼器看起來像這樣:
bootstrap.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("readTimeout", new ReadTimeoutHandler(45)); // 長時間不寫會斷
ch.pipeline().addLast("HttpServerCodec", new HttpServerCodec());
ch.pipeline().addLast("ChunkedWriter", new ChunkedWriteHandler());
ch.pipeline().addLast("HttpAggregator", new HttpObjectAggregator(65535));
ch.pipeline().addLast("WsProtocolHandler",
new WebSocketServerProtocolHandler(WEBSOCKET_PATH, "haofei", true));
ch.pipeline().addLast("WsBinaryDecoder", new WebSocketFrameDecoder()); // ws解碼成位元組
ch.pipeline().addLast("WsEncoder", new WebSocketFramePrepender()); // 位元組編碼成ws
ch.pipeline().addLast(new VideoPlayerHandler());
}
});
WebSocketFrameDecoder是自定義的解碼器。WebSocketFramePrepender是自定義的編碼器。VideoPlayerHandler是自定義處理Handler。
先看解碼器WebSocketFrameDecoder,這裡我用直接記憶體了。public class WebSocketFrameDecoder extends MessageToMessageDecoder<WebSocketFrame> {
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
ByteBuf buff = msg.content();
byte[] messageBytes = new byte[buff.readableBytes()];
buff.readBytes(messageBytes);
// TODO: 直接記憶體小心
ByteBuf bytebuf = PooledByteBufAllocator.DEFAULT.buffer(); // 直接記憶體
bytebuf.writeBytes(messageBytes);
out.add(bytebuf.retain());
}
}
再看編碼器,按之前說的編碼成二進位制流,而不是字元流
public class WebSocketFramePrepender extends MessageToMessageEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
WebSocketFrame webSocketFrame = new BinaryWebSocketFrame(msg);
out.add(webSocketFrame);
}
}
最後的Handler也超級簡單,就是把channel加到group裡面
public class VideoPlayerHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ServerLogger.log("ws 連線 ctx:" + ctx);
PlayerGroup.addChannel(ctx.channel());
}
}
這樣就通過ChannelGroup打通了UDP到ws的通道。【結束語】
UDP的效果比較好,推HTTP的時候經常花屏,最後附帶說一下jsmpeg的選項,如果播放效果不好,需要調整jsmpeg的選項。
<!DOCTYPE html>
<html>
<head>
<title>JSMpeg Stream Client</title>
<style type="text/css">
html, body {
background-color: #111;
text-align: center;
}
</style>
</head>
<body>
<canvas id="video-canvas"></canvas>
<script type="text/javascript" src="js/jsmpeg.min.js"></script>
<script type="text/javascript">
var canvas = document.getElementById('video-canvas');
var url = 'ws://'+'127.0.0.1:9092/wsEntry';
var config = {
canvas: canvas,
autoplay: true,
audio: false,
video: true,
protocols: 'haofei',
};
var player = new JSMpeg.Player(url, config);
</script>
</body>
</html>
注意config裡面的protocols,記得和ws的server裡面配置一致
ch.pipeline().addLast("WsProtocolHandler",
new WebSocketServerProtocolHandler(WEBSOCKET_PATH, "haofei", true));
這個子協議可以用來做鑑權神馬的,不過記得不能填"" 即空字串,因為在Netty裡不把空字串當成子協議。這裡不知道算不算Netty的bug。