1. 程式人生 > >Docker CI: Python 測試 Redis

Docker CI: Python 測試 Redis

Docker CI: Python 測試 Redis

Docker CI: Python 測試 Redis

一、概述

基於 Docker 整合 CI 環境。涉及技術:Linux(Ubuntu 14.04), Docker, Jenkins, Git/Gitlab, Web/Httpbin, Python/Pytest, UI/Selenium, Robotframework, Grid Server, Appium 等。
在這裡插入圖片描述

二、Docker 安裝執行 Redis

# docker pull redis
# docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes
# docker exec -it redis /bin/bash
[email protected]:/data# redis-cli
127.0.0.1:6379> set name allan
OK
127.0.0.1:6379> get name
"allan"

三、Python 安裝 Redis 庫

# pip install redis

三、Python 執行 Redis 的 API

redis-py 的API的使用可以分類為:

  1. 連線方式
  2. 連線池
  3. 操作操作

String 操作:字串、整數和浮點元素
Hash 操作:雜湊。PHP 關聯陣列;Python 字典
List 操作:列表。Python 元組
Set 操作:集合。集合中的元素唯一
Sort Set 操作:有序集合。元素有分值,用於排序

  1. 管道
  2. 釋出訂閱

四、Python 建立 Redis 連線方式

redis-py使用connection pool來管理對一個redis server的所有連線,避免每次建立、釋放連線的開銷。預設,每個Redis例項都會維護一個自己的連線池。可以直接建立一個連線池,然後作為引數 Redis,這樣就可以實現多個Redis例項共享一個連線池。

# coding=utf-8

import os
import redis


class RedisAPI(object):
    def __init__(self):
        self.project_dir = os.getcwd()
        self.project = os.path.join(self.project_dir, 'test_data', 'project.csv')
        self.ip = None
        self.port = None
        self.pool = None


    def get_redis_info(self):
        # Read Project Info from CSV file
        project_info = read_csv_file(self.project)
        self.ip, self.port = project_info[0]['url'], project_info[0]['port']


    def connect_to_redis(self, db=0):
    	"""
	    Returns redis connection.
	
	    Arguments:
	            | db | Redis db Id. |
	    Return: 
	            | rds | redis connection. |
	    Example:
	            | rds = connect_to_redis() |
	
	    """
        if self.ip is None or self.port is None:
            self.get_redis_info()
        return redis.Redis(host=self.ip, port=self.port, db=db)


    def get_redis_pool(self, db=0):
	    """
	    Returns redis from redis pool.
	
	    Arguments:
	            | db | Redis db Id. |
	    Return: 
	            | rds | redis connection from redis pool. |
	    Example:
	            | rds = get_redis_pool() |
	
	    """
        if self.pool is None:
            if self.ip is None or self.port is None:
                self.get_redis_info()
            self.pool = redis.ConnectionPool(host=self.ip, port=self.port, db=db)
        return redis.Redis(connection_pool=self.pool)


	def read_csv_file(self, filename):
	    """
	    Returns the values from the selected csv file.
	
	    Arguments:
	            | filename | Get data from the selected csv file. |
	    Return: 
	            | data | The data will be formed as dict. |
	    Example:
	            | data = read_csv_file(filename) |
	
	    """
	    reload(sys)
	    sys.setdefaultencoding('gbk')
	    
	    with open(filename, 'rb') as csvfile:
	        data = [each for each in csv.DictReader(csvfile)]
	        return data

csv 文件
在這裡插入圖片描述

五、Python 測試 Redis

# coding=utf-8

from RedisAPI import RedisAPI

redis_api = RedisAPI()

r = redis_api.get_redis_pool()
r.set('foo', 'Bar')
print r.get('foo')
print r.get('foo')

rds = redis_api.get_redis_pool()
rds1 = redis_api.get_redis_pool()
rds.set('name', 'allan')
rds1.set('name', 'Sheila')
print rds.get('name')

結果

127.0.0.1:6379>
127.0.0.1:6379> get foo
"Bar"
127.0.0.1:6379> get name
"Sheila"
127.0.0.1:6379>