Uploading images to HBase in batch with Spark while extracting SIFT features with OpenCV

I have recently been learning to run image classification and retrieval experiments on Spark. The first step is to load the image data (stored on the local file system) into HBase; the image feature used is SIFT, extracted with the OpenCV library. At first I wrote one job to upload the images and a second job to extract the features of the images already in HBase, but since serializing and deserializing the images costs a lot of time, and the frequent disk I/O is also expensive, I merged the two jobs into a single one to cut the processing time. I also hit quite a few errors along the way.

Batch-uploading the data and extracting SIFT features

package com.fang.spark

import java.awt.image.{BufferedImage, DataBufferByte}
import java.io.ByteArrayInputStream
import javax.imageio.ImageIO

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.opencv.core.{Core, CvType, Mat, MatOfKeyPoint}
import org.opencv.features2d.{DescriptorExtractor, FeatureDetector}

/**
  * Created by hadoop on 16-11-15.
  * Batch-uploads data into an HBase table and computes the SIFT descriptors.
  * Table schema:
  * imagesTable:(image:imageBinary,sift)
  * RowKey design: the image file name
  */
object HBaseUpLoadImages {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("HBaseUpLoadImages")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = new SparkContext(sparkConf)
    //TODO In single-machine tests the job fails when there are too many image files
    //     and dumps an hs_err_pid***_log file; the exact cause is still unknown
    val imagesRDD = sparkContext.binaryFiles("/home/fang/images/train/test")
    System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    //val imagesRDD = sparkContext.newAPIHadoopFile[Text, BufferedImage, ImmutableBytesWritable]("/home/fang/images/train/1")
    //val columnFamilies: Array[String] = Array("image")
    //createTable(tableName, columnFamilies, connection)
    imagesRDD.foreachPartition { iter =>
      // create one HBase connection per partition; the connection itself is not serializable
      val hbaseConfig = HBaseConfiguration.create()
      hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
      hbaseConfig.set("hbase.zookeeper.quorum", "fang-ubuntu,fei-ubuntu,kun-ubuntu")
      val connection: Connection = ConnectionFactory.createConnection(hbaseConfig)
      val tableName = "imagesTest"
      val table: Table = connection.getTable(TableName.valueOf(tableName))
      iter.foreach { imageFile =>
        //val bi: BufferedImage = ImageIO.read(new ByteArrayInputStream(imageFile._2.toArray()))
        //println(bi.getColorModel)
        // use the file name (last path component) as the row key
        val tempPath = imageFile._1.split("/")
        val imageName = tempPath(tempPath.length - 1)
        //TODO try reading the BufferedImage data directly to improve efficiency
        val imageBinary: Array[Byte] = imageFile._2.toArray()
        val put: Put = new Put(Bytes.toBytes(imageName))
        put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("binary"), imageBinary)
        put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("path"), Bytes.toBytes(imageFile._1))
        val sift = getImageSift(imageBinary)
        if (sift.isDefined) {
          put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("sift"), sift.get)
        }
        table.put(put)
      }
      table.close()
      connection.close()
    }
    sparkContext.stop()
  }

  // Extract the SIFT descriptors of one image; returns None if no descriptors are found
  def getImageSift(image: Array[Byte]): Option[Array[Byte]] = {
    val bi: BufferedImage = ImageIO.read(new ByteArrayInputStream(image))
    val test_mat = new Mat(bi.getHeight, bi.getWidth, CvType.CV_8U)
    //java.lang.UnsupportedOperationException:
    //  Provided data element number (188000) should be multiple of the Mat channels count (3)
    //Changing CvType.CV_8UC3 to CvType.CV_8U fixes the error above
    //val test_mat = new Mat(bi.getHeight, bi.getWidth, CvType.CV_8UC3)
    val data = bi.getRaster.getDataBuffer.asInstanceOf[DataBufferByte].getData
    //if (bi.getColorModel.getNumComponents == 3) {
    test_mat.put(0, 0, data)
    val desc = new Mat
    val fd = FeatureDetector.create(FeatureDetector.SIFT)
    val mkp = new MatOfKeyPoint
    fd.detect(test_mat, mkp)
    val de = DescriptorExtractor.create(DescriptorExtractor.SIFT)
    // compute the SIFT descriptors at the detected keypoints
    de.compute(test_mat, mkp, desc)
    // only serialize when at least one descriptor was produced
    if (desc.rows() != 0) {
      Some(Utils.serializeMat(desc))
    } else {
      None
    }
  }
}
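The createTable call referenced in the commented-out lines above is not shown in the post. A minimal sketch of such a helper (my own assumption, written against the HBase 1.x HTableDescriptor/HColumnDescriptor admin API) that could be added to HBaseUpLoadImages:

import org.apache.hadoop.hbase.client.{Admin, Connection}
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}

// Hypothetical helper: create the table with the given column families if it does not exist yet.
def createTable(tableName: String, columnFamilies: Array[String], connection: Connection): Unit = {
  val admin: Admin = connection.getAdmin
  val name = TableName.valueOf(tableName)
  if (!admin.tableExists(name)) {
    val descriptor = new HTableDescriptor(name)
    columnFamilies.foreach(cf => descriptor.addFamily(new HColumnDescriptor(cf)))
    admin.createTable(descriptor)
  }
  admin.close()
}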

Serializing the extracted Mat-type feature values

package com.fang.spark;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

import org.opencv.core.Mat;

public class Utils {
    // Serialize an OpenCV Mat by copying its elements into a float[] and
    // writing that array with standard Java object serialization.
    public static byte[] serializeMat(Mat mat) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            // total() * channels() = number of elements in the Mat
            float[] data = new float[(int) mat.total() * mat.channels()];
            mat.get(0, 0, data);
            ObjectOutput out = new ObjectOutputStream(bos);
            out.writeObject(data);
            out.close();
            // Get the bytes of the serialized object
            byte[] buf = bos.toByteArray();
            return buf;
        } catch (IOException ioe) {
            ioe.printStackTrace();
            return null;
        }
    }
}
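For completeness, the bytes written by serializeMat are just a Java-serialized float[], so reading a descriptor back out of HBase is the mirror operation. A minimal sketch, where the row key "example.jpg" is only a placeholder for an actual image file name:

import java.io.{ByteArrayInputStream, ObjectInputStream}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

object ReadSiftExample {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "fang-ubuntu,fei-ubuntu,kun-ubuntu")
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("imagesTest"))
    // fetch one row by its row key (the image file name); "example.jpg" is a placeholder
    val result = table.get(new Get(Bytes.toBytes("example.jpg")))
    val siftBytes = result.getValue(Bytes.toBytes("image"), Bytes.toBytes("sift"))
    if (siftBytes != null) {
      val in = new ObjectInputStream(new ByteArrayInputStream(siftBytes))
      val descriptors = in.readObject().asInstanceOf[Array[Float]]
      in.close()
      // SIFT descriptors are 128-dimensional, so every 128 floats form one descriptor
      println(s"descriptor count: ${descriptors.length / 128}")
    }
    table.close()
    connection.close()
  }
}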

Errors encountered and how they were resolved

  1. java.lang.UnsupportedOperationException: Provided data element number (188000) should be multiple of the Mat channels count (3)
    Fixed by changing CvType.CV_8UC3 to CvType.CV_8U (a variant that derives the Mat type from the image itself is sketched after this list).
  2. org.apache.spark.SparkException: Task not serializable
    This error appeared when the HBase connection was not created inside foreachPartition: the connection and table objects built on the driver get captured by the task closure, and they are not serializable.
  3. UnsatisfiedLinkError: Native method not found: org.opencv.core.Mat.n_Mat:()J
    The OpenCV native library was not loaded, or failed to load (System.loadLibrary(Core.NATIVE_LIBRARY_NAME)).
  4. java.lang.UnsupportedOperationException: Mat data type is not compatible: 0
    The Mat was serialized without first checking whether it actually contained any descriptors.
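For error 1, a small sketch (not part of the original job) that derives the Mat type from the BufferedImage's band count instead of hard-coding it, assuming the same OpenCV 2.4 Java API used above and a byte-backed raster:

import java.awt.image.{BufferedImage, DataBufferByte}
import org.opencv.core.{CvType, Mat}

object MatConversion {
  // Build a Mat whose channel count matches the BufferedImage,
  // so that data.length == rows * cols * channels always holds.
  def toMat(bi: BufferedImage): Mat = {
    val channels = bi.getRaster.getNumBands // 1 for grayscale, 3 for BGR images
    val matType = if (channels == 3) CvType.CV_8UC3 else CvType.CV_8U
    val mat = new Mat(bi.getHeight, bi.getWidth, matType)
    // assumes the raster is backed by a byte buffer (e.g. TYPE_BYTE_GRAY, TYPE_3BYTE_BGR)
    val data = bi.getRaster.getDataBuffer.asInstanceOf[DataBufferByte].getData
    mat.put(0, 0, data)
    mat
  }
}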

The OpenCV native library must be on the library path when the job runs:
-Djava.library.path=/home/fang/BigDataSoft/opencv-2.4.13/release/lib
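For a local run in the IDE this flag goes into the JVM options. When submitting to a cluster, one possible way (my assumption, not part of the original setup) is to pass it to both the driver and the executors, for example:

spark-submit \
  --class com.fang.spark.HBaseUpLoadImages \
  --driver-java-options "-Djava.library.path=/home/fang/BigDataSoft/opencv-2.4.13/release/lib" \
  --conf "spark.executor.extraJavaOptions=-Djava.library.path=/home/fang/BigDataSoft/opencv-2.4.13/release/lib" \
  hbase-upload-images.jar   # placeholder jar name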
When too many images are uploaded the job fails; my preliminary guess is that it runs out of memory, which is the next problem to solve.