1. 程式人生 > >C# 通過Thrift 1 操作 HBase

C# 通過Thrift 1 操作 HBase

什麼是Thrift?

Thrift是一種RPC(遠端過程呼叫)軟體框架,用來進行可擴充套件且跨語言的服務的開發。它結合了功能強大的軟體堆疊和程式碼生成引擎,以構建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 這些程式語言間無縫結合的、高效的服務。

C#通過Thrift訪問HBase

  • HBase提供了Thrift服務端
  • C#可執行程式可建立Thrift客戶端連線到HBase。

這裡寫圖片描述

準備

下載Thrift

下載HBase安裝包

生成C#的Thrift客戶端

1.建立一個資料夾用於存放所有的材料,本文示例:workspace

2.解壓thrift-0.9.3.tar.gz,將其中的lib/csharp目錄複製到workspace。根據需要將專案的目標框架從.NET Framework 3.5修改為4.0或以上,並編譯得到Thrift.dll
這裡寫圖片描述

3.建立測試專案HBaseTest,並引用Thrift.dll

4.解壓HBase的安裝包,從安裝包中複製HBase.thrift檔案到workspace

\hbase-1.2.2\hbase-thrift\src\main\resources
\org\apache\hadoop\hbase\thrift\hbase.thrift

注意: HBase有兩個版本的thrift,分別是thrift和thrift2。我們這裡講解的是thrift(即Thrift 1)。

5.啟用cmd命令列模式,進入workspace,執行以下命令生成程式碼:

thrift-0.9.3.exe -gen csharp hbase.thrift

執行後會生成gen-csharp目錄,裡面包含了HBase的Thrift客戶端程式碼。
這裡寫圖片描述

6.將生成的程式碼加入到HBaseTest專案中。
這裡寫圖片描述

開發

AbstractHBaseThriftService 抽象服務

這裡,我們實際上是對HBase Thrift客戶端Java API實踐中的Java程式碼進行了翻譯,改寫成C#語言的相關操作。我們在客戶端,進行了一層抽象,更加便於傳遞各種引數,抽象類為AbstractHBaseThriftService,該類實現程式碼如下所示:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using Thrift.Transport;
using Thrift.Protocol;

namespace HBaseTest
{
    /// <summary>
    /// Base class for HBase Thrift (v1) client services. It creates the socket
    /// transport and the generated <c>Hbase.Client</c> for a host/port pair,
    /// exposes open/close/dispose for the connection, and declares the table,
    /// row and scanner operations that a concrete service must implement.
    /// </summary>
    public abstract class AbstractHBaseThriftService : IDisposable
    {
        /// <summary>Name of the character set intended for keys and values.</summary>
        protected static readonly string CHARSET = "UTF-8";

        private readonly string host;
        private readonly int port;
        private readonly TTransport transport;
        private bool disposed;

        /// <summary>Generated Thrift client that talks over the transport created by the constructor.</summary>
        protected readonly Hbase.Client client;

        /// <summary>
        /// Creates a service for <c>localhost:9090</c>.
        /// </summary>
        public AbstractHBaseThriftService()
            : this("localhost", 9090)
        {
        }

        /// <summary>
        /// Creates the socket transport, wraps it in a binary protocol with
        /// strict read and strict write enabled, and builds the client on top.
        /// The connection is not opened here; call <see cref="Open"/>.
        /// </summary>
        /// <param name="host">Host name or address of the HBase Thrift server.</param>
        /// <param name="port">TCP port of the HBase Thrift server.</param>
        public AbstractHBaseThriftService(string host, int port)
        {
            this.host = host;
            this.port = port;
            transport = new TSocket(host, port);
            TProtocol protocol = new TBinaryProtocol(transport, true, true);
            client = new Hbase.Client(protocol);
        }

        /// <summary>Host this service was created for.</summary>
        public string Host
        {
            get { return host; }
        }

        /// <summary>Port this service was created for.</summary>
        public int Port
        {
            get { return port; }
        }

        /// <summary>
        /// Opens the communication channel. Does nothing when the transport
        /// already reports itself as open.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
        public void Open()
        {
            if (disposed)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            if (!transport.IsOpen)
            {
                transport.Open();
            }
        }

        /// <summary>
        /// Closes the communication channel.
        /// </summary>
        public void Close()
        {
            transport.Close();
        }

        /// <summary>
        /// Closes the transport so the service can be used in a <c>using</c> block.
        /// Calling it more than once is harmless.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the transport. Derived classes that own further resources
        /// should override this and call the base implementation.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; only then is the
        /// managed transport touched.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposed)
            {
                return;
            }

            if (disposing)
            {
                transport.Close();
            }

            disposed = true;
        }

        /// <summary>
        /// Gets the names of all user tables in HBase.
        /// </summary>
        /// <returns>The table names.</returns>
        public abstract List<string> GetTables();

        /// <summary>
        /// Updates a single field of one row.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="rowKey">Row key.</param>
        /// <param name="writeToWal">Write-ahead-log flag for the mutation.</param>
        /// <param name="fieldName">Name of the column to write.</param>
        /// <param name="fieldValue">Value to write.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        public abstract void Update(
            string table,
            string rowKey,
            bool writeToWal,
            string fieldName,
            string fieldValue,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Updates several fields of one row in a single call.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="rowKey">Row key.</param>
        /// <param name="writeToWal">Write-ahead-log flag for each mutation.</param>
        /// <param name="fieldNameValues">Column name to value pairs to write.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        public abstract void Update(
            string table,
            string rowKey,
            bool writeToWal,
            Dictionary<string, string> fieldNameValues,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Deletes one cell of a row.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="rowKey">Row key.</param>
        /// <param name="writeToWal">Write-ahead-log flag for the mutation.</param>
        /// <param name="column">Column to delete.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        public abstract void DeleteCell(
            string table,
            string rowKey,
            bool writeToWal,
            string column,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Deletes the specified cells of a row.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="rowKey">Row key.</param>
        /// <param name="writeToWal">Write-ahead-log flag for each mutation.</param>
        /// <param name="columns">Columns to delete.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        public abstract void DeleteCells(
            string table,
            string rowKey,
            bool writeToWal,
            List<string> columns,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Deletes a whole row.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="rowKey">Row key.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        public abstract void DeleteRow(
            string table,
            string rowKey,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Opens a scanner that starts at <paramref name="startRow"/>.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="startRow">Row key at which the scan starts.</param>
        /// <param name="columns">Columns to return.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        /// <returns>Scanner id to pass to the ScannerGet*/ScannerClose methods.</returns>
        public abstract int ScannerOpen(
            string table,
            string startRow,
            List<string> columns,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Opens a scanner over the range given by a start row and a stop row.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="startRow">Row key at which the scan starts.</param>
        /// <param name="stopRow">Row key at which the scan stops.</param>
        /// <param name="columns">Columns to return.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        /// <returns>Scanner id to pass to the ScannerGet*/ScannerClose methods.</returns>
        public abstract int ScannerOpen(
            string table,
            string startRow,
            string stopRow,
            List<string> columns,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Opens a scanner driven by a row-key prefix.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="startAndPrefix">
        /// Prefix handed to the server; judging by the name it serves as both the
        /// start row and the prefix filter (NOTE(review): confirm against the Thrift IDL).
        /// </param>
        /// <param name="columns">Columns to return.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        /// <returns>Scanner id to pass to the ScannerGet*/ScannerClose methods.</returns>
        public abstract int ScannerOpenWithPrefix(
            string table,
            string startAndPrefix,
            List<string> columns,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Opens a scanner that starts at <paramref name="startRow"/> and carries a timestamp.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="startRow">Row key at which the scan starts.</param>
        /// <param name="columns">Columns to return.</param>
        /// <param name="timestamp">Timestamp forwarded with the scan request.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        /// <returns>Scanner id to pass to the ScannerGet*/ScannerClose methods.</returns>
        public abstract int ScannerOpenTs(
            string table,
            string startRow,
            List<string> columns,
            long timestamp,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Opens a scanner over a start/stop row range that carries a timestamp.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="startRow">Row key at which the scan starts.</param>
        /// <param name="stopRow">Row key at which the scan stops.</param>
        /// <param name="columns">Columns to return.</param>
        /// <param name="timestamp">Timestamp forwarded with the scan request.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        /// <returns>Scanner id to pass to the ScannerGet*/ScannerClose methods.</returns>
        public abstract int ScannerOpenTs(
            string table,
            string startRow,
            string stopRow,
            List<string> columns,
            long timestamp,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Reads a batch of rows from an open scanner.
        /// </summary>
        /// <param name="id">Scanner id returned by one of the ScannerOpen* methods.</param>
        /// <param name="nbRows">Number of rows requested.</param>
        /// <returns>The rows read from the scanner.</returns>
        public abstract List<TRowResult> ScannerGetList(int id, int nbRows);

        /// <summary>
        /// Reads row data from an open scanner.
        /// </summary>
        /// <param name="id">Scanner id returned by one of the ScannerOpen* methods.</param>
        /// <returns>The rows read from the scanner.</returns>
        public abstract List<TRowResult> ScannerGet(int id);

        /// <summary>
        /// Gets the specified row.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="row">Row key.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        /// <returns>The result list for the requested row.</returns>
        public abstract List<TRowResult> GetRow(
            string table,
            string row,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Gets several rows in one call.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="rows">Row keys to fetch.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        /// <returns>The results for the requested rows.</returns>
        public abstract List<TRowResult> GetRows(
            string table,
            List<string> rows,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Gets several rows in one call, restricted to the given columns.
        /// </summary>
        /// <param name="table">Table name.</param>
        /// <param name="rows">Row keys to fetch.</param>
        /// <param name="columns">Columns to return.</param>
        /// <param name="attributes">Attributes sent with the request.</param>
        /// <returns>The results for the requested rows.</returns>
        public abstract List<TRowResult> GetRowsWithColumns(
            string table,
            List<string> rows,
            List<string> columns,
            Dictionary<string, string> attributes);

        /// <summary>
        /// Closes a scanner.
        /// </summary>
        /// <param name="id">Scanner id returned by one of the ScannerOpen* methods.</param>
        public abstract void ScannerClose(int id);

        /// <summary>
        /// Iterates over one row result; intended for debugging output.
        /// </summary>
        /// <param name="result">Row result to walk.</param>
        public abstract void IterateResults(TRowResult result);
    }
}

Thrift服務的基本功能

  • 建立到Thrift服務的連線:Open()
  • 獲取到HBase中的所有表名:GetTables()
  • 更新HBase表記錄:Update()
  • 刪除HBase表中一行記錄的單元格資料(cell):DeleteCell()和DeleteCells()
  • 刪除HBase表中一行記錄:DeleteRow()
  • 開啟一個Scanner,返回id:ScannerOpen()、ScannerOpenWithPrefix()和ScannerOpenTs();然後用返回的id迭代記錄:ScannerGetList()和ScannerGet()
  • 獲取一行或多行記錄結果:GetRow()、GetRows()和GetRowsWithColumns()
  • 關閉一個Scanner:ScannerClose()
  • 迭代結果,用於除錯:IterateResults()

HBaseThriftService 服務實現

根據上面HBaseThrift抽象服務的定義,我們的一個實現如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HBaseTest
{
    /// <summary>
    /// Concrete HBase Thrift service. Each operation converts its string
    /// arguments to UTF-8 byte arrays and forwards them to the generated
    /// Thrift client held by the base class.
    /// </summary>
    public class HBaseThriftService : AbstractHBaseThriftService
    {
        /// <summary>
        /// Creates a service for <c>localhost:9090</c>.
        /// </summary>
        public HBaseThriftService()
            : this("localhost", 9090)
        {
        }

        /// <summary>
        /// Creates a service for the given Thrift server endpoint.
        /// </summary>
        /// <param name="host">Host name or address of the HBase Thrift server.</param>
        /// <param name="port">TCP port of the HBase Thrift server.</param>
        public HBaseThriftService(string host, int port)
            : base(host, port)
        {
        }

        /// <inheritdoc />
        public override List<string> GetTables()
        {
            List<byte[]> tables = client.getTableNames();
            List<string> names = new List<string>();
            foreach (byte[] table in tables)
            {
                names.Add(Decode(table));
            }
            return names;
        }

        /// <inheritdoc />
        public override void Update(string table, string rowKey, bool writeToWal, string fieldName, string fieldValue, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);

            List<Mutation> mutations = new List<Mutation>();
            mutations.Add(CreatePut(fieldName, fieldValue, writeToWal));
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        /// <inheritdoc />
        /// <exception cref="ArgumentNullException"><paramref name="fieldNameValues"/> is null.</exception>
        public override void Update(string table, string rowKey, bool writeToWal, Dictionary<string, string> fieldNameValues, Dictionary<string, string> attributes)
        {
            if (fieldNameValues == null)
            {
                throw new ArgumentNullException("fieldNameValues");
            }

            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);

            List<Mutation> mutations = new List<Mutation>(fieldNameValues.Count);
            foreach (KeyValuePair<string, string> pair in fieldNameValues)
            {
                mutations.Add(CreatePut(pair.Key, pair.Value, writeToWal));
            }
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        /// <inheritdoc />
        public override void DeleteCell(string table, string rowKey, bool writeToWal, string column, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);

            List<Mutation> mutations = new List<Mutation>();
            mutations.Add(CreateDelete(column, writeToWal));
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        /// <inheritdoc />
        /// <exception cref="ArgumentNullException"><paramref name="columns"/> is null.</exception>
        public override void DeleteCells(string table, string rowKey, bool writeToWal, List<string> columns, Dictionary<string, string> attributes)
        {
            if (columns == null)
            {
                throw new ArgumentNullException("columns");
            }

            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);

            List<Mutation> mutations = new List<Mutation>(columns.Count);
            foreach (string column in columns)
            {
                mutations.Add(CreateDelete(column, writeToWal));
            }
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        /// <inheritdoc />
        public override void DeleteRow(string table, string rowKey, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            client.deleteAllRow(tableName, row, encodedAttributes);
        }

        /// <inheritdoc />
        public override int ScannerOpen(string table, string startRow, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpen(tableName, start, encodedColumns, encodedAttributes);
        }

        /// <inheritdoc />
        public override int ScannerOpen(string table, string startRow, string stopRow, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            byte[] stop = Encode(stopRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithStop(tableName, start, stop, encodedColumns, encodedAttributes);
        }

        /// <inheritdoc />
        public override int ScannerOpenWithPrefix(string table, string startAndPrefix, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] prefix = Encode(startAndPrefix);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithPrefix(tableName, prefix, encodedColumns, encodedAttributes);
        }

        /// <inheritdoc />
        public override int ScannerOpenTs(string table, string startRow, List<string> columns, long timestamp, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenTs(tableName, start, encodedColumns, timestamp, encodedAttributes);
        }

        /// <inheritdoc />
        public override int ScannerOpenTs(string table, string startRow, string stopRow, List<string> columns, long timestamp, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            byte[] stop = Encode(stopRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithStopTs(tableName, start, stop, encodedColumns, timestamp, encodedAttributes);
        }

        /// <inheritdoc />
        public override List<TRowResult> ScannerGetList(int id, int nbRows)
        {
            return client.scannerGetList(id, nbRows);
        }

        /// <inheritdoc />
        public override List<TRowResult> ScannerGet(int id)
        {
            return client.scannerGet(id);
        }

        /// <inheritdoc />
        public override List<TRowResult> GetRow(string table, string row, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] startRow = Encode(row);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRow(tableName, startRow, encodedAttributes);
        }

        /// <inheritdoc />
        public override List<TRowResult> GetRows(string table, List<string> rows, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            List<byte[]> encodedRows = EncodeStringList(rows);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRows(tableName, encodedRows, encodedAttributes);
        }

        /// <inheritdoc />
        public override List<TRowResult> GetRowsWithColumns(string table, List<string> rows, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            List<byte[]> encodedRows = EncodeStringList(rows);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRowsWithColumns(tableName, encodedRows, encodedColumns, encodedAttributes);
        }

        /// <inheritdoc />
        public override void ScannerClose(int id)
        {
            client.scannerClose(id);
        }

        /// <summary>
        /// Writes every column name and cell value of <paramref name="result"/>
        /// to the console, one line per column.
        /// </summary>
        /// <param name="result">Row result whose columns are printed.</param>
        public override void IterateResults(TRowResult result)
        {
            foreach (KeyValuePair<byte[], TCell> pair in result.Columns)
            {
                Console.WriteLine("\tCol=" + Decode(pair.Key) + ", Value=" + Decode(pair.Value.Value));
            }
        }

        /// <summary>
        /// Builds a non-delete mutation that writes <paramref name="value"/> to <paramref name="column"/>.
        /// </summary>
        private static Mutation CreatePut(string column, string value, bool writeToWal)
        {
            Mutation mutation = new Mutation();
            mutation.IsDelete = false;
            mutation.WriteToWAL = writeToWal;
            mutation.Column = Encode(column);
            mutation.Value = Encode(value);
            return mutation;
        }

        /// <summary>
        /// Builds a delete mutation for <paramref name="column"/>; no value is set.
        /// </summary>
        private static Mutation CreateDelete(string column, bool writeToWal)
        {
            Mutation mutation = new Mutation();
            mutation.IsDelete = true;
            mutation.WriteToWAL = writeToWal;
            mutation.Column = Encode(column);
            return mutation;
        }

        /// <summary>
        /// Decodes UTF-8 bytes into a string.
        /// </summary>
        private static string Decode(byte[] bs)
        {
            // Encoding.UTF8 on purpose: "UTF8Encoding.Default" is the inherited
            // static Encoding.Default, which is not guaranteed to be UTF-8.
            return Encoding.UTF8.GetString(bs);
        }

        /// <summary>
        /// Encodes a string as UTF-8 bytes.
        /// </summary>
        private static byte[] Encode(string str)
        {
            return Encoding.UTF8.GetBytes(str);
        }

        /// <summary>
        /// Encodes every key and value of <paramref name="attributes"/> as UTF-8.
        /// A null dictionary yields an empty result, mirroring how
        /// <see cref="EncodeStringList"/> treats a null list.
        /// </summary>
        private static Dictionary<byte[], byte[]> EncodeAttributes(Dictionary<string, string> attributes)
        {
            Dictionary<byte[], byte[]> encodedAttributes = new Dictionary<byte[], byte[]>();
            if (attributes != null)
            {
                foreach (KeyValuePair<string, string> pair in attributes)
                {
                    encodedAttributes.Add(Encode(pair.Key), Encode(pair.Value));
                }
            }
            return encodedAttributes;
        }

        /// <summary>
        /// Encodes every string of <paramref name="strings"/> as UTF-8.
        /// A null list yields an empty result.
        /// </summary>
        private static List<byte[]> EncodeStringList(List<string> strings)
        {
            List<byte[]> list = new List<byte[]>();
            if (strings != null)
            {
                foreach (string str in strings)
                {
                    list.Add(Encode(str));
                }
            }
            return list;
        }
    }
}

HBaseThrift 測試用例

針對HBase的Thrift介面,我們做了測試用例如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HBaseTest
{
    class Test
    {
        private readonly AbstractHBaseThriftService thriftService;

        /// <summary>
        /// Builds the Thrift service for the given endpoint and opens its
        /// connection straight away.
        /// </summary>
        /// <param name="host">Host handed to the service constructor.</param>
        /// <param name="port">Port handed to the service constructor.</param>
        public Test(String host, int port)
        {
            HBaseThriftService service = new HBaseThriftService(host, port);
            this.thriftService = service;
            this.thriftService.Open();
        }

        /// <summary>
        /// Builds the test against host "master" on port 9090.
        /// </summary>
        public Test()
            : this("master", 9090)
        {
        }

        // One generator shared by the Randomly* helpers. Creating a new Random()
        // per call can hand back identical values for calls made in quick
        // succession on runtimes that seed it from the clock.
        // NOTE(review): Random is not thread-safe; add locking if these helpers
        // are ever called from several threads.
        private static readonly Random SharedRandom = new Random();

        /// <summary>
        /// Produces a random, valid calendar date between 1900-01-01 and
        /// 1999-12-31, formatted as <c>yyyy-MM-dd</c>.
        /// </summary>
        /// <returns>The date string, with month and day zero-padded to two digits.</returns>
        static String RandomlyBirthday()
        {
            int year = 1900 + SharedRandom.Next(100);
            int month = 1 + SharedRandom.Next(12);
            // Bound the day by the real month length so that dates such as
            // 02-30 are never produced and the 31st is reachable.
            int date = 1 + SharedRandom.Next(DateTime.DaysInMonth(year, month));
            return year + "-" + month.ToString().PadLeft(2, '0') + "-" + date.ToString().PadLeft(2, '0');
        }

        /// <summary>
        /// Produces a random gender code.
        /// </summary>
        /// <returns>Either "M" or "F".</returns>
        static String RandomlyGender()
        {
            int flag = SharedRandom.Next(2);
            return flag == 0 ? "M" : "F";
        }

        /// <summary>
        /// Produces a random user type.
        /// </summary>
        /// <returns>An integer from 1 to 10 inclusive, as a string.</returns>
        static String RandomlyUserType()
        {
            int flag = 1 + SharedRandom.Next(10);
            return flag.ToString();
        }

        public void Clo