使用Spark批量上傳圖片到HBase中並同時使用OpenCV提取SIFT特徵值
阿新 • • 發佈:2019-02-07
最近正在學習利用Spark做影象的分類和檢索實驗,首先需要上傳影象資料(儲存在本地檔案系統中)到HBase中,提取的影象特徵是SIFT,藉助OpenCV庫提取,剛開始是寫一個任務上傳圖片,然後再寫一個任務提取HBase中影象的特徵值,考慮到圖片的序列化和反序列化會耗費大量的時間,且頻繁的磁碟IO對時間消耗也很大,因此,將兩個任務合併成一個任務處理,減少處理時間,同時實驗過程中也遇到不少錯誤。
批量上傳資料並提取SIFT特徵
package com.fang.spark
import java.awt.image.{BufferedImage, DataBufferByte}
import java.io.ByteArrayInputStream
import javax.imageio.ImageIO
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.opencv.core.{Core, CvType, Mat, MatOfKeyPoint}
import org.opencv.features2d.{DescriptorExtractor, FeatureDetector}
/**
* Created by hadoop on 16-11-15.
* 批量上傳資料到HBase表中,並計算sift值
* 表結構為:
* imagesTable:(image:imageBinary,sift)
* RowKey設計為:
*/
object HBaseUpLoadImages {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
.setAppName("HBaseUpLoadImages").
setMaster("local[4]").
set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sparkContext = new SparkContext(sparkConf)
//TODO 單機測試情況下,圖片檔案太多,程式執行失敗,打出hs_err_pid***_log日誌,具體情況不明
val imagesRDD = sparkContext.binaryFiles("/home/fang/images/train/test")
System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
// val imagesRDD = sparkContext.newAPIHadoopFile[Text, BufferedImage, ImmutableBytesWritable]("/home/fang/images/train/1")
// val columnFaminlys :Array[String] = Array("image")
//createTable(tableName,columnFaminlys,connection)
imagesRDD.foreachPartition {
iter => {
val hbaseConfig = HBaseConfiguration.create()
hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
hbaseConfig.set("hbase.zookeeper.quorum", "fang-ubuntu,fei-ubuntu,kun-ubuntu")
val connection: Connection = ConnectionFactory.createConnection(hbaseConfig);
val tableName = "imagesTest"
val table: Table = connection.getTable(TableName.valueOf(tableName))
iter.foreach {
imageFile => {
//val bi: BufferedImage = ImageIO.read(new ByteArrayInputStream(imageFile._2.toArray()))
//println(bi.getColorModel)
val tempPath = imageFile._1.split("/")
val len = tempPath.length
val imageName = tempPath(len - 1)
//TODO 嘗試直接獲取BufferedImage資料,提升效率
val imageBinary: scala.Array[Byte] = imageFile._2.toArray()
val put: Put = new Put(Bytes.toBytes(imageName))
put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("binary"), imageBinary)
put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("path"), Bytes.toBytes(imageFile._1))
val sift = getImageSift(imageBinary)
if(!sift.isEmpty) {
put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("sift"),sift.get)
}
table.put(put)
}
}
connection.close()
}
}
sparkContext.stop()
}
//獲取影象的sift特徵
def getImageSift(image: Array[Byte]):Option[Array[Byte]] = {
val bi: BufferedImage = ImageIO.read(new ByteArrayInputStream(image))
val test_mat = new Mat(bi.getHeight, bi.getWidth, CvType.CV_8U)
//java.lang.UnsupportedOperationException:
// Provided data element number (188000) should be multiple of the Mat channels count (3)
//更改CvType.CV_8UC3為CvType.CV_8U,解決上面錯誤
// val test_mat = new Mat(bi.getHeight, bi.getWidth, CvType.CV_8UC3)
val data = bi.getRaster.getDataBuffer.asInstanceOf[DataBufferByte].getData
// if(bi.getColorModel.getNumComponents==3) {
test_mat.put(0, 0, data)
val desc = new Mat
val fd = FeatureDetector.create(FeatureDetector.SIFT)
val mkp = new MatOfKeyPoint
fd.detect(test_mat, mkp)
val de = DescriptorExtractor.create(DescriptorExtractor.SIFT)
//提取sift特徵
de.compute(test_mat, mkp, desc)
//判斷是否有特徵值
if(desc.rows()!=0) {
Some(Utils.serializeMat(desc))
}else{
None
}
}
}
需要序列化提取的Mat型別特徵值
public class Utils{
public static byte[] serializeMat(Mat mat) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
float[] data = new float[(int) mat.total() * mat.channels()];
mat.get(0, 0, data);
ObjectOutput out = new ObjectOutputStream(bos);
out.writeObject(data);
out.close();
// Get the bytes of the serialized object
byte[] buf = bos.toByteArray();
return buf;
} catch (IOException ioe) {
ioe.printStackTrace();
return null;
}
}
}
遇到的錯誤及解決
- java.lang.UnsupportedOperationException:Provided data element number (188000) should be multiple of the Mat channels count (3)
更改CvType.CV_8UC3為CvType.CV_8U,解決上面錯誤 - not serialization
沒有使用foreachPartition出現下面的錯誤
org.apache.spark.SparkException: Task not serializable - UnsatisfiedLinkError: Native method not found: org.opencv.core.Mat.n_Mat:()J
忘記載入或載入失敗(System.loadLibrary(Core.NATIVE_LIBRARY_NAME)) - java.lang.UnsupportedOperationException: Mat data type is not compatible: 0
mat型別資料序列化時沒有判斷mat是否有值
執行的過程中需要載入opencv lib包
-Djava.library.path=/home/fang/BigDataSoft/opencv-2.4.13/release/lib
當上傳過多的影象時程式會執行失敗,初步推斷是記憶體溢位,下一步解決問題