Deploying Titan 1.0 with a Cassandra cluster, bulk-loading data with BulkLoader, and querying it

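I spent about half a month on Titan just to load one graph: 200 million vertices (4 types, 10 properties each) and 1 billion edges (3 types). I hit quite a few pitfalls along the way, but in the end I set up a Cassandra cluster, imported the data into Titan 1.0, and ran queries against it.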

1 Deploy the Cassandra cluster

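If your dataset is not very large, say 10 million vertices and 20 million edges, a single-machine import is probably enough and you do not need a Cassandra cluster at all. Titan ships with its own Cassandra, so running ./bin/titan.sh start is enough to bring it up. By default the data is stored under db/data/.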
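For a larger graph, Titan is pointed at an external Cassandra cluster through conf/titan-cassandra.properties. A minimal sketch, assuming a three-node cluster: the IP addresses are placeholders, and the Thrift backend and keyspace name are assumptions, not necessarily the settings used for the load described here.

```properties
# Assumed example values; replace with the addresses of your own Cassandra nodes
storage.backend=cassandrathrift
storage.hostname=192.168.1.101,192.168.1.102,192.168.1.103
storage.cassandra.keyspace=titan
```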

2 Deploy Titan 1.0

Download Titan 1.0 from the official site and unpack it; it works out of the box. The only prerequisite is JDK 1.8, which can be installed as follows:

$ sudo apt-get install software-properties-common

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update

$ sudo apt-get install oracle-java8-installer

Then set the environment variable: export JAVA_HOME=/usr/lib/jvm/java-8-oracle

3 Prepare the vertex and edge data

My data was generated with databene-benerator, in the following formats.

point_format: comma-separated fields, in order: id, label, property1 to property10

1,BankCard,Mf2Ei3Cm9El8Cz0Vt6Iq5Od1Uk3Hk4,Fu6Ph3Ij2Fq6Uh1,La5Rd7Kv4Ze7Xj8Gg2Cv0Cq,Nh1Ma4Di5Ig2Mc0Zn3Xs8Af0Do0Jp0,2054,594,Ir5Gw3Hf7Ty4Tc3Gi5Ki4Pn1Ze7Rn2,1,17675474023,225169216

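edge_format (adjacency-list form): two parts separated by a single space. The first part is the source id and its label; the second part is a comma-separated list of dst:edge_label pairs.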

1,BankCard 13:recommend,14:terminal,37:recommend

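This means the vertex with id 1 has directed edges to vertices 13, 14, and 37, and the edge type of each is the label after the colon. The source vertex here has type BankCard; if that type does not match the type given for the same id in the vertex file, the two are treated as different vertices.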
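To make the adjacency format concrete, here is a minimal sketch in plain Groovy that splits one such line into its source vertex and outgoing edges. parseAdjacency is a hypothetical helper written for illustration; it is not part of Titan or TinkerPop and does not touch the graph.

```groovy
// Hypothetical helper: splits one adjacency line into a plain map.
def parseAdjacency(String line) {
    // Part 0 is "src,label"; part 1 (optional) is "dst:edgeLabel,dst:edgeLabel,..."
    def parts = line.split(/ /)
    def (src, srcLabel) = parts[0].split(/,/).toList()
    def edges = []
    if (parts.length == 2) {
        parts[1].split(/,/).findAll { !it.isEmpty() }.each {
            def (dst, edgeLabel) = it.split(/:/).toList()
            edges << [dst: dst, label: edgeLabel]
        }
    }
    return [src: src, srcLabel: srcLabel, edges: edges]
}

def parsed = parseAdjacency("1,BankCard 13:recommend,14:terminal,37:recommend")
println parsed
```

All ids stay strings at this stage; any conversion to numeric types would happen when the values are written to the graph.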

4 Parse scripts for the vertex and edge data (Groovy files)

Vertex file parser, file name: point.groovy
def parse(line, factory) {
    // Fields, in order: id, label, then the ten properties
    def (id, vertextype, orderno, status, apptime, status2, hstyovertime, curtovertime, identify, vlabel, phonenum, bankcardnum) = line.split(/,/).toList()
    def v1 = factory.vertex(id, vertextype)

    // Numeric fields are converted explicitly to the data types declared in the schema
    if (orderno != null) { v1.property("orderNo", orderno) }
    if (status != null) { v1.property("status", status) }
    if (apptime != null) { v1.property("appTime", apptime) }
    if (status2 != null) { v1.property("status2", status2) }
    if (hstyovertime != null) { v1.property("historyOverdueTime", Integer.valueOf(hstyovertime)) }
    if (curtovertime != null) { v1.property("currentOverdueTime", Integer.valueOf(curtovertime)) }
    if (identify != null) { v1.property("Identification", identify) }
    if (vlabel != null) { v1.property("vlabel", Integer.valueOf(vlabel)) }
    if (phonenum != null) { v1.property("phoneNum", Long.valueOf(phonenum)) }
    if (bankcardnum != null) { v1.property("bankCardNum", bankcardnum) }
    // Source id, kept so the bulk loader can match vertices across the two passes
    v1.property("bulkLoader.vertex.id", id)
    return v1
}
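Before submitting the Spark job, the parse logic can be dry-run in a plain Groovy shell against one sample line. A minimal sketch under stated assumptions: StubVertex and StubFactory are hypothetical stand-ins for the objects the script reader passes in (they are not TinkerPop classes), and parse is abbreviated here to three of the ten properties.

```groovy
// Hypothetical test doubles: they only record what parse() does with them.
class StubVertex {
    def id
    String label
    Map props = [:]
    def property(String key, value) { props[key] = value; return this }
}

class StubFactory {
    def vertex(id, String label) { new StubVertex(id: id, label: label) }
}

// Abbreviated copy of the vertex parser: id, label, and three properties.
def parse(line, factory) {
    def fields = line.split(/,/).toList()
    def v1 = factory.vertex(fields[0], fields[1])
    v1.property("orderNo", fields[2])
    v1.property("historyOverdueTime", Integer.valueOf(fields[6]))
    v1.property("phoneNum", Long.valueOf(fields[10]))
    return v1
}

def line = "1,BankCard,Mf2Ei3Cm9El8Cz0Vt6Iq5Od1Uk3Hk4,Fu6Ph3Ij2Fq6Uh1,La5Rd7Kv4Ze7Xj8Gg2Cv0Cq,Nh1Ma4Di5Ig2Mc0Zn3Xs8Af0Do0Jp0,2054,594,Ir5Gw3Hf7Ty4Tc3Gi5Ki4Pn1Ze7Rn2,1,17675474023,225169216"
def v = parse(line, new StubFactory())
println "${v.id} ${v.label} ${v.props}"
```

A check like this catches field-order mistakes and unparseable numbers in seconds, which is much cheaper than finding them partway through a long load.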

Edge file parser, file name: edge.groovy
def parse(line, factory) {
    // A line has two space-separated parts: "src,label" and "dst:edgeLabel,..."
    def parts = line.split(/ /)

    def (src, vertextype) = parts[0].split(/,/).toList()
    def v1 = factory.vertex(src, vertextype)

    // A vertex without outgoing edges has only the first part
    if (parts.length == 2) {
        parts[1].split(/,/).grep { !it.isEmpty() }.each {
            def (dist, edgetype) = it.split(/:/).toList()
            def v2 = factory.vertex(dist)
            factory.edge(v1, v2, edgetype)
        }
    }
    return v1
}

5 Bulk-load the files with bulkLoader, file name: bulkLoader.groovy

path = "/home/ubuntu/titan/data"
graph = TitanFactory.open("conf/titan-cassandra.properties")

mgmt = graph.openManagement()

// define the vertex label
if (!mgmt.containsVertexLabel("ApplyInfo")) {
   ApplyInfo = mgmt.makeVertexLabel('ApplyInfo').make()
} else {
   ApplyInfo = mgmt.getVertexLabel("ApplyInfo")
}

if (!mgmt.containsVertexLabel("Terminal")) {
   Terminal = mgmt.makeVertexLabel('Terminal').make()
} else {
  Terminal = mgmt.getVertexLabel("Terminal")
}

if (!mgmt.containsVertexLabel("BankCard")) {
   BankCard = mgmt.makeVertexLabel('BankCard').make()
} else {
   BankCard = mgmt.getVertexLabel("BankCard")
}

if (!mgmt.containsVertexLabel("Mobile")) {
   Mobile = mgmt.makeVertexLabel('Mobile').make()
} else {
   Mobile = mgmt.getVertexLabel("Mobile")
}

// define the edge label
if (!mgmt.containsEdgeLabel("terminal")) {
    terminal = mgmt.makeEdgeLabel('terminal').multiplicity(MULTI).make()
} else {
    terminal = mgmt.getEdgeLabel("terminal")
}

if (!mgmt.containsEdgeLabel("bankcard")) {
    bankcard = mgmt.makeEdgeLabel('bankcard').multiplicity(MULTI).make()
} else {
    bankcard = mgmt.getEdgeLabel("bankcard")
}

if (!mgmt.containsEdgeLabel("recommend")) {
    recommend = mgmt.makeEdgeLabel('recommend').multiplicity(MULTI).make()
} else {
    recommend = mgmt.getEdgeLabel("recommend")
}

// define the vertex property

// orderNo
if (!mgmt.containsPropertyKey("orderNo")) {
    orderNo = mgmt.makePropertyKey('orderNo').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    orderNo = mgmt.getPropertyKey("orderNo")
}

// status
if (!mgmt.containsPropertyKey("status")) {
    status = mgmt.makePropertyKey('status').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    status = mgmt.getPropertyKey("status")
}

// appTime
if (!mgmt.containsPropertyKey("appTime")) {
    appTime = mgmt.makePropertyKey('appTime').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    appTime = mgmt.getPropertyKey("appTime")
}

// status2
if (!mgmt.containsPropertyKey("status2")) {
    status2 = mgmt.makePropertyKey('status2').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    status2 = mgmt.getPropertyKey("status2")
}

// historyOverdueTime
if (!mgmt.containsPropertyKey("historyOverdueTime")) {
    historyOverdueTime = mgmt.makePropertyKey('historyOverdueTime').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
    historyOverdueTime = mgmt.getPropertyKey("historyOverdueTime")
}

// currentOverdueTime
if (!mgmt.containsPropertyKey("currentOverdueTime")) {
    currentOverdueTime = mgmt.makePropertyKey('currentOverdueTime').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
    currentOverdueTime = mgmt.getPropertyKey("currentOverdueTime")
}

// Identification
if (!mgmt.containsPropertyKey("Identification")) {
    Identification = mgmt.makePropertyKey('Identification').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    Identification = mgmt.getPropertyKey("Identification")
}

// vlabel
if (!mgmt.containsPropertyKey("vlabel")) {
    vlabel = mgmt.makePropertyKey('vlabel').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
    vlabel = mgmt.getPropertyKey("vlabel")
}

// phoneNum
if (!mgmt.containsPropertyKey("phoneNum")) {
    phoneNum = mgmt.makePropertyKey('phoneNum').dataType(Long.class).cardinality(Cardinality.SINGLE).make()
} else {
    phoneNum = mgmt.getPropertyKey("phoneNum")
}

// bankCardNum
if (!mgmt.containsPropertyKey("bankCardNum")) {
    bankCardNum = mgmt.makePropertyKey('bankCardNum').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    bankCardNum = mgmt.getPropertyKey("bankCardNum")
}

// bulkLoader.vertex.id
if (!mgmt.containsPropertyKey("bulkLoader.vertex.id")) {
    blid = mgmt.makePropertyKey("bulkLoader.vertex.id").dataType(Integer.class).make()
} else {
    blid = mgmt.getPropertyKey("bulkLoader.vertex.id")
}

// build index
byPhoneComposite = mgmt.getGraphIndex("byPhoneComposite")
byOrderComposite = mgmt.getGraphIndex("byOrderComposite")
byBulkLoaderVertexId = mgmt.getGraphIndex("byBulkLoaderVertexId")

if (byPhoneComposite == null) {
    byPhoneComposite = mgmt.buildIndex("byPhoneComposite", Vertex.class).addKey(phoneNum).buildCompositeIndex()
}

if (byOrderComposite == null) {
    byOrderComposite = mgmt.buildIndex("byOrderComposite", Vertex.class).addKey(orderNo).buildCompositeIndex()

}

if (byBulkLoaderVertexId == null) {
    byBulkLoaderVertexId = mgmt.buildIndex("byBulkLoaderVertexId", Vertex.class).addKey(blid).buildCompositeIndex()
}
mgmt.commit()

graph.close()

// load vfile
graph = GraphFactory.open('conf/hadoop-graph/hadoop-script.properties')


hdfs.copyFromLocal("${path}/point.v", "point.v")
hdfs.copyFromLocal("${path}/point.groovy", "point.groovy")

graph.configuration.setInputLocation("point.v")
graph.configuration.setProperty("gremlin.hadoop.scriptInputFormat.script", "point.groovy")

blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
graph.compute(SparkGraphComputer).program(blvp).submit().get()

// load efile

hdfs.copyFromLocal("${path}/add100_e.txt", "edge.e")
hdfs.copyFromLocal("${path}/edge.groovy", "edge.groovy")

graph.configuration.setInputLocation("edge.e")
graph.configuration.setProperty("gremlin.hadoop.scriptInputFormat.script", "edge.groovy")


blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
graph.compute(SparkGraphComputer).program(blvp).submit().get()

6 Run Titan

./bin/gremlin.sh ./bulkLoader.groovy 
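Once the load has finished, the graph can be queried from the Gremlin console (./bin/gremlin.sh). A minimal sketch, assuming the schema defined in bulkLoader.groovy: the phone number is simply the value from the sample vertex line in section 3, and the traversals are illustrative rather than the queries from the original run.

```groovy
graph = TitanFactory.open("conf/titan-cassandra.properties")
g = graph.traversal()

// Equality lookup on phoneNum, which the byPhoneComposite index can serve
g.V().has("phoneNum", 17675474023L).valueMap().toList()

// Follow outgoing 'recommend' edges from that vertex; limit keeps the result small
g.V().has("phoneNum", 17675474023L).out("recommend").limit(10).valueMap().toList()

graph.close()
```

On a graph of this size, it is safer to start every traversal from an indexed property (here phoneNum or orderNo), since an unindexed g.V() scan touches all vertices.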

7 If the dataset is large, some parameters need to be set

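In conf/titan-cassandra.properties (the Titan graph configuration):

schema.default = none
storage.batch-loading = true
ids.block-size = 200000
ids.authority.wait-time = 1000

In conf/hadoop-graph/hadoop-script.properties (the Hadoop/Spark job configuration):

spark.master=local[4]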