1. 程式人生 > >大資料相關面試題整理-帶答案

大資料相關面試題整理-帶答案

1、fsimage和edit的區別?
  大家都知道namenode與secondary namenode 的關係,當他們要進行資料同步時叫做checkpoint時就用到了fsimage與edit,fsimage是儲存最新的元資料的資訊,當fsimage資料到一定的大小事會去生成一個新的檔案來儲存元資料的資訊,這個新的檔案就是edit,edit會回滾最新的資料。


2、列舉幾個配置檔案優化? --發揮
  1)Core-site.xml 檔案的優化
    a、fs.trash.interval,預設值: 0;說明: 這個是開啟hdfs檔案刪除自動轉移到垃圾箱的選項,值為垃圾箱檔案清除時間。一般開啟這個會比較好,以防錯誤刪除重要檔案。單位是分鐘。
    b、dfs.namenode.handler.count,預設值:10;說明:hadoop系統裡啟動的任務執行緒數,這裡改為40,同樣可以嘗試該值大小對效率的影響變化進行最合適的值的設定。
    c、mapreduce.tasktracker.http.threads,預設值:40;說明:map和reduce是通過http進行資料傳輸的,這個是設定傳輸的並行執行緒數。


3、datanode 首次加入 cluster 的時候,如果 log 報告不相容檔案版本,那需要namenode 執行格式化操作,這樣處理的原因是?


  1)這樣處理是不合理的,因為那麼 namenode 格式化操作,是對檔案系統進行格式化,namenode 格式化時清空 dfs/name 下空兩個目錄下的所有檔案,之後,會在目錄 dfs.name.dir 下建立檔案。
  2)文字不相容,有可能時 namenode 與 datanode 的 資料裡的 namespaceID、clusterID 不一致,找到兩個 ID 位置,修改為一樣即可解決。


4、MapReduce 中排序發生在哪幾個階段?這些排序是否可以避免?為什麼?
  1)一個 MapReduce 作業由 Map 階段和 Reduce 階段兩部分組成,這兩階段會對資料排序,從這個意義上說,MapReduce 框架本質就是一個 Distributed Sort。
  2)在 Map 階段,Map Task 會在本地磁碟輸出一個按照 key 排序(採用的是快速排序)的檔案(中間可能產生多個檔案,但最終會合併成一個),在 Reduce 階段,每個 Reduce Task 會對收到的資料排序,這樣,資料便按照 Key 分成了若干組,之後以組為單位交給 reduce()處理。
  3)很多人的誤解在 Map 階段,如果不使用 Combiner便不會排序,這是錯誤的,不管你用不用 Combiner,Map Task 均會對產生的資料排序(如果沒有 Reduce Task,則不會排序,實際上 Map 階段的排序就是為了減輕 Reduce端排序負載)。
  4)由於這些排序是 MapReduce 自動完成的,使用者無法控制,因此,在hadoop 1.x 中無法避免,也不可以關閉,但 hadoop2.x 是可以關閉的。


5、hadoop的優化?

  1)優化的思路可以從配置檔案和系統以及程式碼的設計思路來優化
  2)配置檔案的優化:調節適當的引數,在調引數時要進行測試
  3)程式碼的優化:combiner的個數儘量與reduce的個數相同,資料的型別保持一致,可以減少拆包與封包的進度
  4)系統的優化:可以設定linux系統開啟最大的檔案數預計網路的頻寬MTU的配置
  5)為 job 新增一個 Combiner,可以大大的減少shuffer階段的maoTask拷貝過來給遠端的   reduce task的資料量,一般而言combiner與reduce相同。
  6)在開發中儘量使用stringBuffer而不是string,string的模式是read-only的,如果對它進行修改,會產生臨時的物件,二stringBuffer是可修改的,不會產生臨時物件。
  7)修改一下配置:以下是修改 mapred-site.xml 檔案
    a、修改最大槽位數:槽位數是在各個 tasktracker 上的 mapred-site.xml 上設定的,預設都是 2
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>2</value>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>2</value>
</property>
    b、調整心跳間隔:叢集規模小於 300 時,心跳間隔為 300 毫秒
mapreduce.jobtracker.heartbeat.interval.min 心跳時間
mapred.heartbeats.in.second 叢集每增加多少節點,時間增加下面的值
mapreduce.jobtracker.heartbeat.scaling.factor 叢集每增加上面的個數,心跳增多少
    c、啟動帶外心跳
mapreduce.tasktracker.outofband.heartbeat 預設是 false
    d、配置多塊磁碟
mapreduce.local.dir
    e、配置 RPC hander 數目
mapred.job.tracker.handler.count 預設是 10,可以改成 50,根據機器的能力
    f、配置 HTTP 執行緒數目
tasktracker.http.threads 預設是 40,可以改成 100 根據機器的能力
    g、選擇合適的壓縮方式,以 snappy 為例:
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
<property>
<name>mapred.map.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>


6(好)、兩個檔案合併的問題:給定a、b兩個檔案,各存放50億個url,每個url各佔用64位元組,記憶體限制是4G,如何找出a、b檔案共同的url?

  1)主要的思想是把檔案分開進行計算,在對每個檔案進行對比,得出相同的URL,因為以上說是含有相同的URL所以不用考慮資料傾斜的問題。詳細的解題思路如下:
    a、可以估計每個檔案的大小為5G*64=300G,遠大於4G。所以不可能將其完全載入到記憶體中處理。考慮採取分而治之的方法。 
    b、遍歷檔案a,對每個url求取hash(url)%1000,然後根據所得值將url分別儲存到1000個小檔案(設為a0,a1,...a999)當中。這樣每個小檔案的大小約為300M。
    b、遍歷檔案b,採取和a相同的方法將url分別儲存到1000個小檔案(b0,b1....b999)中。這樣處理後,所有可能相同的url都在對應的小檔案(a0 vs b0, a1 vs b1....a999 vs b999)當中,不對應的小檔案(比如a0 vs b99)不可能有相同的url。然後我們只要求出1000對小檔案中相同的url即可。 
    c、比如對於a0 vs b0,我們可以遍歷a0,將其中的url儲存到hash_map當中。然後遍歷b0,如果url在hash_map中,則說明此url在a和b中同時存在,儲存到檔案中即可。 
    d、如果分成的小檔案不均勻,導致有些小檔案太大(比如大於2G),可以考慮將這些太大的小檔案再按類似的方法分成小小檔案即可


7、按照需求使用spark編寫一下程式?
  A、當前檔案a.text的格式如下,請統計每個單詞出現的個數
A,b,c,d
B,b,f,e
A,a,c,f


sc.textFile(“/user/local/a.text”).flatMap(_.split(“,”)).map((_,1)).ReduceByKey(_+_).collect()
或:
package cn.bigdata
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object Demo {
  /*
a,b,c,d
b,b,f,e
a,a,c,f
c,c,a,d
   * 計算第四列每個元素出現的個數
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val data: RDD[String] = sc.textFile("f://demo.txt")
    //資料切分
    val fourthData: RDD[(String, Int)] = data.map { x =>
      val arr: Array[String] = x.split(",")
      val fourth: String = arr(3)
      (fourth, 1)
    }
    val result: RDD[(String, Int)] = fourthData.reduceByKey(_ + _);
    println(result.collect().toBuffer)
  }
}


  B、HDFS中有兩個檔案a.text與b.text,檔案的格式為(ip,username),如:a.text,b.text
a.text
127.0.0.1  xiaozhang
127.0.0.1  xiaoli
127.0.0.2  wangwu
127.0.0.3  lisi 


B.text
127.0.0.4  lixiaolu
127.0.0.5  lisi 


每個檔案至少有1000萬行,請用程式完成以下工作,
1)每個檔案的個子的IP
2)出現在b.text而沒有出現在a.text的IP
3)每個user出現的次數以及每個user對應的IP的個數 


程式碼如下:
1)各個檔案的ip數
package cn.bigdata
import java.util.concurrent.Executors
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.LocatedFileStatus
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.RemoteIterator
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions 


//各個檔案的ip數
object Demo2 {
  val cachedThreadPool = Executors.newCachedThreadPool()
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo2").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val hdpConf: Configuration = new Configuration
    val fs: FileSystem = FileSystem.get(hdpConf)
    val listFiles: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path("f://txt/2/"), true)


    while (listFiles.hasNext) {
      val fileStatus = listFiles.next
      val pathName = fileStatus.getPath.getName
      cachedThreadPool.execute(new Runnable() {
        override def run(): Unit = {
          println("=======================" + pathName)
          analyseData(pathName, sc)
        }
      })
    }
  }


  def analyseData(pathName: String, sc: SparkContext): Unit = {
    val data: RDD[String] = sc.textFile("f://txt/2/" + pathName)
    val dataArr: RDD[Array[String]] = data.map(_.split(" "))
    val ipAndOne: RDD[(String, Int)] = dataArr.map(x => {
      val ip = x(0)
      (ip, 1)
    })
    val counts: RDD[(String, Int)] = ipAndOne.reduceByKey(_ + _)
    val sortedSort: RDD[(String, Int)] = counts.sortBy(_._2, false)
    sortedSort.saveAsTextFile("f://txt/3/" + pathName)
  }
}


2)出現在b.txt而沒有出現在a.txt的ip
package cn.bigdata
import java.util.concurrent.Executors
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
/*
 * 出現在b.txt而沒有出現在a.txt的ip
 */
object Demo3 {
  val cachedThreadPool = Executors.newCachedThreadPool()
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Demo3").setMaster("local")
    val sc = new SparkContext(conf)
    val data_a = sc.textFile("f://txt/2/a.txt")
    val data_b = sc.textFile("f://txt/2/b.txt")
    val splitArr_a = data_a.map(_.split(" "))
    val ip_a: RDD[String] = splitArr_a.map(x => x(0))
    val splitArr_b = data_b.map(_.split(" "))
    val ip_b: RDD[String] = splitArr_b.map(x => x(0))
    val subRdd: RDD[String] = ip_b.subtract(ip_a)
    subRdd.saveAsTextFile("f://txt/4/")
  }



3)
package cn.bigdata
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.collection.mutable.Set
/*
 * 每個user出現的次數以及每個user對應的ip數
 */
object Demo4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Demo4").setMaster("local")
    val sc = new SparkContext(conf)
    val data: RDD[String] = sc.textFile("f://txt/5/")
    val lines = data.map(_.split(" "))
    val userIpOne = lines.map(x => {
      val ip = x(0)
      val user = x(1)
      (user, (ip, 1))
    })


    val userListIpCount: RDD[(String, (Set[String], Int))] = userIpOne.combineByKey(
      x => (Set(x._1), x._2),
      (a: (Set[String], Int), b: (String, Int)) => {
        (a._1 + b._1, a._2 + b._2)
      },
      (m: (Set[String], Int), n: (Set[String], Int)) => {
        (m._1 ++ n._1, m._2 + n._2)
      }) 


    val result: RDD[String] = userListIpCount.map(x => {
      x._1 + ":userCount:" + x._2._2 + ",ipCount:" + x._2._1.size
    })


    println(result.collect().toBuffer)
  }
}


8、設計題
  1)採集nginx產生的日誌,日誌的格式為user  ip   time  url   htmlId  每天產生的檔案的資料量上億條,請設計方案把資料儲存到HDFS上,並提供一下實時查詢的功能(響應時間小於3s)
A、某個使用者某天訪問某個URL的次數
B、某個URL某天被訪問的總次數 


實時思路是:使用Logstash + Kafka + Spark-streaming + Redis + 報表展示平臺
離線的思路是:Logstash + Kafka + Elasticsearch +  Spark-streaming + 關係型資料庫
A、B、資料在進入到Spark-streaming 中進行過濾,把符合要求的資料儲存到Redis中


9、有 10 個檔案,每個檔案 1G,每個檔案的每一行存放的都是使用者的 query,每個檔案的query 都可能重複。要求你按照 query 的頻度排序。 還是典型的 TOP K 演算法,
  解決方案如下: 
    1)方案 1: 
    順序讀取 10 個檔案,按照 hash(query)%10 的結果將 query 寫入到另外 10 個檔案(記為)中。這樣新生成的檔案每個的大小大約也 1G(假設 hash 函式是隨機的)。 找一臺記憶體在 2G 左右的機器,依次對用 hash_map(query, query_count)來統計每個query 出現的次數。利用快速/堆/歸併排序按照出現次數進行排序。將排序好的 query 和對應的 query_cout 輸出到檔案中。這樣得到了 10 個排好序的檔案(記為)。 對這 10 個檔案進行歸併排序(內排序與外排序相結合)。 
    2)方案 2: 
    一般 query 的總量是有限的,只是重複的次數比較多而已,可能對於所有的 query,一次性就可以加入到記憶體了。這樣,我們就可以採用 trie 樹/hash_map等直接來統計每個 query出現的次數,然後按出現次數做快速/堆/歸併排序就可以了。 
    3)方案 3: 
    與方案 1 類似,但在做完 hash,分成多個檔案後,可以交給多個檔案來處理,採用分散式的架構來處理(比如 MapReduce),最後再進行合併。


10、在 2.5 億個整數中找出不重複的整數,注,記憶體不足以容納這 2.5 億個整數。 
  1)方案 1:採用 2-Bitmap(每個數分配 2bit,00 表示不存在,01 表示出現一次,10 表示多次,11 無意義)進行,共需記憶體 2^32 * 2 bit=1 GB 記憶體,還可以接受。然後掃描這 2.5億個整數,檢視 Bitmap 中相對應位,如果是 00 變 01,01 變 10,10 保持不變。所描完事後,檢視 bitmap,把對應位是 01 的整數輸出即可。 
  2)方案 2:也可採用與第 1 題類似的方法,進行劃分小檔案的方法。然後在小檔案中找出不重複的整數,並排序。然後再進行歸併,注意去除重複的元素。 


11、騰訊面試題:給 40 億個不重複的 unsigned int 的整數,沒排過序的,然後再給一個數,如何快速判斷這個數是否在那 40 億個數當中? 
  1)方案 1:oo,申請 512M 的記憶體,一個 bit 位代表一個 unsigned int 值。讀入 40 億個數,設定相應的 bit 位,讀入要查詢的數,檢視相應 bit 位是否為 1,為 1 表示存在,為 0 表示不存在。 
  2)方案 2:這個問題在《程式設計珠璣》裡有很好的描述,大家可以參考下面的思路,探討一下: 又因為 2^32 為 40 億多,所以給定一個數可能在,也可能不在其中; 這裡我們把 40 億個數中的每一個用 32 位的二進位制來表示 ,假設這 40 億個數開始放在一個檔案中。 然後將這 40 億個數分成兩類: 
1.最高位為 0 
2.最高位為 1 
    並將這兩類分別寫入到兩個檔案中,其中一個檔案中數的個數<=20 億,而另一個>=20 億(這相當於折半了); 與要查詢的數的最高位比較並接著進入相應的檔案再查詢 再然後把這個檔案為又分成兩類: 
1.次最高位為 0 
2.次最高位為 1 
    並將這兩類分別寫入到兩個檔案中,其中一個檔案中數的個數<=10 億,而另一個>=10 億(這相當於折半了); 與要查詢的數的次最高位比較並接著進入相應的檔案再查詢。 
..... 
    以此類推,就可以找到了,而且時間複雜度為 O(logn),方案 2 完。 
  3)附:這裡,再簡單介紹下,點陣圖方法: 使用點陣圖法判斷整形陣列是否存在重複 ,判斷集合中存在重複是常見程式設計任務之一,當集合中資料量比較大時我們通常希望少進行幾次掃描,這時雙重迴圈法就不可取了。 
    點陣圖法比較適合於這種情況,它的做法是按照集合中最大元素 max 建立一個長度為 max+1的新陣列,然後再次掃描原陣列,遇到幾就給新陣列的第幾位置上 1,如遇到 5 就給新陣列的第六個元素置 1,這樣下次再遇到 5 想置位時發現新陣列的第六個元素已經是 1 了,這說明這次的資料肯定和以前的資料存在著重複。這 種給新陣列初始化時置零其後置一的做法類似於點陣圖的處理方法故稱點陣圖法。它的運算次數最壞的情況為 2N。如果已知陣列的最大值即能事先給新陣列定長的話效 率還能提高一倍。


12、怎麼在海量資料中找出重複次數最多的一個? 
  1)方案 1:先做 hash,然後求模對映為小檔案,求出每個小檔案中重複次數最多的一個,並記錄重複次數。然後找出上一步求出的資料中重複次數最多的一個就是所求(具體參考前面的題)。


13、上千萬或上億資料(有重複),統計其中出現次數最多的錢 N 個數據。 
  1)方案 1:上千萬或上億的資料,現在的機器的記憶體應該能存下。所以考慮採用 hash_map/搜尋二叉樹/紅黑樹等來進行統計次數。然後就是取出前 N 個出現次數最多的資料了,可以用第 2 題提到的堆機制完成。


14、一個文字檔案,大約有一萬行,每行一個詞,要求統計出其中最頻繁出現的前 10 個詞,給出思想,給出時間複雜度分析。 
  1)方案 1:這題是考慮時間效率。用 trie 樹統計每個詞出現的次數,時間複雜度是 O(n*le)(le表示單詞的平準長度)。然後是找出出現最頻繁的前 10 個詞,可以用堆來實現,前面的題中已經講到了,時間複雜度是 O(n*lg10)。所以總的時間複雜度,是 O(n*le)與 O(n*lg10)中較大的哪一 個。


15、100w 個數中找出最大的 100 個數。 
  1)方案 1:在前面的題中,我們已經提到了,用一個含 100 個元素的最小堆完成。複雜度為O(100w*lg100)。 
  2)方案 2:採用快速排序的思想,每次分割之後只考慮比軸大的一部分,知道比軸大的一部分在比 100 多的時候,採用傳統排序演算法排序,取前 100 個。複雜度為 O(100w*100)。 
  3)方案 3:採用區域性淘汰法。選取前 100 個元素,並排序,記為序列 L。然後一次掃描剩餘的元素 x,與排好序的 100 個元素中最小的元素比,如果比這個最小的 要大,那麼把這個最小的元素刪除,並把 x 利用插入排序的思想,插入到序列 L 中。依次迴圈,直到掃描了所有的元素。複雜度為 O(100w*100)。 


16、有一千萬條簡訊,有重複,以文字檔案的形式儲存,一行一條,有重複。 請用 5 分鐘時間,找出重複出現最多的前 10 條。 
  1)分析: 常規方法是先排序,在遍歷一次,找出重複最多的前 10 條。但是排序的演算法複雜度最低為nlgn。 
  2)可以設計一個 hash_table, hash_map<string, int>,依次讀取一千萬條簡訊,載入到hash_table 表中,並且統計重複的次數,與此同時維護一張最多 10 條的簡訊表。 這樣遍歷一次就能找出最多的前 10 條,演算法複雜度為 O(n)。