Akka Distributed Crawler Framework (Part 1): Design and Demo

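I have recently been learning Akka. After reading a few articles that explain the actor model and working through the examples in the official documentation,
I felt I needed a project to get more familiar with Akka and Scala programming. After some thought, I decided that Akka could be used to
implement a distributed crawler framework.
   Design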

1. Libraries

    HTTP requests use async-http-client: https://github.com/AsyncHttpClient/async-http-client

    The distributed framework is Akka.

    Centralized storage uses Kafka.

    Caching uses Redis.

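2. Execution flow

    1. The user submits a job by uploading a jar to the manager.

    2. The manager scans the received jar and reads its configuration file and the crawler's logic. The crawler interface is currently planned to be modeled on Scrapy's.

    3. Based on the crawler logic classes and the configuration in the jar, the manager creates the URL-checking thread and the actors that execute the job, and pushes the initial URLs into Kafka.

    4. The URL-checking thread starts, continuously reads URLs from Kafka, wraps them, and delivers them to the actors. (Load balancing across actors needs to be considered here; for now the plan is simple even distribution, with configurable load balancing to be considered later.)

    5. Each actor uses async-http-client to request the URL asynchronously. The callback it registers wraps the response and delivers it to a processing actor, and the successfully fetched URL is written to Redis asynchronously (to avoid crawling the same URL twice).

    6. The processing actor calls the user-defined handler, takes the difference between the set of parsed URLs and the URLs already crawled in Redis, and pushes the uncrawled URLs into Kafka.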
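Steps 2, 4, and 6 can be sketched in plain Scala without Akka, Kafka, or Redis. This is only a rough sketch under assumptions: `Spider`, `RoundRobin`, and `Dedup` are hypothetical names that do not exist in the framework yet, workers are represented by an index, and an in-memory `Set` stands in for the Redis set of crawled URLs.

```scala
// All names below are hypothetical; they only illustrate the steps above.

// Step 2: a crawler interface loosely modeled on Scrapy's spiders.
trait Spider {
  def startUrls: Seq[String]
  // Returns the URLs discovered in a fetched page.
  def parse(url: String, body: String): Set[String]
}

// Step 4: distribute URLs evenly by cycling through worker indices.
final class RoundRobin(workerCount: Int) {
  require(workerCount > 0, "need at least one worker")
  private var next = 0

  // Returns the index of the worker that should receive the next URL.
  def pick(): Int = {
    val chosen = next
    next = (next + 1) % workerCount
    chosen
  }
}

// Step 6: keep only the URLs that have not been crawled yet (set difference).
object Dedup {
  // `crawled` stands in for the Redis set of already-fetched URLs.
  def uncrawled(parsed: Set[String], crawled: Set[String]): Set[String] =
    parsed -- crawled
}
```

With three workers, successive calls to `pick()` yield 0, 1, 2, 0, and so on, and `Dedup.uncrawled(Set("a", "b"), Set("b"))` leaves only `"a"`. In the real design the set difference would probably be computed against Redis rather than in memory.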

  Core logic test

    The user's parsing step is simulated by generating URLs directly. Kafka and Redis are also left out and replaced by a singleton LinkedBlockingQueue.
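One way to keep the demo close to the intended design is to hide the queue behind a small interface, so that a Kafka-backed implementation could replace the in-memory one later. The sketch below is an assumption on my part, not code from the demo: `UrlQueue` and `InMemoryUrlQueue` are hypothetical names.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical abstraction over the URL store; a Kafka-backed version
// could implement the same trait later.
trait UrlQueue {
  def push(urls: Iterable[String]): Unit
  def poll(): Option[String] // None when the queue is currently empty
}

final class InMemoryUrlQueue extends UrlQueue {
  private val queue = new LinkedBlockingQueue[String]()

  def push(urls: Iterable[String]): Unit = urls.foreach(u => queue.put(u))

  // LinkedBlockingQueue.poll() returns null when empty; wrap it in Option.
  def poll(): Option[String] = Option(queue.poll())
}
```

Returning `Option` instead of `null` lets the dispatch loop pattern-match on the result instead of checking for `null`.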

    Implementation

package awm

import scala.collection.JavaConverters._
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicLong

import akka.actor.{Actor, ActorLogging, Props}
import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient, Response}

// Message carrying one URL to fetch; id is a sequence number assigned by the sender.
case class UrlMessage(url: String, id: Int)

// Shared state for the demo: the queue stands in for Kafka, and one HTTP client is reused.
object Singleton {
  val queue: BlockingQueue[String] = new LinkedBlockingQueue[String]()
  val c = new AsyncHttpClient
  // Incremented from the HTTP client's I/O threads, so it must be thread-safe.
  val count = new AtomicLong(0)
}

class CrawlerDemoActor extends Actor with ActorLogging {

  override def receive = {
    case UrlMessage(url, id) =>
      // Non-blocking request; the callback runs on an I/O worker thread, not on the actor's thread.
      Singleton.c.prepareGet(url).execute(new AsyncCompletionHandler[Response] {
        override def onCompleted(response: Response) = {
          // Simulate parsing: pretend the page yielded two new URLs.
          Singleton.queue.addAll(List(url, url).asJava)
          val completed = Singleton.count.incrementAndGet()
          log.info(s"$completed, ${Thread.activeCount}")
          response
        }
      })
      // Also enqueue two URLs immediately, without waiting for the response.
      Singleton.queue.addAll(List(url, url).asJava)
  }
}

object CrawlerDemoActor {
  def props: Props = Props(new CrawlerDemoActor())
}

 Test
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.TestKit
import awm.{CrawlerDemoActor, UrlMessage}
import awm.Singleton._

class TestAsyncClientWithAkka(_system: ActorSystem)
  extends TestKit(_system)
  with Matchers
  with FlatSpecLike
  with BeforeAndAfterAll {

  def this() = this(ActorSystem("akka-demo"))

  override def afterAll(): Unit = {
    shutdown(system)
  }

  "A Demo" should "run successfully" in {
    // Create a fixed pool of 40 crawler actors.
    val actors: Vector[ActorRef] = Vector.fill(40)(system.actorOf(CrawlerDemoActor.props))

    // Seed URLs.
    queue.add("http://www.csdn.net/")
    queue.add("http://www.oschina.net/")

    var next = 0 // index of the actor that receives the next URL
    var id = 0   // sequence number attached to each message
    // Runs until the test is stopped manually: the actors keep refilling the queue.
    while (true) {
      val url = queue.poll() // null when the queue is momentarily empty
      if (url != null) {
        // Round-robin dispatch across the actor pool.
        actors(next) ! UrlMessage(url, id)
        next = (next + 1) % actors.size
        id += 1
      }
    }
  }
}

pom.xml

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.11</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-reflect -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.11</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.ning/async-http-client -->
        <dependency>
            <groupId>com.ning</groupId>
            <artifactId>async-http-client</artifactId>
            <version>1.9.40</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>


        <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.11 -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.5.6</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.11 -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-testkit_2.11</artifactId>
            <version>2.5.6</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.scalatest/scalatest_2.11 -->
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.11</artifactId>
            <version>3.0.4</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/javax.xml.bind/jaxb-api -->
        <dependency>
            <groupId>javax.xml.bind</groupId>
            <artifactId>jaxb-api</artifactId>
            <version>2.3.0</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


Output

[INFO] [11/05/2017 14:07:32.728] [New I/O worker #14] [akka://akka-demo/user/$H] 21823, 47
[INFO] [11/05/2017 14:07:32.730] [New I/O worker #5] [akka://akka-demo/user/$z] 21824, 47
[INFO] [11/05/2017 14:07:32.731] [New I/O worker #11] [akka://akka-demo/user/$k] 21825, 47
[INFO] [11/05/2017 14:07:32.734] [New I/O worker #13] [akka://akka-demo/user/$c] 21826, 47
[INFO] [11/05/2017 14:07:32.734] [New I/O worker #13] [akka://akka-demo/user/$n] 21827, 47
[INFO] [11/05/2017 14:07:32.736] [New I/O worker #8] [akka://akka-demo/user/$z] 21828, 47
[INFO] [11/05/2017 14:07:32.746] [New I/O worker #9] [akka://akka-demo/user/$n] 21829, 47
[INFO] [11/05/2017 14:07:32.746] [New I/O worker #6] [akka://akka-demo/user/$A] 21830, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #8] [akka://akka-demo/user/$K] 21831, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #5] [akka://akka-demo/user/$r] 21832, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #8] [akka://akka-demo/user/$o] 21833, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #14] [akka://akka-demo/user/$j] 21834, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #8] [akka://akka-demo/user/$o] 21835, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #10] [akka://akka-demo/user/$r] 21836, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #10] [akka://akka-demo/user/$N] 21837, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #16] [akka://akka-demo/user/$K] 21838, 47

This design is only a rough, early idea of mine; suggestions are welcome.