python redis链接建立实现分析

发布时间:2019-09-23 16:57:59编辑:auto阅读(1748)

      今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。
    在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:

    redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an 
    implementation of the Redis protocol.Connection and Pipeline derive from thisimplementing how the commands are sent and received to the Redis server

    使用的方法:

     r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
     r.xxxx()

    有了ConnectionPool这个类之后,可以使用如下方法

    pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx)
    r = redis.Redis(connection_pool=pool)

    这里Redis是StrictRedis的子类
    简单分析如下:
    在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:

    class StrictRedis(object):
    ........
        def __init__(self, host='localhost', port=6379,
                     db=0, password=None, socket_timeout=None,
                     socket_connect_timeout=None,
                     socket_keepalive=None, socket_keepalive_options=None,
                     connection_pool=None, unix_socket_path=None,
                     encoding='utf-8', encoding_errors='strict',
                     charset=None, errors=None,
                     decode_responses=False, retry_on_timeout=False,
                     ssl=False, ssl_keyfile=None, ssl_certfile=None,
                     ssl_cert_reqs=None, ssl_ca_certs=None):
             if not connection_pool:
                 ..........
                  connection_pool = ConnectionPool(**kwargs)
             self.connection_pool = connection_pool

    在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:

       # COMMAND EXECUTION AND PROTOCOL PARSING
        def execute_command(self, *args, **options):
            "Execute a command and return a parsed response"
            pool = self.connection_pool
            command_name = args[0]
            connection = pool.get_connection(command_name, **options)  #调用ConnectionPool.get_connection方法获取一个连接
            try:
                connection.send_command(*args)  #命令执行,这里为Connection.send_command
                return self.parse_response(connection, command_name, **options)
            except (ConnectionError, TimeoutError) as e:
                connection.disconnect()
                if not connection.retry_on_timeout and isinstance(e, TimeoutError):
                    raise
                connection.send_command(*args)  
                return self.parse_response(connection, command_name, **options)
            finally:
                pool.release(connection)  #调用ConnectionPool.release释放连接

    在来看看ConnectionPool类:

         class ConnectionPool(object):  
           ...........
        def __init__(self, connection_class=Connection, max_connections=None,
                     **connection_kwargs):   #类初始化时调用构造函数
            max_connections = max_connections or 2 ** 31
            if not isinstance(max_connections, (int, long)) or max_connections < 0:  #判断输入的max_connections是否合法
                raise ValueError('"max_connections" must be a positive integer')
            self.connection_class = connection_class  #设置对应的参数
            self.connection_kwargs = connection_kwargs
            self.max_connections = max_connections
            self.reset()  #初始化ConnectionPool 时的reset操作
        def reset(self):
            self.pid = os.getpid()
            self._created_connections = 0  #已经创建的连接的计数器
            self._available_connections = []   #声明一个空的数组,用来存放可用的连接
            self._in_use_connections = set()  #声明一个空的集合,用来存放已经在用的连接
            self._check_lock = threading.Lock()
    .......
        def get_connection(self, command_name, *keys, **options):  #在连接池中获取连接的方法
            "Get a connection from the pool"
            self._checkpid()
            try:
                connection = self._available_connections.pop()  #获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组,
                会直接调用make_connection方法
            except IndexError:
                connection = self.make_connection()
            self._in_use_connections.add(connection)   #向代表正在使用的连接的集合中添加元素
            return connection   
        def make_connection(self): #在_available_connections数组为空时获取连接调用的方法
            "Create a new connection"
            if self._created_connections >= self.max_connections:   #判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化
                raise ConnectionError("Too many connections")
            self._created_connections += 1   #把代表已经创建的连接的数值+1
            return self.connection_class(**self.connection_kwargs)     #返回有效的连接,默认为Connection(**self.connection_kwargs)
        def release(self, connection):  #释放连接,链接并没有断开,只是存在链接池中
            "Releases the connection back to the pool"
            self._checkpid()
            if connection.pid != self.pid:
                return
            self._in_use_connections.remove(connection)   #从集合中删除元素
            self._available_connections.append(connection) #并添加到_available_connections 的数组中
        def disconnect(self): #断开所有连接池中的链接
            "Disconnects all connections in the pool"
            all_conns = chain(self._available_connections,
                              self._in_use_connections)
            for connection in all_conns:
                connection.disconnect()

    execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:

    class Connection(object):
        "Manages TCP communication to and from a Redis server"
        def __del__(self):   #对象删除时的操作,调用disconnect释放连接
            try:
                self.disconnect()
            except Exception:
                pass

    核心的链接建立方法是通过socket模块实现:

        def _connect(self):
            err = None
            for res in socket.getaddrinfo(self.host, self.port, 0,
                                          socket.SOCK_STREAM):
                family, socktype, proto, canonname, socket_address = res
                sock = None
                try:
                    sock = socket.socket(family, socktype, proto)
                    # TCP_NODELAY
                    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                    # TCP_KEEPALIVE
                    if self.socket_keepalive:   #构造函数中默认 socket_keepalive=False,因此这里默认为短连接
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                        for k, v in iteritems(self.socket_keepalive_options):
                            sock.setsockopt(socket.SOL_TCP, k, v)
                    # set the socket_connect_timeout before we connect
                    sock.settimeout(self.socket_connect_timeout)  #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式
                    # connect
                    sock.connect(socket_address)
                    # set the socket_timeout now that we're connected
                    sock.settimeout(self.socket_timeout)  #构造函数中默认socket_timeout=None
                    return sock
                except socket.error as _:
                    err = _
                    if sock is not None:
                        sock.close()
    .....

    关闭链接的方法:

        def disconnect(self):
            "Disconnects from the Redis server"
            self._parser.on_disconnect()
            if self._sock is None:
                return
            try:
                self._sock.shutdown(socket.SHUT_RDWR)  #先shutdown再close
                self._sock.close()
            except socket.error:
                pass
            self._sock = None

            
    可以小结如下
    1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
    2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
    3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。

关键字