Deploying Titan 1.0 on a Cassandra cluster, bulk-loading data with bulkLoader, and querying it
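I spent about half a month on Titan just to load a graph with 200 million vertices (4 types, 10 properties) and 1 billion edges (3 types). I ran into quite a few pitfalls along the way, but in the end I set up a Cassandra cluster, imported the data into Titan 1.0, and ran queries against it.
1 Deploy the Cassandra cluster
If your data set is not very large, say 10 million vertices and 20 million edges, a single-machine import is probably enough and you do not need a Cassandra cluster. Titan ships with an embedded Cassandra, so running ./bin/titan.sh start brings it up directly, and by default the data is stored under db/data/.
2 Deploy Titan 1.0
Download Titan 1.0 from the official site and unpack it; it works out of the box. The only prerequisite is JDK 1.8, which can be set up as follows: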
$ sudo apt-get install software-properties-common
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer
Then set the environment variable: export JAVA_HOME=/usr/lib/jvm/java-8-oracle
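3 Prepare the vertex and edge data
My data was generated with databene-benerator, in the following formats.
point_format: comma-separated fields, in this order: id, label, property1 through property10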
1,BankCard,Mf2Ei3Cm9El8Cz0Vt6Iq5Od1Uk3Hk4,Fu6Ph3Ij2Fq6Uh1,La5Rd7Kv4Ze7Xj8Gg2Cv0Cq,Nh1Ma4Di5Ig2Mc0Zn3Xs8Af0Do0Jp0,2054,594,Ir5Gw3Hf7Ty4Tc3Gi5Ki4Pn1Ze7Rn2,1,17675474023,225169216
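Before loading hundreds of millions of lines, it can help to confirm that the vertex file really has the shape described above. The following is only a minimal sketch in Python, not part of Titan or of the loading scripts in this post; the names PointRecord and parse_point_line are hypothetical, and the generic keys property1 to property10 simply mirror the format description.

```python
from typing import Dict, NamedTuple


class PointRecord(NamedTuple):
    """One parsed vertex line (illustrative type, not a Titan class)."""
    vertex_id: str
    label: str
    properties: Dict[str, str]


def parse_point_line(line: str) -> PointRecord:
    """Split one vertex line into id, label and property1..property10."""
    fields = line.rstrip("\n").split(",")
    if len(fields) != 12:
        # 1 id + 1 label + 10 properties, as in the format description
        raise ValueError(f"expected 12 fields, got {len(fields)}")
    vertex_id, label, *values = fields
    properties = {f"property{i}": v for i, v in enumerate(values, start=1)}
    return PointRecord(vertex_id, label, properties)


# The sample line from this post
sample = ("1,BankCard,Mf2Ei3Cm9El8Cz0Vt6Iq5Od1Uk3Hk4,Fu6Ph3Ij2Fq6Uh1,"
          "La5Rd7Kv4Ze7Xj8Gg2Cv0Cq,Nh1Ma4Di5Ig2Mc0Zn3Xs8Af0Do0Jp0,2054,594,"
          "Ir5Gw3Hf7Ty4Tc3Gi5Ki4Pn1Ze7Rn2,1,17675474023,225169216")
record = parse_point_line(sample)
print(record.vertex_id, record.label, len(record.properties))
```

Running a check like this over a sample of the file is cheap compared with discovering a malformed line in the middle of a long load.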
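edge_format (adjacency-list form): two parts separated by a single space. The first part is the source id and its label; the second part is a comma-separated list of dst:edge_label pairs, one per target vertex.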
1,BankCard 13:recommend,14:terminal,37:recommend
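This means that the vertex with id 1 (of type BankCard; if the type differs from the one given for that vertex in the vertex file, it is treated as a separate vertex) has directed edges to the vertices with ids 13, 14 and 37, and the type of each edge is the name after the colon.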
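To make the adjacency format concrete, here is a small Python sketch that splits one edge line into its source vertex and its list of targets. It is an illustration only and assumes exactly the format described above; parse_edge_line is a hypothetical helper, not something Titan provides.

```python
from typing import List, Tuple


def parse_edge_line(line: str) -> Tuple[str, str, List[Tuple[str, str]]]:
    """Return (source id, source label, [(target id, edge label), ...])."""
    # The first space separates "src,label" from the target list
    head, _, tail = line.strip().partition(" ")
    src, label = head.split(",")
    targets = []
    for item in tail.split(","):
        if not item:
            continue  # a vertex with no outgoing edges has an empty target list
        dst, edge_label = item.split(":")
        targets.append((dst, edge_label))
    return src, label, targets


src, label, targets = parse_edge_line(
    "1,BankCard 13:recommend,14:terminal,37:recommend")
print(src, label, targets)
```

A line that contains only the source part, with no space and no targets, yields an empty target list.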
4 Parsing scripts for the vertex and edge data (Groovy files)
Vertex file parser, file name: point.groovy
def parse(line, factory) {
def (id, vertextype, orderno, status, apptime, status2, hstyovertime, curtovertime, identify, vlabel, phonenum, bankcardnum) = line.split(/,/).toList()
def v1 = factory.vertex(id, vertextype)
if (orderno != null) { v1.property("orderNo", orderno) }
if (status != null) { v1.property("status", status) }
if (apptime != null) { v1.property("appTime", apptime) }
if (status2 != null) { v1.property("status2", status2) }
if (hstyovertime != null) { v1.property("historyOverdueTime", Integer.valueOf(hstyovertime)) }
if (curtovertime != null) { v1.property("currentOverdueTime", Integer.valueOf(curtovertime)) }
if (identify != null) { v1.property("Identification", identify) }
if (vlabel != null) { v1.property("vlabel", vlabel) }
if (phonenum != null) { v1.property("phoneNum", phonenum) }
if (bankcardnum != null) { v1.property("bankCardNum", bankcardnum) }
v1.property("bulkLoader.vertex.id",id)
return v1
}
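point.groovy calls Integer.valueOf on the historyOverdueTime and currentOverdueTime fields, and Integer.valueOf throws a NumberFormatException on anything that is not a valid 32-bit integer. A pre-check over the input file can surface such lines before the load starts. The sketch below is a rough Python approximation of that check, not an exact reimplementation of Java's parsing rules; bad_int_fields and INT_FIELDS are hypothetical names, and the column positions are taken from the field order in point.groovy.

```python
import re

# Zero-based column positions of the fields that point.groovy converts
# with Integer.valueOf (derived from the field order in the script).
INT_FIELDS = {6: "historyOverdueTime", 7: "currentOverdueTime"}
_INT_RE = re.compile(r"[+-]?[0-9]+")
INT_MIN, INT_MAX = -2**31, 2**31 - 1  # range of a Java Integer


def bad_int_fields(line):
    """Return the names of integer fields that would likely fail to parse."""
    fields = line.rstrip("\n").split(",")
    bad = []
    for idx, name in INT_FIELDS.items():
        if idx >= len(fields):
            continue  # missing trailing fields are null in the Groovy script
        value = fields[idx]
        if not _INT_RE.fullmatch(value) or not INT_MIN <= int(value) <= INT_MAX:
            bad.append(name)
    return bad


sample = ("1,BankCard,Mf2Ei3Cm9El8Cz0Vt6Iq5Od1Uk3Hk4,Fu6Ph3Ij2Fq6Uh1,"
          "La5Rd7Kv4Ze7Xj8Gg2Cv0Cq,Nh1Ma4Di5Ig2Mc0Zn3Xs8Af0Do0Jp0,2054,594,"
          "Ir5Gw3Hf7Ty4Tc3Gi5Ki4Pn1Ze7Rn2,1,17675474023,225169216")
print(bad_int_fields(sample))
```

For the sample line from this post the returned list is empty, since 2054 and 594 are both valid integers.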
Edge file parser, file name: edge.groovy
def parse(line, factory) {
def parts = line.split(/ /)
def (src, vertextype) = parts[0].split(/,/).toList()
def v1 = factory.vertex(src, vertextype)
if (parts.length == 2) {
parts[1].split(/,/).grep {!it.isEmpty() }.each {
def (dist, edgetype) = it.split(/:/).toList()
def v2 = factory.vertex(dist)
def edge = factory.edge(v1, v2, edgetype)
}
}
return v1
}
5 Bulk-import the files with bulkLoader, file name: bulkLoader.groovy
path = "/home/ubuntu/titan/data"
graph = TitanFactory.open("conf/titan-cassandra.properties")
mgmt = graph.openManagement()
// define the vertex label
if (!mgmt.containsVertexLabel("ApplyInfo")) {
ApplyInfo = mgmt.makeVertexLabel('ApplyInfo').make()
} else {
ApplyInfo = mgmt.getVertexLabel("ApplyInfo")
}
if (!mgmt.containsVertexLabel("Terminal")) {
Terminal = mgmt.makeVertexLabel('Terminal').make()
} else {
Terminal = mgmt.getVertexLabel("Terminal")
}
if (!mgmt.containsVertexLabel("BankCard")) {
BankCard = mgmt.makeVertexLabel('BankCard').make()
} else {
BankCard = mgmt.getVertexLabel("BankCard")
}
if (!mgmt.containsVertexLabel("Mobile")) {
Mobile = mgmt.makeVertexLabel('Mobile').make()
} else {
Mobile = mgmt.getVertexLabel("Mobile")
}
// define the edge label
if (!mgmt.containsEdgeLabel("terminal")) {
terminal = mgmt.makeEdgeLabel('terminal').multiplicity(MULTI).make()
} else {
terminal = mgmt.getEdgeLabel("terminal")
}
if (!mgmt.containsEdgeLabel("bankcard")) {
bankcard = mgmt.makeEdgeLabel('bankcard').multiplicity(MULTI).make()
} else {
bankcard = mgmt.getEdgeLabel("bankcard")
}
if (!mgmt.containsEdgeLabel("recommend")) {
recommend = mgmt.makeEdgeLabel('recommend').multiplicity(MULTI).make()
} else {
recommend = mgmt.getEdgeLabel("recommend")
}
// define the vertex property
// orderNo
if (!mgmt.containsPropertyKey("orderNo")) {
orderNo = mgmt.makePropertyKey('orderNo').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
orderNo = mgmt.getPropertyKey("orderNo")
}
// status
if (!mgmt.containsPropertyKey("status")) {
status = mgmt.makePropertyKey('status').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
status = mgmt.getPropertyKey("status")
}
// appTime
if (!mgmt.containsPropertyKey("appTime")) {
appTime = mgmt.makePropertyKey('appTime').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
appTime = mgmt.getPropertyKey("appTime")
}
// status2
if (!mgmt.containsPropertyKey("status2")) {
status2 = mgmt.makePropertyKey('status2').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
status2 = mgmt.getPropertyKey("status2")
}
// historyOverdueTime
if (!mgmt.containsPropertyKey("historyOverdueTime")) {
historyOverdueTime = mgmt.makePropertyKey('historyOverdueTime').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
historyOverdueTime = mgmt.getPropertyKey("historyOverdueTime")
}
// currentOverdueTime
if (!mgmt.containsPropertyKey("currentOverdueTime")) {
currentOverdueTime = mgmt.makePropertyKey('currentOverdueTime').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
currentOverdueTime = mgmt.getPropertyKey("currentOverdueTime")
}
// Identification
if (!mgmt.containsPropertyKey("Identification")) {
Identification = mgmt.makePropertyKey('Identification').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
Identification = mgmt.getPropertyKey("Identification")
}
// vlabel
if (!mgmt.containsPropertyKey("vlabel")) {
vlabel = mgmt.makePropertyKey('vlabel').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
vlabel = mgmt.getPropertyKey("vlabel")
}
// phoneNum
if (!mgmt.containsPropertyKey("phoneNum")) {
phoneNum = mgmt.makePropertyKey('phoneNum').dataType(Long.class).cardinality(Cardinality.SINGLE).make()
} else {
phoneNum = mgmt.getPropertyKey("phoneNum")
}
// bankCardNum
if (!mgmt.containsPropertyKey("bankCardNum")) {
bankCardNum = mgmt.makePropertyKey('bankCardNum').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
bankCardNum = mgmt.getPropertyKey("bankCardNum")
}
// bulkLoader.vertex.id
if (!mgmt.containsPropertyKey("bulkLoader.vertex.id")) {
blid = mgmt.makePropertyKey("bulkLoader.vertex.id").dataType(Integer.class).make()
} else {
blid = mgmt.getPropertyKey("bulkLoader.vertex.id")
}
// build index
byPhoneComposite = mgmt.getGraphIndex("byPhoneComposite")
byOrderComposite = mgmt.getGraphIndex("byOrderComposite")
byBulkLoaderVertexId = mgmt.getGraphIndex("byBulkLoaderVertexId")
if (byPhoneComposite == null) {
byPhoneComposite = mgmt.buildIndex("byPhoneComposite", Vertex.class).addKey(phoneNum).buildCompositeIndex()
}
if (byOrderComposite == null) {
byOrderComposite = mgmt.buildIndex("byOrderComposite", Vertex.class).addKey(orderNo).buildCompositeIndex()
}
if (byBulkLoaderVertexId == null) {
byBulkLoaderVertexId = mgmt.buildIndex("byBulkLoaderVertexId", Vertex.class).addKey(blid).buildCompositeIndex()
}
mgmt.commit()
graph.close()
// load vfile
graph = GraphFactory.open('conf/hadoop-graph/hadoop-script.properties')
hdfs.copyFromLocal("${path}/point.v", "point.v")
hdfs.copyFromLocal("${path}/point.groovy", "point.groovy")
graph.configuration.setInputLocation("point.v")
graph.configuration.setProperty("gremlin.hadoop.scriptInputFormat.script", "point.groovy")
blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
graph.compute(SparkGraphComputer).program(blvp).submit().get()
// load efile
hdfs.copyFromLocal("${path}/add100_e.txt", "edge.e")
hdfs.copyFromLocal("${path}/edge.groovy", "edge.groovy")
graph.configuration.setInputLocation("edge.e")
graph.configuration.setProperty("gremlin.hadoop.scriptInputFormat.script", "edge.groovy")
blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
graph.compute(SparkGraphComputer).program(blvp).submit().get()
6 Run Titan
./bin/gremlin.sh ./bulkLoader.groovy
7 If the data set is large, some parameters need to be configured
schema.default = none
storage.batch-loading = true
ids.block-size = 200000
ids.authority.wait = 1000
spark.master=local[4]