1. 程式人生 > >複習電商筆記-29-資料匯入和Redis分片

複習電商筆記-29-資料匯入和Redis分片

 

管道-海量資料匯入

由於做效能測試,需要往redis中匯出千萬級的資料。

得知redis-cli工具支援pipeline匯入可以達到最佳效能。測試下500萬條命令匯入耗時43秒。

 

 

格式要求

官方文件:http://redis.io/topics/mass-insert

資料格式要求:

  1. 以*開始
  2. *n     n代表此條命令分成n個部分
  3. 每個部分以\r\n結束

 

set name tony 表達為:

*3\r\n

$3\r\n

set\r\n

$4\r\n

name\r\n

$4\r\n

tony\r\n

注意:此處的\r\n為換行符,不是輸入的字元。

 

 

示例

package redis;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;

import org.junit.Test;

public class TestRedisPipe {
	/**
	 * 格式化成輸入字串
	 */
	private String getString(String... args) {
		StringBuilder sb = new StringBuilder();
		sb.append("*").append(args.length).append("\r\n");
		for (String arg : args) {
			sb.append("$").append(arg.length()).append("\r\n");
			sb.append(arg).append("\r\n");
		}
		return sb.toString();
	}

	@Test
	public void initFile2() {
		Long startTime = System.currentTimeMillis();
		String file = "d:\\d.txt";
		BufferedWriter w = null;
		StringBuilder sb = new StringBuilder();
		try {
	w = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), "utf-8"));
			for(int i=100000000 ;i < 100100000;i++){
			//for (int i = 1; i <= 100; i++) {
				if (i / 30000 == 0) {
					w.flush();
				}
				sb.setLength(0);
				sb.append(this.getString("set", "u" + i, "name" + i));
//sb.append(this.getString("hmset", "usr" + i, "userid", "usr" + i, "username", "usrname" + i));
				w.append(sb.toString());
			}
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				w.flush();
				w.close();
			} catch (IOException e) {
				e.printStackTrace();
			}

		}
		long endTime = System.currentTimeMillis();
		System.out.println("耗時: "+(endTime - startTime)/1000+" s。");
	}
}

 

 

常見問題

[[email protected] redis]# cat d.txt |redis-cli --pipe

ERR Protocol error: too big mbulk count string

Error writing to the server: Connection reset by peer

檔案太大,和所分配的記憶體大小密切相關,記憶體太少則會導致檔案太大匯入失敗。

 

安裝兩個服務

 

 

開啟6379埠

/sbin/iptables -I INPUT -p tcp --dport 6379 -j ACCEPT

/etc/rc.d/init.d/iptables save     #修改生效

/etc/init.d/iptables status        #檢視配置

 

 

複製改埠無需再次安裝

只需要複製配置檔案,啟動時選擇配置檔案即可。

cd /usr/local/src/redis/redis.2.8.17

cp redis.conf redis6380.conf

vi redis6380.conf    #修改埠為6380

redis-server redis6380.conf

注意:啟動後,會殘留些資料,不完全,必須flushall清除掉。

 

 

簡潔開啟例項

redis-server --port 6300 --daemonize yes

 

 

開放遠端訪問

redis.conf中bind預設繫結127.0.0.1,只有本地可以訪問。

ps -ef |grep redis

root  2545  2532  005:51 pts/0   00:00:07 redis-server *:6379

root  2710  2674  006:14 pts/2   00:00:05 redis-server 127.0.0.1:6479

講bind 127.0.0.1註釋掉,前面加個#即可

root  2545  2532  005:51 pts/0   00:00:07 redis-server *:6379

root  2710  2674  006:14 pts/2   00:00:05 redis-server *:6479

    變成兩個*即可遠端訪問,可以看出預設的redis.conf和複製後的檔案還是有差異的。是個坑啊。

 

Redis分片

訪問redis的驅動包。

使用最為廣泛的是Jedis和Redisson(官方推薦),在企業中採用最多的是Jedis,我們重點學習Jedis。

Jedis官網地址:https://github.com/xetorthio/jedis

 

 

第一個jedis示例

package redis;

import java.util.List;

import redis.clients.jedis.Jedis;

public class TestRedis {
	public static void main(String[] args) {
		//設定連線伺服器IP地址和訪問埠
		Jedis jedis = new Jedis("192.168.115.115",6379);
		
		//單個值
		//jedis.set("test", "456789");				//設定值
		//System.out.println(jedis.get("test"));		//獲取值
		
		//多個值
		//jedis.mset("test1","1","test2","2");
		List<String> oList = jedis.mget("test1","test2");
		for(String s : oList){
			System.out.println(s);
		}
		
		jedis.close();	//關閉
	}
}

命令視窗:

127.0.0.1:6379> keys *
1) "bomb"
127.0.0.1:6379> get bomb
"tnt"
127.0.0.1:6379>

 

 

連線池JedisPool建立jedis連線

package cn.redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisPoolDemo {

    public static void main(String[] args) {
        // 構建連線池配置資訊
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        // 設定最大連線數
        jedisPoolConfig.setMaxTotal(200);

        // 構建連線池
        JedisPool jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379);

        // 從連線池中獲取連線
        Jedis jedis = jedisPool.getResource();

        // 讀取資料
        System.out.println(jedis.get("bomb"));

        // 將連線還回到連線池中
        jedisPool.returnResource(jedis);

        // 釋放連線池
        jedisPool.close();
    }
}

 

 

分片ShardedJedisPool

實現分散式快取,Redis多個節點的透明訪問

@Test	//分片
	public void shard(){
	
		
		//構造各個節點連結資訊,host和port
		List<JedisShardInfo> infoList = new ArrayList<JedisShardInfo>();
		JedisShardInfo info1 = new JedisShardInfo("192.168.163.200",6379);
		//info1.setPassword("123456");
		infoList.add(info1);
		JedisShardInfo info2 = new JedisShardInfo("192.168.163.200",6380);
		infoList.add(info2);
		JedisShardInfo info3 = new JedisShardInfo("192.168.163.200",6381);
		infoList.add(info3);
		
		//分片jedis
		
		JedisPoolConfig config = new JedisPoolConfig();
		config.setMaxTotal(500);	//最大連結數
		
		ShardedJedisPool pool = new ShardedJedisPool(config, infoList);
		//ShardedJedis jedis = new ShardedJedis(infoList);
		ShardedJedis jedis = pool.getResource();	//從pool中獲取
		for(int i=0;i<10;i++){
			jedis.set("n"+i, "t"+i);
		}
		System.out.println(jedis.get("n9"));
		jedis.close();
	}

 

 

資料傾斜

3個節點,可以看到n為key時會發生資料傾斜,而換成text就緩解很多。

	redis			CRC16
name+I		43/29/27		38/26/35
text+I		29/34/36		28/35/36

CRC16hash測試

package redis;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;
public class Crc16Mod {

	@Test
	public void runCrc() {
		for (int i = 1; i < 100; i++) {
			System.out.println(this.getCrc(("name" + i).getBytes()) % 3);
		}
	}

	private static Integer getCrc(byte[] data) {
		int high;
		int flag;

		// 16位暫存器,所有數位均為1
		int wcrc = 0xffff;
		for (int i = 0; i < data.length; i++) {
			// 16 位暫存器的高位位元組
			high = wcrc >> 8;
			// 取被校驗串的一個位元組與 16 位暫存器的高位位元組進行“異或”運算
			wcrc = high ^ data[i];

			for (int j = 0; j < 8; j++) {
				flag = wcrc & 0x0001;
				// 把這個 16 暫存器向右移一位
				wcrc = wcrc >> 1;
				// 若向右(標記位)移出的數位是 1,則生成多項式 1010 0000 0000 0001 和這個暫存器進行“異或”運算
				if (flag == 1)
					wcrc ^= 0xa001;
			}
		}

		// return Integer.toHexString(wcrc);
		return wcrc;
	}
}