1. 程式人生 > >Spring Boot 中直接操作 hbase 修改賬戶餘額,實現行級鎖(類似於版本號控制)

Spring Boot 中直接操作 hbase 修改賬戶餘額,實現行級鎖(類似於版本號控制)

應用場景

近期開發中遇到 直接修改hbase資料 ,用Phoenix 查詢出來的資料  型別不一致的 問題。

因修改的是使用者的賬戶餘額,涉及到錢的問題都不是小問題。初次想法使用tephra事務,但官網說目前還是 Beta版本的,感興趣的可以研究研究。

所以考慮直接操作hbase資料庫,但是如果用Phoenix查詢的話 型別會不一致,

比如 :Phoenix 中的int型的 1 ,在hbase中是1'  。導致讀取出來的資料不一致。

解決方案 

框架 :maven + Spring Boot + Mybatis + Phoenix + hbase

軟體環境:eclipse + JDK8

直接操作hbase ,用Phoenix查詢的時候 需要轉型 ,程式碼如下:

1、pom檔案 引入依賴

<!--      引入自己專案中所需要的Spring Boot 相關依賴      -->

                        <dependency>
   <groupId>org.springframework.data</groupId>
   <artifactId>spring-data-hadoop</artifactId>
   <version>2.5.0.RELEASE</version>
</dependency>

                        <dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.13.1-HBase-1.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>

</dependency>

2、hbase-dev.properties 開發環境中 新增 hbase 配置

#HBase
spring.data.hbase.zkQuorum=192.168.110.97:2181,192.168.110.98:2181,192.168.110.99:2181
spring.data.hbase.zkBasePath=/hbase
spring.data.hbase.rootDir=file:///opt/hbase/hbase-1.3.2

3、建立 HbaseProperties檔案 對應 配置檔案配置,程式碼如下:

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "spring.data.hbase")
@Getter
@Setter
public class HbaseProperties {
    // Addresses of all registered ZK servers.
    private String zkQuorum;


    // Location of HBase home directory
    private String rootDir;


    // Root node of this cluster in ZK.
    private String zkBasePath;


}

4、建立HbaseConfig檔案 宣告一個 bean ,程式碼如下:

import lombok.Getter;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.hadoop.hbase.HbaseTemplate;


@Configuration
@PropertySource("classpath:hbase-${spring.profiles.active}.properties")
@EnableConfigurationProperties(HbaseProperties.class)
@Getter
public class HbaseConfig {

@Autowired
    private HbaseProperties hbaseProperties;

    @Bean(name = "hbaseTemplate")
    public HbaseTemplate hbaseTemplate() {
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", this.hbaseProperties.getZkQuorum());
        configuration.set("hbase.rootdir", this.hbaseProperties.getRootDir());
        configuration.set("zookeeper.znode.parent", this.hbaseProperties.getZkBasePath());
        return new HbaseTemplate(configuration);
    }

}

5、建立 BalancePayService檔案應用 ,程式碼如下:

import java.io.IOException;
import java.util.Date;

import javax.annotation.Resource;

import jline.internal.Log;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import cn.harvetech.giant.trip.common.bean.po.Organization;
import cn.harvetech.giant.trip.common.bean.po.TripOrder;
import cn.harvetech.giant.trip.common.bean.po.TripOrderRefund;
import cn.harvetech.giant.trip.common.bean.vo.Result;
import cn.harvetech.giant.trip.common.enums.OrderStatusEnum;
import cn.harvetech.giant.trip.common.mapper.OrganizationMapper;
import cn.harvetech.giant.trip.common.service.TradeService;
import cn.harvetech.giant.trip.common.service.TripOrderRefundService;


/**
 * 餘額支付 和退款
 */
@Service
public class BalancePayService {

@Resource
private OrganizationService organizationService;
@Resource
private TripOrderService tripOrderService;
@Resource
private TradeService tradeService;
    @Resource
    private TripOrderRefundService tripOrderRefundService;
@Resource

private OrganizationMapper organizationMapper;

@Resource(name = "hbaseTemplate")
    private HbaseTemplate hbaseTemplate;

private final static String TRIP_ORGANIZATION = "trip_organization";
private final static String family = "prop";
private final static String qualifier = "balance";



/**
* 餘額支付
* @param orderId
* @return 
* 時間: 2018年4月17日 上午10:36:03
* 描述: 採用 hbase 原生的 checkAndPut實現行級鎖
*/
@Transactional
public Result<String> toPay(String orderId){
String tradeStatus = "GK_PAYING";//預設集客餘額支付中
TripOrder order = tripOrderService.selectByPrimaryKey(orderId);

if(order.getOrderStatus() >= OrderStatusEnum.ORDERSTATUS_200.getCode() 
&& order.getOrderStatus() <= OrderStatusEnum.ORDERSTATUS_801.getCode() ){
return Result.fail("訂單已支付");
}

Organization organization = organizationService.selectByRowKey(order.getOrganizationId());
Long balance = organization.getBalance();//當前賬戶餘額
Long payAmount = order.getPtPayAmount();
Long restBalance = balance.longValue() - payAmount.longValue();

if(restBalance.longValue() < 0){
return Result.fail("餘額不足,請選擇其他支付方式");
}

boolean result = false;
try {
result = changeBalance(order.getOrganizationId(), balance, restBalance, 1, payAmount);
} catch (Exception e) {
tradeStatus = "GK_PAY_FAIL";
Log.error(e.getMessage());
}

if(result == true){// 支付成功 == 修改資料成功
tradeStatus = "GK_PAY_SUCCESS";
}else if(result == false){
tradeStatus = "GK_PAY_FAIL";
}

// 記錄支付後的資訊
tripOrderService.savePayInfo(order.getOrderNo(), order.getOrderNo(), tradeStatus, order);
return Result.withBuild().data(tradeStatus).msg("data中的值:GK_PAY_SUCCESS(支付成功) | GK_PAY_FAIL(支付失敗) | GK_PAYING(支付中)").build();

}

/**
* 餘額退款
* @param orderId訂單ID
* @param refundAmount退款金額
* @param refundNo退款單號
* @return 
* 時間: 2018年4月12日 下午3:43:56
* 描述:採用 hbase 原生的 checkAndPut實現行級鎖
*/
@Transactional
public Result balanceRefund(String orderId, Long refundAmount,  String refundNo, String reason){
TripOrder order = tripOrderService.selectByPrimaryKey(orderId);
if(null == order){
return Result.fail("查不到訂單資訊");
}
String refundStatus = "GK_REFUNDING";//預設退款中

Organization organization = organizationMapper.selectByRowKey(order.getOrganizationId());
if(null == organization){
return Result.fail("查不到機構資訊");
}

Long balance = organization.getBalance();//當前賬戶餘額
Long restBalance = balance.longValue() + refundAmount.longValue();

boolean result = false;
try {
result = changeBalance(order.getOrganizationId(), balance, restBalance, 2, refundAmount);
} catch (Exception e) {
refundStatus = "GK_REFUND_FAIL";
Log.error(e.getMessage());
}

if(result == true){
refundStatus = "GK_REFUND_SUCCESS";
}else{
refundStatus = "GK_REFUND_FAIL";
}

// 實現系統業務邏輯....

return Result.success(refundStatus,"GK_REFUND_SUCCESS:退款成功 | GK_REFUND_FAIL:退款失敗 | GK_REFUNDING:退款中");
}


/**
* 修改餘額,直接遞迴 呼叫hbase 
* @param rowkeyrowkey機構表主鍵
* @param oldAmount機構當前餘額
* @param newAmount修改後的餘額
* @param type操作型別 :1:支付/扣款,2:退款/充值
* @param price金額(支付/扣款或退款/充值 的金額)
* @return 
* 時間: 2018年5月4日 上午10:55:00
* 描述:
* @throws Exception 
*/
public boolean changeBalance(String rowkey, Long oldAmount, Long newAmount, Integer type, Long price) throws Exception{

boolean result = checkAndPut(rowkey, oldAmount, newAmount);
int count = 1;

if(result == false){//修改失敗(餘額發生改變),則查詢一次餘額,然後呼叫本方法執行遞迴

while (count <= 10){//限制遞迴次數

count ++;
Long balance = get(rowkey);//當前餘額

if(type == 1){// 支付或扣款
newAmount = balance.longValue() - price.longValue();
}else{//退款或充值
newAmount = balance.longValue() + price.longValue();
}

if(balance.longValue() <= 0 || newAmount.longValue() < 0){// 如果當前餘額小於等於零 或  當前餘額不足支付,則  直接返回支付失敗
return false;
}

result = changeBalance(rowkey, balance, newAmount, type, price);


return false;
}

return true;
}


/**
* 檢查並修改餘額 --> 直接呼叫hbase
* @param rowkeyrowkey機構表主鍵
* @param oldAmount機構當前餘額
* @param newAmount修改後的餘額
* @return
* @throws Exception 
* 時間: 2018年5月3日 上午11:57:20
* 描述:
*/
@SuppressWarnings("deprecation")
public boolean checkAndPut(String rowkey, Long oldAmount, Long newAmount) throws Exception {
// 構造一個put物件,引數就是rowkey
Put put = new Put(Bytes.toBytes(rowkey));
put.add(Bytes.toBytes(family), // 列族
Bytes.toBytes(qualifier), // 列名
longToBytes_phoenix(newAmount) // 值
);

// 插入資料
boolean result = getTable().checkAndPut(Bytes.toBytes(rowkey),
Bytes.toBytes(family), Bytes.toBytes(qualifier),
longToBytes_phoenix(oldAmount), put);

return result;
}

/**
* 查詢餘額
* @param rowkeyrowkey機構表主鍵
* @return balance 餘額欄位值
* @throws Exception 
* 時間: 2018年5月4日 上午10:50:42
* 描述:
*/
public Long get(String rowkey) throws Exception{
//構造一個Get物件
Get get = new Get(Bytes.toBytes(rowkey));
//查詢資料
org.apache.hadoop.hbase.client.Result r = getTable().get(get);
byte[] data = r.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
Long balance = bytesToLong_phoenix(data);

return balance;
}


/**
* 獲取hbase表
* @return 
* 時間: 2018年5月2日 下午6:19:54
* 描述:
* @throws IOException 
*/
private HTable getTable() throws IOException {
return new HTable(hbaseTemplate.getConfiguration(), TRIP_ORGANIZATION);
}

//------------    以下是Phoenix 和 hbase 的 int 和 Long 型別的轉換    ----------------------------------------
public int bytesToInt_phoenix(byte[] bytes) {
int n = 0;
for (int i = 0; i < 4; i++) {
n <<= 8;
n ^= bytes[i] & 0xFF;
}
n = n ^ 0x80000000;
return n;
}


public byte[] intToBytes_phoenix(int val) {
val = val ^ 0x80000000;
byte[] b = new byte[4];
for (int i = 3; i > 0; i--) {
b[i] = (byte) val;
val >>= 8;
}
b[0] = (byte) val;
return b;
}


public long bytesToLong_phoenix(byte[] bytes) {
long n = 0;
for (int i = 0; i < 8; i++) {
n <<= 8;
n ^= bytes[i] & 0xFF;
}
n = n ^ 0x8000000000000000l;
return n;
}


public byte[] longToBytes_phoenix(long val) {
val = val ^ 0x8000000000000000l;
byte[] b = new byte[8];
for (int i = 7; i > 0; i--) {
b[i] = (byte) val;
val >>= 8;
}
b[0] = (byte) val;
return b;

}

}

7、測試 :經測可用,並已實施到各個專案中

調研的時候 寫過一個1到10的for迴圈 直接修改hbase表,可以完美實現 行級鎖功能,第一個修改的資料返回true ,其餘的都是false。

因業務需求 餘額支付 除 程式異常或 伺服器宕機 外,其餘情況全是支付成功。所以,在以上程式碼中 使用了遞迴。請參見以上程式碼。