1. 程式人生 > >關於Neo4j和Cypher批量更新和批量插入優化的5個建議

關於Neo4j和Cypher批量更新和批量插入優化的5個建議

原文連結: http://jexp.de/blog/2017/03/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-cypher
注:我在測試後,對原文中的部分Cypher語句進行修改,使得其符合語法規則

當通過程式向圖形化資料庫中寫入大量資料的時候,你會希望它能夠高效的處理。

低效的方式

下面這些方式不是十分有效:
- 將值直接寫入到語句中,而不是通過引數的方式
- 每一個更新都通過一個Transaction傳送一個請求
- 通過一個Transaction傳送大量的單個請求
- 生成一個巨大複雜的語句(幾百行),然後通過一個Transaction進行提交
- 在一個Transaction中,傳送一個巨大的請求,會導致OOM錯誤

正確的方式

你需要構造儘可能小的請求,並且語句格式固定(這樣可以利用快取),然後通過引數方式進行使用。

每一個請求可以只修改一個屬性,或者修改整個子圖(上百個節點),但是它的語句結構必須是一致的,否則就不能使用快取。

UNWIND – 救星

為了實現這個目標,你只需要在你單次請求的前面加上一個UNWIND語句。UNWIND會將大量的資料(高達10k或者50k條)分散成一行一行的,每一行都會包含每一次更新所需要的全部資訊。

你新增一個{batch}引數,並且將它的值設定成一個Map列表,其中可以包含你的資料(10k或者50k條)。這些資料會被打包成一個完整的請求,並且符合語法結構,還用上了快取(因為其結構一致)。

語法結構

輸入:

{batch: [{row1},{row2},{row3},...10k]}

語句:

UNWIND {batch} as row

// 基於每一行的Map資料,編寫更新語句

示例

下面是一些示例

建立節點並寫入屬性

資料:

{batch: [{name:"Alice",age:32},{name:"Bob",age:42}]}

語句:

UNWIND {batch} as row
CREATE (n:Label)
SET n.name = row.name, n.age = row.age

Merge節點並寫入屬性

資料:

{batch: [{id:"
[email protected]
",properties:{name:"Alice",age:32}},{id:"[email protected]",properties:{name:"Bob",age:42}}]}

語句:

UNWIND {batch} as row
MERGE (n:Label {id:row.id})
(ON CREATE) SET n.name = row.properties.name, n.age = row.properties.age

尋找節點,建立/Merge關係,並寫入屬性

資料:

{batch: [{from:"[email protected]",to:"[email protected]",properties:{since:2012}},{from:"[email protected]",to:"[email protected]",properties:{since:2016}}]}

語句:

UNWIND {batch} as row
MATCH (from:Label {from:row.from})
MATCH (to:Label {to:row.to})
CREATE/MERGE (from)-[rel:KNOWS]->(to)
(ON CREATE) SET rel.since = row.properties.since

通過id或者id列表找節點

對於多叉樹很好用

在這裡我們只傳入了一個單獨的屬性created。實際上你可以不傳入任何屬性,或者傳入一個map的屬性來進行更新。

資料:

{batch: [{from:123,to:[44,12,128],created:"2016-01-13"}, {from:34,to:[23,35,2983],created:"2016-01-15"},...]}

語句:

UNWIND {batch} as row
MATCH (from) WHERE id(from) = row.from
MATCH (to) WHERE id(from) IN row.to // list of ids
CREATE/MERGE (from)-[rel:FOO]->(to)
SET rel.created = row.created

更快更高效

下面是一些更多的技巧。

你可以傳入一個Map,其中的key是節點id或者關係id。這樣以來,通過id查詢會變得更高效。

通過id更新已有的節點

資料:

{ batch : [{"1":334,"2":222,3:3840, ... 100k}]}

語句:

WITH {batch} as data, [k in keys({batch}) | toInt(k)] as ids
MATCH (n) WHERE id(n) IN ids

// 單個屬性更新
SET n.count = data[toString(id(n))]

通過id更新已有的關係

資料:

{ batch : [{"1":334,"2":222,3:3840, ... 100k}]}
  • 1

語句:

WITH {batch} as data, [k in keys({batch}) | toInt(k)] as ids
MATCH ()-[rel]->() WHERE id(rel) IN ids
SET rel.foo = data[toString(id(rel))]

有條件的建立資料

有些時候,你希望根據輸入動態的建立資料。但是Cypher目前沒有諸如WHEN或者IF的條件語句,CASE WHEN也只是一個表示式,因此,你必須使用一個我多年前想出來的技巧。

Cypher提供FOREACH語句,用來遍歷列表中的每一個元素並分別執行更新操作。於是,一個包含0個元素或者1個元素的列表則可以看成一個條件表示式。因為當0個元素的時候,就不會執行遍歷,而當1個元素的時候,就只執行一次遍歷。

大致思路如下:

...
FOREACH (_ IN CASE WHEN predicate THEN [true] ELSE [] END |
... update operations ....
)
  •  

其中,列表中的true值可以是其他任何值,42,"",null等等。只要它是一個值,那麼我們就可以得到一個非空的列表。

相似的,你也可以使用RANGE(1, CASE WHEN predicate THEN 1 ELSE 0 END)。當predicate的值為false的時候,就會範圍一個空列表。或者,如果你喜歡使用filter,那麼也可以通過filter(_ IN [1] WHERE predicate)來構造。

下面是一個完整的示例:

LOAD CSV FROM {url} AS row
MATCH (o:Organization {name:row.org})
FOREACH (_ IN case when row.type = 'Person' then [1] else [] end|
   MERGE (p:Person {name:row.name})
   CREATE (p)-[:WORKS_FOR]->(o)
)
FOREACH (_ IN case when row.type = 'Agency' then [1] else [] end|
   MERGE (a:Agency {name:row.name})
   CREATE (a)-[:WORKS_FOR]->(o)
)

需要注意的是,在FOREACH內部建立的變數無法在外部訪問。你需要再重新查詢一次,或者你需要再FOREACH內完成全部更新操作。

使用APOC庫

APOC庫提供了很多有用的方法供你使用。在這裡,我推薦下面3個方法:

  • 建立節點和關係,並且可以動態設定標籤和屬性
  • 批量提交和更新
  • 動態建立或者操作Map,並賦給屬性

動態建立節點和關係

通過apoc.create.nodeapoc.create.relationship你可以動態的計算節點標籤,關係型別和任意的屬性。

  • 標籤是一個String陣列
  • 屬性就是一個Map
UWNIND {batch} as row
CALL apoc.create.node(row.labels, row.properties) yield node
RETURN count(*)

apoc.create.*方法中,也提供了設定/更新/刪除屬性和標籤的功能。

UWNIND {batch} as row
MATCH (from) WHERE id(n) = row.from
MATCH (to:Label) where to.key = row.to
CALL apoc.create.relationship(from, row.type, row.properties, to) yield rel
RETURN count(*)

批量提交

在一開始j就提到了,大量的提交Transaction是有問題的。你可以用2G-4G的heap來更新百萬條記錄,但當量級更大了之後就會很困難了。在使用32G的heap下,我最大的Transaction可以達到10M的節點。

這時,apoc.periodic.iterate可以提供很大的幫助。

它的原理很簡單:你有兩個Cypher語句,第一條語句能夠提供可操縱的資料併產生巨大的資料流,第二條語句執行真正的更新操作,它對每一個數據都進行一次更新操作,但是它只在處理一定數量的資料後才建立一個新的Transaction。

打個比方,假如你第一條語句返回了五百萬個需要更新的節點,如果使用內部語句的話,那麼每一個節點都會進行一次更新操作。但是如果你設定批處理大小為10k的話,那麼每一個Transaction會批量更新10k的節點。

如果你的更新操作是相互獨立的話(建立節點,更新屬性或者更新獨立的子圖),那麼你可以新增parallel:true來充分利用cpu。

比方說,你想計算多個物品的評分,並通過批處理的方式來更新屬性,你應該按下面這樣操作

call apoc.periodic.iterate('
MATCH (n:User)-[r1:LIKES]->(thing)<-[r2:RATED]-(m:User) WHERE id(n)<id(m) RETURN thing, avg( r1.rating + r2.rating ) as score
','
WITH {thing} as t SET t.score = {score}
', {batchSize:10000, parallel:true})

動態建立/更新Map

儘管Cypher為列表提供了相當遍歷的操作,如range, collect, unwind, reduce, extract, filter, size等,但Map在有的時候也是需要進行建立和更改的。

apoc.map.*提供了一系列的方法來簡化這個過程。

通過其他資料建立Map:

RETURN apoc.map.fromPairs([["alice",38],["bob",42],...​])
// {alice:38, bob: 42, ...}

RETURN apoc.map.fromLists(["alice","bob",...],[38,42])
// {alice:38, bob: 42, ...}

// groups nodes, relationships, maps by key, good for quick lookups by that key
RETURN apoc.map.groupBy([{name:"alice",gender:"female"},{name:"bob",gender:"male"}],"gender")
// {female:{name:"alice",gender:"female"}, male:{name:"bob",gender:"male"}}

RETURN apoc.map.groupByMulti([{name:"alice",gender:"female"},{name:"bob",gender:"male"},{name:"Jane",gender:"female"}],"gender")
// {female:[{name:"alice",gender:"female"},{name:"jane",gender:"female"}], male:[{name:"bob",gender:"male"}]}

更新Map:

RETURN apoc.map.merge({alice: 38},{bob:42})
// {alice:38, bob: 42}

RETURN apoc.map.setKey({alice:38},"bob",42)
// {alice:38, bob: 42}

RETURN apoc.map.removeKey({alice:38, bob: 42},"alice")
// {bob: 42}

RETURN apoc.map.removeKey({alice:38, bob: 42},["alice","bob","charlie"])
// {}

// remove the given keys and values, good for data from load-csv/json/jdbc/xml
RETURN apoc.map.clean({name: "Alice", ssn:2324434, age:"n/a", location:""},["ssn"],["n/a",""])
// {name:"Alice"}

結論

通過上面這些方式,我能夠快速的執行更新操作。當然,你也可以組合這些方法,來實現更復雜的操作。