1. 程式人生 > >【ODL-Openstack學習系列-04】-openflowplugin 氧版本連線分析

【ODL-Openstack學習系列-04】-openflowplugin 氧版本連線分析

0 準備說明

  • 版本:openflowplugin-release-oxygen-sr2
  • 功能:openflow伺服器建立及連線物件流程;

1 啟動分析

1.1 blueprint配置檔案位置

OpenFlowPluginProviderImpl通過OpenFlowPluginProviderFactoryImpl建立,其在兩個配置檔案中都有啟動,一個是

org/opendaylight/blueprint/openflowplugin.xml

另一個為:

org/opendaylight/blueprint/openflowplugin-impl.xml

多個配置檔案都存在的情況下,具體在哪個啟動還有待進一步分析和總結;

1.2 啟動netty服務、客戶端程式碼

org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java
啟動後執行 onSystemBootReady() ---> startSwitchConnections
//輪詢所有switchConnectionProviders執行  
switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
return switchConnectionProvider.
startup(); --------------------------------------------- org/opendaylight/openflowjava/protocol/impl/core/SwitchConnectionProviderImpl.java ListenableFuture<Boolean> startup(); ----> createAndConfigureServer() -----> createAndConfigureServer方法根據ConnectionConfiguration啟動配置生成不同 的ServerFacade,包括TcpHandler和UdpHandler,完成伺服器的基本引數配置,特別是TcpChannelInitializer channel初始化器,同時獲取到服務的工作執行緒組,用於客戶端方式的主動連線配置

總結: org/opendaylight/openflowjava/protocol/impl/core/SwitchConnectionProviderImpl.java類中,OpenFlowPluginProviderImpl通過呼叫createAndConfigureServer完成了netty服務端的初始化以及客戶端初始化。但是無論是服務端還是客戶端,有兩個引數是不變的:

  • 一個是channel初始化器(含有序列化器、訊息處理handler)
  • 另外一個是公用工作執行緒組;

這兩點保障了無論是客戶端還是服務端方式,openflow協議的上層可以是等同的,只是傳輸層有變化,支援了傳輸層的多樣化,也保持了應用層的一致性。

2 channel初始化器分析

重寫的這個初始化器,含有我們所有需要關注的入口位置,有必要把函式列出來重點分析以下:

    protected void initChannel(final SocketChannel ch) {
        //流程1:檢查遠端ip,檢查是否接受此ip地址,不接受則斷開連線;
        if (ch.remoteAddress() != null) {
            final InetAddress switchAddress = ch.remoteAddress().getAddress();
            final int port = ch.localAddress().getPort();
            final int remotePort = ch.remoteAddress().getPort();
            LOG.debug("Incoming connection from (remote address): {}:{} --> :{}",
                switchAddress.toString(), remotePort, port);

            if (!getSwitchConnectionHandler().accept(switchAddress)) {
                ch.disconnect();
                LOG.debug("Incoming connection rejected");
                return;
            }
        }
        LOG.debug("Incoming connection accepted - building pipeline");
        //步驟2:儲存通道;
        allChannels.add(ch);
        ConnectionFacade connectionFacade = null;
        //步驟3:新建連線工具類
        connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null, useBarrier(),
                getChannelOutboundQueueSize());
        try {
            LOG.debug("Calling OF plugin: {}", getSwitchConnectionHandler());
 		//步驟4-getSwitchConnectionHandler基本引數填入介面卡
         getSwitchConnectionHandler().onSwitchConnected(connectionFacade);
            connectionFacade.checkListeners();
            ch.pipeline().addLast(PipelineHandlers.IDLE_HANDLER.name(),
                    new IdleHandler(getSwitchIdleTimeout(), TimeUnit.MILLISECONDS));
            boolean tlsPresent = false;

            // If this channel is configured to support SSL it will only support SSL
            if (getTlsConfiguration() != null) {
                tlsPresent = true;
                final SslContextFactory sslFactory = new SslContextFactory(getTlsConfiguration());
                final SSLEngine engine = sslFactory.getServerContext().createSSLEngine();
                engine.setNeedClientAuth(true);
                engine.setUseClientMode(false);
                List<String> suitesList = getTlsConfiguration().getCipherSuites();
                if (suitesList != null && !suitesList.isEmpty()) {
                    LOG.debug("Requested Cipher Suites are: {}", suitesList);
                    String[] suites = suitesList.toArray(new String[suitesList.size()]);
                    engine.setEnabledCipherSuites(suites);
                    LOG.debug("Cipher suites enabled in SSLEngine are: {}",
                            Arrays.toString(engine.getEnabledCipherSuites()));
                }
                final SslHandler ssl = new SslHandler(engine);
                final Future<Channel> handshakeFuture = ssl.handshakeFuture();
                final ConnectionFacade finalConnectionFacade = connectionFacade;
                handshakeFuture.addListener(future -> finalConnectionFacade.fireConnectionReadyNotification());
                ch.pipeline().addLast(PipelineHandlers.SSL_HANDLER.name(), ssl);
            }
            ch.pipeline().addLast(PipelineHandlers.OF_FRAME_DECODER.name(),
                    new OFFrameDecoder(connectionFacade, tlsPresent));
            ch.pipeline().addLast(PipelineHandlers.OF_VERSION_DETECTOR.name(), new OFVersionDetector());
            final OFDecoder ofDecoder = new OFDecoder();
            ofDecoder.setDeserializationFactory(getDeserializationFactory());
            ch.pipeline().addLast(PipelineHandlers.OF_DECODER.name(), ofDecoder);
            final OFEncoder ofEncoder = new OFEncoder();
            ofEncoder.setSerializationFactory(getSerializationFactory());
            ch.pipeline().addLast(PipelineHandlers.OF_ENCODER.name(), ofEncoder);
            ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
                    new DelegatingInboundHandler(connectionFacade));
            if (!tlsPresent) {
                connectionFacade.fireConnectionReadyNotification();
            }
        } catch (RuntimeException e) {
            LOG.warn("Failed to initialize channel", e);
            ch.close();
        }
    }

2.1 連線介面卡--connectionFacade

抽象父類為AbstractConnectionAdapter,含有通道的基本連線資訊,抽象方法為幾個重要監聽器的設定,實現類ConnectionAdapterImpl中包含如下幾個重要監聽器和模組:

  • ConnectionReadyListener–連線準備監聽器
  • OpenflowProtocolListener–協議監聽器-》接收後續的各種協議訊息,完成訊息處理;
  • SystemNotificationsListener–系統訂閱監聽器,主要監聽交換機閒置和斷連事件;
  • AlienMessageListener–不同性質訊息監聽器;監聽不識別的訊息???待後續進一步分析確定功能;
  • SystemNotificationsListener–輸出管理佇列
  • OFVersionDetector–版本檢測器
  • useBarrier–是否使用useBarrier標誌;

2.2 配器引數提供者--SwitchConnectionHandler

在2章開頭程式碼的第四步,如下程式碼為生成的介面卡提供了基本的連線監聽器等引數的初始設定;

getSwitchConnectionHandler().onSwitchConnected(connectionFacade);

onSwitchConnected的核心功能就是new出2.1提到的幾個重要監聽器,然後連線成功後把相關引數傳入connectionFacade;

  • ConnectionReadyListenerImpl
    • connectionContext ----> ConnectionContextImpl
    • handshakeContext ----> HandshakeContextImpl
      • executorService
      • handshakeManager ----> createHandshakeManager(connectionAdapter, handshakeListener);
  • OpenflowProtocolListenerInitialImpl
  • SystemNotificationsListenerImpl

2.3 訊息內部代理類--DelegatingInboundHandler

此類負責註冊,以及監聽到訊息後,排程訊息消費者,完成訊息的監聽器執行,進入不同的訊息處理流程;

       ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
                    new DelegatingInboundHandler(connectionFacade));

此處程式碼完成openflow訊息的直接處理註冊,當監聽到訊息,由這裡的DelegatingInboundHandler呼叫channelRead,完成訊息的讀操作;

DelegatingInboundHandler ---> channelRead
---->consumer.consume((DataObject) msg);
這裡的consumer則是上述2.1和2.2生產構造的connectionFacade;
---->
consumeDeviceMessage
  • ConnectionAdapterImpl重寫了consumeDeviceMessage訊息完成訊息的處理和呼叫不同的監聽器,後續檢視類似程式碼可以通過介面卡的consumeDeviceMessage為入口繼續分析相關流程;

3 openflow連線建立流程

3.1 收到hello訊息-OpenflowProtocolListenerInitialImpl

  • 2.2一開始註冊的OpenflowProtocolListener為OpenflowProtocolListenerInitialImpl,也就是初始化訊息相關的一些處理;一旦連線成功後,後續會替換掉該listener換成OpenflowProtocolListenerFullImpl,後面步驟會分析到此處;

主要函式執行位置為OpenflowProtocolListenerInitialImpl--》onHelloMessage;

此時通道的狀態應該為CONNECTION_STATE.HANDSHAKING,函式主要功能是開啟一個HandshakeStepWrapper,啟動執行緒進行握手處理;

  • HandshakeManagerImpl的握手處理如下所示:

    //01-如果未收到hello訊息,先發送hello
    

未完待續...