複習電商筆記-29-資料匯入和Redis分片
管道-海量資料匯入
由於做效能測試,需要往redis中匯出千萬級的資料。
得知redis-cli工具支援pipeline匯入可以達到最佳效能。測試下500萬條命令匯入耗時43秒。
格式要求
官方文件:http://redis.io/topics/mass-insert
資料格式要求:
- 以*開始
- *n n代表此條命令分成n個部分
- 每個部分以\r\n結束
set name tony 表達為:
*3\r\n $3\r\n set\r\n $4\r\n name\r\n $4\r\n tony\r\n
注意:此處的\r\n為換行符,不是輸入的字元。
示例
package redis; import java.io.BufferedWriter; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.UnsupportedEncodingException; import org.junit.Test; public class TestRedisPipe { /** * 格式化成輸入字串 */ private String getString(String... args) { StringBuilder sb = new StringBuilder(); sb.append("*").append(args.length).append("\r\n"); for (String arg : args) { sb.append("$").append(arg.length()).append("\r\n"); sb.append(arg).append("\r\n"); } return sb.toString(); } @Test public void initFile2() { Long startTime = System.currentTimeMillis(); String file = "d:\\d.txt"; BufferedWriter w = null; StringBuilder sb = new StringBuilder(); try { w = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), "utf-8")); for(int i=100000000 ;i < 100100000;i++){ //for (int i = 1; i <= 100; i++) { if (i / 30000 == 0) { w.flush(); } sb.setLength(0); sb.append(this.getString("set", "u" + i, "name" + i)); //sb.append(this.getString("hmset", "usr" + i, "userid", "usr" + i, "username", "usrname" + i)); w.append(sb.toString()); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { try { w.flush(); w.close(); } catch (IOException e) { e.printStackTrace(); } } long endTime = System.currentTimeMillis(); System.out.println("耗時: "+(endTime - startTime)/1000+" s。"); } }
常見問題
[[email protected] redis]# cat d.txt |redis-cli --pipe
ERR Protocol error: too big mbulk count string
Error writing to the server: Connection reset by peer
檔案太大,和所分配的記憶體大小密切相關,記憶體太少則會導致檔案太大匯入失敗。
安裝兩個服務
開啟6379埠
/sbin/iptables -I INPUT -p tcp --dport 6379 -j ACCEPT /etc/rc.d/init.d/iptables save #修改生效 /etc/init.d/iptables status #檢視配置
複製改埠無需再次安裝
只需要複製配置檔案,啟動時選擇配置檔案即可。
cd /usr/local/src/redis/redis.2.8.17
cp redis.conf redis6380.conf
vi redis6380.conf #修改埠為6380
redis-server redis6380.conf
注意:啟動後,會殘留些資料,不完全,必須flushall清除掉。
簡潔開啟例項
redis-server --port 6300 --daemonize yes
開放遠端訪問
redis.conf中bind預設繫結127.0.0.1,只有本地可以訪問。
ps -ef |grep redis
root 2545 2532 005:51 pts/0 00:00:07 redis-server *:6379
root 2710 2674 006:14 pts/2 00:00:05 redis-server 127.0.0.1:6479
講bind 127.0.0.1註釋掉,前面加個#即可
root 2545 2532 005:51 pts/0 00:00:07 redis-server *:6379
root 2710 2674 006:14 pts/2 00:00:05 redis-server *:6479
變成兩個*即可遠端訪問,可以看出預設的redis.conf和複製後的檔案還是有差異的。是個坑啊。
Redis分片
訪問redis的驅動包。
使用最為廣泛的是Jedis和Redisson(官方推薦),在企業中採用最多的是Jedis,我們重點學習Jedis。
Jedis官網地址:https://github.com/xetorthio/jedis
第一個jedis示例
package redis;
import java.util.List;
import redis.clients.jedis.Jedis;
public class TestRedis {
public static void main(String[] args) {
//設定連線伺服器IP地址和訪問埠
Jedis jedis = new Jedis("192.168.115.115",6379);
//單個值
//jedis.set("test", "456789"); //設定值
//System.out.println(jedis.get("test")); //獲取值
//多個值
//jedis.mset("test1","1","test2","2");
List<String> oList = jedis.mget("test1","test2");
for(String s : oList){
System.out.println(s);
}
jedis.close(); //關閉
}
}
命令視窗:
127.0.0.1:6379> keys *
1) "bomb"
127.0.0.1:6379> get bomb
"tnt"
127.0.0.1:6379>
連線池JedisPool建立jedis連線
package cn.redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class JedisPoolDemo {
public static void main(String[] args) {
// 構建連線池配置資訊
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 設定最大連線數
jedisPoolConfig.setMaxTotal(200);
// 構建連線池
JedisPool jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379);
// 從連線池中獲取連線
Jedis jedis = jedisPool.getResource();
// 讀取資料
System.out.println(jedis.get("bomb"));
// 將連線還回到連線池中
jedisPool.returnResource(jedis);
// 釋放連線池
jedisPool.close();
}
}
分片ShardedJedisPool
實現分散式快取,Redis多個節點的透明訪問
@Test //分片
public void shard(){
//構造各個節點連結資訊,host和port
List<JedisShardInfo> infoList = new ArrayList<JedisShardInfo>();
JedisShardInfo info1 = new JedisShardInfo("192.168.163.200",6379);
//info1.setPassword("123456");
infoList.add(info1);
JedisShardInfo info2 = new JedisShardInfo("192.168.163.200",6380);
infoList.add(info2);
JedisShardInfo info3 = new JedisShardInfo("192.168.163.200",6381);
infoList.add(info3);
//分片jedis
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(500); //最大連結數
ShardedJedisPool pool = new ShardedJedisPool(config, infoList);
//ShardedJedis jedis = new ShardedJedis(infoList);
ShardedJedis jedis = pool.getResource(); //從pool中獲取
for(int i=0;i<10;i++){
jedis.set("n"+i, "t"+i);
}
System.out.println(jedis.get("n9"));
jedis.close();
}
資料傾斜
3個節點,可以看到n為key時會發生資料傾斜,而換成text就緩解很多。
redis CRC16
name+I 43/29/27 38/26/35
text+I 29/34/36 28/35/36
CRC16hash測試
package redis;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;
public class Crc16Mod {
@Test
public void runCrc() {
for (int i = 1; i < 100; i++) {
System.out.println(this.getCrc(("name" + i).getBytes()) % 3);
}
}
private static Integer getCrc(byte[] data) {
int high;
int flag;
// 16位暫存器,所有數位均為1
int wcrc = 0xffff;
for (int i = 0; i < data.length; i++) {
// 16 位暫存器的高位位元組
high = wcrc >> 8;
// 取被校驗串的一個位元組與 16 位暫存器的高位位元組進行“異或”運算
wcrc = high ^ data[i];
for (int j = 0; j < 8; j++) {
flag = wcrc & 0x0001;
// 把這個 16 暫存器向右移一位
wcrc = wcrc >> 1;
// 若向右(標記位)移出的數位是 1,則生成多項式 1010 0000 0000 0001 和這個暫存器進行“異或”運算
if (flag == 1)
wcrc ^= 0xa001;
}
}
// return Integer.toHexString(wcrc);
return wcrc;
}
}