1. 程式人生 > >藥品監管系統架構揭祕:海量溯源資料儲存與查詢

藥品監管系統架構揭祕:海量溯源資料儲存與查詢

前言

在剛剛過去的2018年,“毒疫苗”事件再次觸及了大眾的敏感神經,因為十年前的“毒奶粉”事件還歷歷在目。我們急需建立一個全國性的藥品(食品)監控追蹤體系。與此同時,近年來隨著國家對醫藥行業的大力支援,中國的醫療事業也出現了跨越式的發展,大量的新型藥品上市,極大的豐富了患者和消費者的選擇範圍。大量的藥品在市面上流通,產生了大量的狀態資料,且這類資料在爆發式的增長。如何高效的儲存和溯源藥品狀態資料已經成為一個行業難題。傳統方案常常採用比如MySQL資料庫分庫分表的方式,但是這個方案在開發、運維、可擴充套件性都有不少弊端。

業界開始越來越多的使用分散式的NoSQL方案來解決大資料的問題。比如阿里健康基於表格儲存(Tablestore)推出了“碼上放心” 藥品監管碼查詢功能,解決了大眾的藥品查詢需求。這僅僅是第一步,建立一個完善全國性藥品追蹤體系是一個艱鉅而漫長的任務。借用網上的一句話,最終我們要實現藥品的:“來源可查,去向可追,責任可究”。

 

 

圖1 碼上放心 溯源截圖

在整個藥品監管體系中,藥品本身的管理和藥品軌跡溯源是藥品監管體系的兩大核心功能,本篇文章主要是介紹使用表格儲存的Timestream模型快速高效的實現這兩類功能。

核心需求

藥品元資料

藥品的元資料是指藥品在上市之前的在國家藥品監督管理局(CFDA)備案資訊,記錄了藥品名稱、分類、成分、批次、臨床一期、二期、N期測試資料、自研或進口等詳細資訊,多達幾十個欄位。

 

圖2 藥品元資料

使用者會通過頁面或者APP的方式瀏覽和查詢藥品資訊,這需要應用提供多種組合的查詢方式,比如:

  1. 按照藥品名稱查詢
    :比如查詢“阿莫西林”為關鍵字的藥品列表。
  2. 按照生產企業名稱查詢:比如之前的疫苗事件,我們可以查詢生產企業為“長春長生生物科技股份有限公司”的藥品列表。
  3. 按照時間維度,查詢一個時間範圍的資料:比如查詢某個藥企在2017年~2018年生產的抗生素批次。
  4. 按照某個地域或者範圍查詢:比如患者可以通過頁面,搜尋自己附近5公里內特定感冒藥。又比如,我們在面對自然災害時,我們可以使用Geo功能,查詢最近範圍的應急藥品,緊急調往災區。

上面只是列舉的一些典型查詢場景,藥品備案資訊中擁有大量的欄位,使用者會從多個查詢維度查詢資料。因此在保證效能的前提下,提供豐富的查詢功能成為元資料管理的主要技術難點。

狀態資料

藥品的狀態資料是指藥品在生產、流通過程中產生的狀態資料,比如藥品的原材料流通、藥企生產藥品過程中的狀態、運輸過程的軌跡、醫院藥店儲存和使用資料等。

 

圖3 常見狀態資料

藥品流通會產生大量的狀態資料,這些資料需要持續的記錄下來,後續才可以做到真正的藥品溯源。我們先來羅列一下藥品狀態資料:

  1. 藥企的狀態資料:這裡主要指藥品依賴的原材料溯源資訊和生產過程的環境資料。這些資料幫助企業監控藥品生產狀態,幫助藥監局審計藥品生產過程,在溯源過程中,結合元資料資訊,可以讓使用者對藥品有一個更全面的瞭解。
  2. 運輸的軌跡狀態資料:這個主要指藥品的運輸的產生的軌跡、儲存容器高溫低溫異常事件。“軌跡溯源”可以基於這些資料實現。
  3. 藥店、醫院的庫存資料:這個主要指藥品在相關的醫藥機構流轉和庫存資訊等,比如上面的“附近藥品”查詢就可以基於這個資料實現。

從上面的資料來源可知,一盒簡單的藥品在到送到患者手上之前,會有大量的流通環節,每個環節都會產生大量的狀態資料。同時,中國市場藥品的規模在萬億人民幣級別,並且伴隨每年有將近一成的增長,是全球第二大醫藥市場。要滿足如此巨大的規模下的狀態資料的儲存,極高的寫入吞吐、海量儲存規模、可控的儲存成本成為必須要解決的問題。

解決方案

 

 

圖4 MySQL分庫分表 vs Tablestore

從對藥品元資料管理和狀態資料溯源的總結可知,要滿足以上的功能和效能需求,單機已經無法滿足要求,需要使用分散式的方案。一般傳統的方案會採用MySQL分庫分表的方案,但是這個方案在實際生產和運維中面臨不少問題,比如:

  1. 擴容不方便,需要做資料的重新分佈。
  2. 分佈鍵變更很麻煩,分佈鍵需要謹慎選擇。
  3. SQL限制多,功能缺失多,無法充分發揮MySQL自身的優勢。
  4. 傳統的關係模型新增欄位需要極大的成本,嚴重阻礙使用者業務的擴充套件。
  5. 由於單個節點是孤立的節點,需要提供主備來保障資料的可靠性。無法像分散式的NoSQL一樣實現自動的故障恢復,需要一個DBA來及時維護庫的狀態。
  6. 無法提供靈活的多欄位查詢,只能依賴二級索引和全表掃描Fliter實現多維查詢功能,效率相對較低。
  7. 無法做到計算和儲存分離,使用者很難做到計算和儲存均衡匹配,導致資源浪費。
  8. 無法原生支援Geo查詢。

總結來看,從理論上能滿足以上的功能需求,但是要想真正在生產中使用和維護好這套儲存系統,只能說“想愛你並不容易”。在這種大資料的OLTP的場景下,業界一般選用分散式的NoSQL方案。因此我們推薦使用Tablestore一站式的解決以上問題。Tablestore是一款阿里自研的分散式NoSQL服務,提供多元索引支援豐富的查詢需求,支撐超大規模的併發訪問和低延遲的效能,可以很好的解決藥品元資料管理和溯源的需求。

Timestream

Timestream是表格儲存推出的最新資料模型,這個模型針對時序資料、軌跡資料、溯源資料,定義了一套簡單清晰易用的API,細節可以參考《Tablestore Timestream:為海量時序資料儲存設計的全新資料模型》。

在我們列舉的藥品監管場景中,藥品的元資料可以非常簡單的抽象為Timestream的元資料(Meta),狀態資料抽象為Timestream的Data資料。本文作為一個實戰文章,因此使用Timestream模型來快速高效的實現以上兩個功能。

從上面的Timestream介紹文章可知,Timestream擁有幾個核心概念,分別是:Name, Tag, Attribute, Timestamp, Point(Fields)。我們羅列一個表格,展示怎麼將藥品的相關資料對映到Timestream的模型中,如圖所示:

 

 

圖5 模型轉換圖

  • 分類(Name)+識別符號(Tag): 這兩個欄位唯一決定一個藥品資料。
  • 元資料(Attribute): 藥品的相關屬性,當藥品在登記在案時這個資料被持久化儲存。
  • 最新狀態資料(Attribute): 如標題,藥品最新的狀態,比如上面的‘地點’資訊,我們可以建立Geo的索引,使用者地理資訊的查詢。
  • 時間(Timestamp): 狀態資料的發生時間。
  • 軌跡、狀態: 具體的狀態資料,上面只是兩個示例,實際上可以支援非常多的欄位。

接下來我們通過一個可以執行的Demo,向大家展示怎麼使用Timestream API實現元資料管理和溯源功能。

功能實現(Java)

功能列表

寫入

  1. 藥品元資料持久化,將藥品的相關元資料資訊儲存到Tablestore中。
  2. 藥品運輸軌跡持久化,主要是運輸和流轉的軌跡,藥品的實時狀態等,並將Location(位置)作為Geo索引,方便後期的Geo查詢。

查詢

  1. 基本的藥品詳細資訊查詢,主要是根據使用者輸入條件,顯示藥品的元資料。
  2. 藥品的防偽鑑定,結合生產日期,運輸軌跡、銷售狀態和查詢使用者等資料對藥品實行防偽鑑定。
  3. 查詢指定地點範圍內的特定藥品。
  4. 藥品軌跡重放

依賴


Meta表的建立
對於一些固定且有特殊索引需求的欄位,我們在建立Meta表的時候需要單獨指定,比如“生產日期”、地理資訊、狀態資料等。
考慮到後面的擴充套件需求,我們增加一個擴充套件欄位,“extension”,用於儲存未定義的元資料。
以下示例只是給了部分元資料欄位,使用者可以根據自己的需求設定更多的索引欄位。
public void createMetaTable() { List<AttributeIndexSchema> index = new ArrayList<AttributeIndexSchema>(); index.add(new AttributeIndexSchema("produced_date", AttributeIndexSchema.Type.LONG)); index.add(new AttributeIndexSchema("period_of_validity", AttributeIndexSchema.Type.LONG)); index.add(new AttributeIndexSchema("loc", AttributeIndexSchema.Type.GEO_POINT)); index.add(new AttributeIndexSchema("links", AttributeIndexSchema.Type.KEYWORD)); index.add(new AttributeIndexSchema("status", AttributeIndexSchema.Type.KEYWORD)); index.add(new AttributeIndexSchema("extension", AttributeIndexSchema.Type.KEYWORD).setIsArray(true)); db.createMetaTable(index); }
Data表的建立
這個比較簡單,只需要設定表名即可。因為我們是Schema Free的體系,不需要預先指定列,在寫入的時候指定即可。
public void createDataTable() { db.createDataTable(conf.getDataTableName()); }
錄入藥品元資料和狀態資料
元資料匯入,我們將一個本地的csv檔案中的資料匯入到資料庫中
public void importMeta() throws IOException { TimestreamMetaTable metaTable = db.metaTable(); String [] fileHeader = {"分類", "名稱", "監管號", "受理號", "生產日期", "有效日期", "註冊分類", "申請型別", "企業名稱", "任務型別"}; String csvFile = conf.getMetaFile(); CSVFormat format = CSVFormat.DEFAULT.withHeader(fileHeader).withIgnoreHeaderCase().withTrim(); Reader reader = Files.newBufferedReader(Paths.get(csvFile)); CSVParser csvParser = new CSVParser(reader, format); for (CSVRecord r : csvParser.getRecords()) { TimestreamIdentifier identifier = new TimestreamIdentifier.Builder(r.get("分類")) .addTag("名稱", r.get("名稱")) .addTag("監管號", r.get("監管號")) .build(); TimestreamMeta meta = new TimestreamMeta(identifier); meta.addAttribute("produced_date", r.get("生產日期")); meta.addAttribute("period_of_validity", r.get("有效日期")); List<String> extension = new ArrayList(); extension.add("受理號=" + r.get("受理號")); extension.add("註冊分類=" + r.get("註冊分類")); extension.add("申請型別=" + r.get("申請型別")); extension.add("企業名稱=" + r.get("企業名稱")); extension.add("任務型別=" + r.get("任務型別")); meta.addAttribute("extension", new Gson().toJson(extension)); metaTable.put(meta); System.out.println(meta.toString()); } }
狀態資料匯入,這裡loc, links,status在Meta和Data都儲存了一次,Meta表中儲存主要是做後續的索引查詢,Data表中儲存主要是做
public void importData() throws Exception { TimestreamMetaTable metaTable = db.metaTable(); TimestreamDataTable dataTable = db.dataTable(conf.getDataTableName()); String [] fileHeader = {"分類", "名稱", "監管號", "生產日期", "位置", "環節", "狀態"}; String csvFile = conf.getDataFile(); CSVFormat format = CSVFormat.DEFAULT.withHeader(fileHeader).withIgnoreHeaderCase().withTrim(); Reader reader = Files.newBufferedReader(Paths.get(csvFile)); CSVParser csvParser = new CSVParser(reader, format); for (CSVRecord r : csvParser.getRecords()) { TimestreamIdentifier identifier = new TimestreamIdentifier.Builder(r.get("分類")) .addTag("名稱", r.get("名稱")) .addTag("監管號", r.get("監管號")) .build(); TimestreamMeta meta = new TimestreamMeta(identifier); String loc = toLocationString(r.get("位置")); String links = r.get("環節"); String status = r.get("狀態"); meta.addAttribute("loc", loc); meta.addAttribute("links", links); meta.addAttribute("status", status); metaTable.update(meta); Point point = new Point.Builder(this.getTimestamp(r, "生產日期"), TimeUnit.MILLISECONDS) .addField("loc", loc) .addField("links", links) .addField("status", status) .build(); dataTable.asyncWrite(identifier, point); System.out.println(point.toString()); } dataTable.flush(); }

多維度查詢藥品溯源資訊
1. 基本的藥品詳細資訊查詢,主要是根據使用者輸入條件,顯示藥品的元資料。我們這裡根據藥品分類、藥品名稱、生產企業來查詢藥品。
Filter filter = and( Name.equal("中藥"), Tag.equal("名稱", "複方阿膠"),http://Attribute.in("extension", new String[]{"企業名稱=山東****也有限公司"}) ); Iterator<TimestreamMeta> iter = metaTable.filter(filter).fetchAll(); while (iter.hasNext()) { TimestreamMeta m = iter.next(); System.out.println(m); }
2. 藥品的防偽鑑定,結合生產日期,運輸軌跡、銷售狀態和查詢使用者等資料對藥品實行防偽鑑定。我們這裡輸入名稱和藥品監管碼。
Filter filter = and( Name.equal("中藥"), Tag.equal("名稱", "複方阿膠"), Tag.equal("監管號", "8160000000000019") ); Iterator<TimestreamMeta> iter = metaTable.filter(filter).selectAttributes("status").fetchAll(); while (iter.hasNext()) { TimestreamMeta m = iter.next(); System.out.println(m.getAttributeAsString("status")); } // 從查詢的結果來看,藥品處於召回中,有使用風險
3. 查詢指定地點範圍內的特定藥品。比如查詢使用者5KM範圍的“阿莫西林”。
Filter filter = and( Name.equal("化藥"), Tag.prefix("名稱", "阿莫西林"), Attribute.inGeoDistance("loc", "31.6533906593,103.8427768645", 5 * 1000) ); Iterator<TimestreamMeta> iter = metaTable.filter(filter).fetchAll(); while (iter.hasNext()) { TimestreamMeta m = iter.next(); System.out.println(m); }
4. 藥品軌跡重放,遍歷指定藥品的一個軌跡溯源資訊。
TimestreamIdentifier identifier = new TimestreamIdentifier.Builder("化藥") .addTag("名稱", "阿莫西林") .addTag("監管號", "8150000000000000") .build(); Iterator<Point> iter = dataTable.get(identifier).select("loc").fetchAll(); while (iter.hasNext()) { Point p = iter.next(); System.out.println(p); }

 

原文連結

本文為雲棲社群原創內容,未經