# Source code for queries.pool

"""
Connection Pooling

"""
import datetime
import logging
import os
import threading
import time
import weakref

import psycopg2

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Default idle TTL handed to new pools; Pool.idle_duration is measured in
# seconds, so this is a number of seconds.
DEFAULT_IDLE_TTL = 60

# Default connection cap per pool; can be overridden through the
# QUERIES_MAX_POOL_SIZE environment variable (read once, at import time).
DEFAULT_MAX_SIZE = int(os.getenv('QUERIES_MAX_POOL_SIZE', 1))


class Connection(object):
    """Wrap a psycopg2 connection handle together with the state the pool
    tracks for it: a weak reference to the session holding it and the
    execution / exception counters.

    """
    # Declared on the class, so the same lock object is shared by every
    # Connection instance.
    _lock = threading.Lock()

    def __init__(self, handle):
        self.executions = 0
        self.exceptions = 0
        self.used_by = None
        self.handle = handle

    def close(self):
        """Close the underlying connection if it is still open.

        An ``InterfaceError`` raised by psycopg2 while closing is logged and
        swallowed.

        :raises: ConnectionBusyError

        """
        LOGGER.debug('Connection %s closing', self.id)
        if self.busy:
            raise ConnectionBusyError(self)
        with self._lock:
            if self.handle.closed:
                # Nothing left to do for an already closed handle
                return
            try:
                self.handle.close()
            except psycopg2.InterfaceError as error:
                LOGGER.error('Error closing socket: %s', error)

    @property
    def closed(self):
        """Return if the psycopg2 connection is closed.

        :rtype: bool

        """
        state = self.handle.closed
        return state != 0

    @property
    def busy(self):
        """Return if the connection is currently executing a query or is
        locked by a session that still exists.

        :rtype: bool

        """
        if self.handle.isexecuting():
            return True
        owner = self.used_by
        # ``used_by`` is a weakref; calling it yields None once the session
        # that took the lock has been garbage collected.
        return owner is not None and owner() is not None

    @property
    def executing(self):
        """Return if the connection is currently executing a query

        :rtype: bool

        """
        return self.handle.isexecuting()

    def free(self):
        """Drop the session lock on the connection, provided that no query
        is executing on it.

        :raises: ConnectionBusyError

        """
        LOGGER.debug('Connection %s freeing', self.id)
        if self.executing:
            raise ConnectionBusyError(self)
        with self._lock:
            self.used_by = None
        LOGGER.debug('Connection %s freed', self.id)

    @property
    def id(self):
        """Return id of the psycopg2 connection object

        :rtype: int

        """
        return id(self.handle)

    def lock(self, session):
        """Lock the connection for the given session, keeping only a weak
        reference to the session.

        :param queries.Session session: The session to lock the connection with
        :raises: ConnectionBusyError

        """
        if self.busy:
            raise ConnectionBusyError(self)
        holder = weakref.ref(session)
        with self._lock:
            self.used_by = holder
        LOGGER.debug('Connection %s locked', self.id)

    @property
    def locked(self):
        """Return if the connection is currently exclusively locked

        Only checks that a session reference was stored; it does not check
        whether that session is still alive (see :attr:`busy` for that).

        :rtype: bool

        """
        return self.used_by is not None
class Pool(object):
    """A connection pool for gaining access to and managing connections.

    Connections are kept in :attr:`connections`, a dict keyed by the
    ``id()`` of the psycopg2 connection whose values are
    :class:`Connection` wrappers.

    """
    # Declared on the class, so the same lock object is shared by every
    # Pool instance.
    _lock = threading.Lock()

    idle_start = None
    idle_ttl = DEFAULT_IDLE_TTL
    max_size = DEFAULT_MAX_SIZE

    def __init__(self,
                 pool_id,
                 idle_ttl=DEFAULT_IDLE_TTL,
                 max_size=DEFAULT_MAX_SIZE,
                 time_method=None):
        """Create a new, empty pool.

        :param str pool_id: The ID for this pool
        :param int idle_ttl: Time in seconds for the idle TTL
        :param int max_size: The maximum pool size
        :param callable time_method: Override the use of
            :py:meth:`time.time` method for time values.

        """
        self.connections = {}
        self._id = pool_id
        self.idle_ttl = idle_ttl
        self.max_size = max_size
        self.time_method = time_method or time.time

    def __contains__(self, connection):
        """Return True if the pool contains the connection"""
        return id(connection) in self.connections

    def __len__(self):
        """Return the number of connections in the pool"""
        return len(self.connections)

    def add(self, connection):
        """Add a new connection to the pool

        If the pool is already at (or above) its maximum size the
        connection is closed and :exc:`PoolFullError` is raised.

        :param connection: The connection to add to the pool
        :type connection: psycopg2.extensions.connection
        :raises: PoolFullError
        :raises: ValueError

        """
        cid = id(connection)
        # The checks and the insert share one critical section so that two
        # concurrent callers cannot both pass the capacity check.
        with self._lock:
            if cid in self.connections:
                raise ValueError('Connection already exists in pool')

            # ``is_full`` uses >= so that a pool whose max_size was lowered
            # below its current size is still treated as full.
            if self.is_full:
                LOGGER.warning('Race condition found when adding new connection')
                try:
                    connection.close()
                except (psycopg2.Error, psycopg2.Warning) as error:
                    LOGGER.error(
                        'Error closing the conn that cant be used: %s', error)
                # Pass the pool id (not the Pool object), matching the other
                # pool exceptions raised by this class.
                raise PoolFullError(self.id)

            self.connections[cid] = Connection(connection)
        LOGGER.debug('Pool %s added connection %s', self.id, cid)

    @property
    def busy_connections(self):
        """Return a list of active/busy connections

        :rtype: list

        """
        return [c for c in self.connections.values()
                if c.busy and not c.closed]

    def clean(self):
        """Clean the pool by removing any closed connections and if the
        pool's idle has exceeded its idle TTL, remove all connections.

        """
        LOGGER.debug('Cleaning the pool')
        # closed_connections builds a new list, so removing entries from
        # self.connections while looping over it is safe.
        for connection in self.closed_connections:
            LOGGER.debug('Removing %s', connection.id)
            self.remove(connection.handle)

        if self.idle_duration > self.idle_ttl:
            self.close()

        LOGGER.debug('Pool %s cleaned', self.id)

    def close(self):
        """Close the pool by closing and removing all of the connections"""
        # Iterate over a copy of the keys; remove() mutates the dict.
        for cid in list(self.connections):
            self.remove(self.connections[cid].handle)
        LOGGER.debug('Pool %s closed', self.id)

    @property
    def closed_connections(self):
        """Return a list of closed connections

        :rtype: list

        """
        return [c for c in self.connections.values() if c.closed]

    def connection_handle(self, connection):
        """Return a connection object for the given psycopg2 connection

        :param connection: The connection to return a parent for
        :type connection: psycopg2.extensions.connection
        :rtype: Connection
        :raises: KeyError

        """
        return self.connections[id(connection)]

    @property
    def executing_connections(self):
        """Return a list of connections actively executing queries

        :rtype: list

        """
        return [c for c in self.connections.values() if c.executing]

    def free(self, connection):
        """Free the connection from use by the session that was using it.

        When every connection in the pool is idle afterwards, the idle
        timer is (re)started.

        :param connection: The connection to free
        :type connection: psycopg2.extensions.connection
        :raises: ConnectionNotFoundError

        """
        LOGGER.debug('Pool %s freeing connection %s', self.id, id(connection))
        try:
            self.connection_handle(connection).free()
        except KeyError:
            raise ConnectionNotFoundError(self.id, id(connection))

        if self.idle_connections == list(self.connections.values()):
            with self._lock:
                self.idle_start = self.time_method()
        LOGGER.debug('Pool %s freed connection %s', self.id, id(connection))

    def get(self, session):
        """Return an idle connection and assign the session to the connection

        :param queries.Session session: The session to assign
        :rtype: psycopg2.extensions.connection
        :raises: NoIdleConnectionsError

        """
        idle = self.idle_connections
        if not idle:
            raise NoIdleConnectionsError(self.id)

        connection = idle[0]
        connection.lock(session)
        # A connection is in use again, so the pool is no longer idle
        if self.idle_start:
            with self._lock:
                self.idle_start = None
        return connection.handle

    @property
    def id(self):
        """Return the ID for this pool

        :rtype: str

        """
        return self._id

    @property
    def idle_connections(self):
        """Return a list of idle connections

        :rtype: list

        """
        return [c for c in self.connections.values()
                if not c.busy and not c.closed]

    @property
    def idle_duration(self):
        """Return the number of seconds that the pool has had no active
        connections.

        :rtype: float

        """
        if self.idle_start is None:
            return 0
        return self.time_method() - self.idle_start

    @property
    def is_full(self):
        """Return True if there are no more open slots for connections.

        :rtype: bool

        """
        return len(self.connections) >= self.max_size

    def lock(self, connection, session):
        """Explicitly lock the specified connection

        :type connection: psycopg2.extensions.connection
        :param connection: The connection to lock
        :param queries.Session session: The session to hold the lock
        :raises: ConnectionNotFoundError

        """
        cid = id(connection)
        try:
            self.connection_handle(connection).lock(session)
        except KeyError:
            raise ConnectionNotFoundError(self.id, cid)
        else:
            # A connection is in use again, so the pool is no longer idle
            if self.idle_start:
                with self._lock:
                    self.idle_start = None
        LOGGER.debug('Pool %s locked connection %s', self.id, cid)

    @property
    def locked_connections(self):
        """Return a list of all locked connections

        :rtype: list

        """
        return [c for c in self.connections.values() if c.locked]

    def remove(self, connection):
        """Remove the connection from the pool

        The connection is closed before it is dropped from the pool.

        :param connection: The connection to remove
        :type connection: psycopg2.extensions.connection
        :raises: ConnectionNotFoundError
        :raises: ConnectionBusyError

        """
        cid = id(connection)
        if cid not in self.connections:
            raise ConnectionNotFoundError(self.id, cid)
        self.connection_handle(connection).close()
        with self._lock:
            del self.connections[cid]
        LOGGER.debug('Pool %s removed connection %s', self.id, cid)

    def report(self):
        """Return a report about the pool state and configuration.

        :rtype: dict

        """
        connections = list(self.connections.values())
        return {
            'connections': {
                'busy': len(self.busy_connections),
                'closed': len(self.closed_connections),
                'executing': len(self.executing_connections),
                'idle': len(self.idle_connections),
                # Count connections holding a session lock, which is not
                # the same set as the busy connections.
                'locked': len(self.locked_connections)
            },
            'exceptions': sum(c.exceptions for c in connections),
            'executions': sum(c.executions for c in connections),
            'full': self.is_full,
            'idle': {
                'duration': self.idle_duration,
                'ttl': self.idle_ttl
            },
            'max_size': self.max_size
        }

    def shutdown(self):
        """Forcefully shutdown the entire pool, closing all non-executing
        connections.

        Connections are processed one at a time; if an executing connection
        is encountered the error is raised immediately and the connections
        not yet visited stay in the pool.

        :raises: ConnectionBusyError

        """
        with self._lock:
            for cid in list(self.connections):
                connection = self.connections[cid]
                if connection.executing:
                    raise ConnectionBusyError(cid)
                if connection.locked:
                    connection.free()
                connection.close()
                del self.connections[cid]

    def set_idle_ttl(self, ttl):
        """Set the idle ttl

        :param int ttl: The TTL when idle

        """
        with self._lock:
            self.idle_ttl = ttl

    def set_max_size(self, size):
        """Set the maximum number of connections

        :param int size: The maximum number of connections

        """
        with self._lock:
            self.max_size = size
class PoolManager(object):
    """The connection pool object implements behavior around connections and
    their use in queries.Session objects.

    We carry a pool id instead of the connection URI so that we will not be
    carrying the URI in memory, creating a possible security issue.

    All state lives on the class (``_pools``), so every method is a
    classmethod operating on the same registry of pools.

    """
    _lock = threading.Lock()
    _pools = {}

    def __contains__(self, pid):
        """Returns True if the pool exists

        :param str pid: The pool id to check for
        :rtype: bool

        """
        return pid in self.__class__._pools

    @classmethod
    def instance(cls):
        """Only allow a single PoolManager instance to exist, returning the
        handle for it.

        :rtype: PoolManager

        """
        if not hasattr(cls, '_instance'):
            with cls._lock:
                # Re-check once the lock is held: another thread may have
                # created the instance between the first check and here.
                if not hasattr(cls, '_instance'):
                    cls._instance = cls()
        return cls._instance

    @classmethod
    def add(cls, pid, connection):
        """Add a new connection and session to a pool.

        :param str pid: The pool id
        :type connection: psycopg2.extensions.connection
        :param connection: The connection to add to the pool
        :raises: KeyError

        """
        with cls._lock:
            cls._ensure_pool_exists(pid)
            cls._pools[pid].add(connection)

    @classmethod
    def clean(cls, pid):
        """Clean the specified pool, removing any closed connections or
        stale locks.

        A missing pool is not an error: it is logged and ignored. A pool
        left without connections after cleaning is removed.

        :param str pid: The pool id to clean

        """
        with cls._lock:
            try:
                cls._ensure_pool_exists(pid)
            except KeyError:
                LOGGER.debug('Pool clean invoked against missing pool %s', pid)
                return
            cls._pools[pid].clean()
            cls._maybe_remove_pool(pid)

    @classmethod
    def create(cls, pid, idle_ttl=DEFAULT_IDLE_TTL, max_size=DEFAULT_MAX_SIZE,
               time_method=None):
        """Create a new pool, with the ability to pass in values to override
        the default idle TTL and the default maximum size.

        A pool's idle TTL defines the amount of time that a pool can be open
        without any sessions before it is removed.

        A pool's max size defines the maximum number of connections that can
        be added to the pool to prevent unbounded open connections.

        :param str pid: The pool ID
        :param int idle_ttl: Time in seconds for the idle TTL
        :param int max_size: The maximum pool size
        :param callable time_method: Override the use of
            :py:meth:`time.time` method for time values.
        :raises: KeyError

        """
        with cls._lock:
            # The existence check happens under the lock so that two
            # concurrent calls cannot both create (and overwrite) the pool.
            if pid in cls._pools:
                raise KeyError('Pool %s already exists' % pid)
            LOGGER.debug("Creating Pool: %s (%i/%i)", pid, idle_ttl, max_size)
            cls._pools[pid] = Pool(pid, idle_ttl, max_size, time_method)

    @classmethod
    def free(cls, pid, connection):
        """Free a connection that was locked by a session

        :param str pid: The pool ID
        :param connection: The connection to remove
        :type connection: psycopg2.extensions.connection
        :raises: KeyError

        """
        with cls._lock:
            LOGGER.debug('Freeing %s from pool %s', id(connection), pid)
            cls._ensure_pool_exists(pid)
            cls._pools[pid].free(connection)

    @classmethod
    def get(cls, pid, session):
        """Get an idle, unused connection from the pool. Once a connection has
        been retrieved, it will be marked as in-use until it is freed.

        :param str pid: The pool ID
        :param queries.Session session: The session to assign to the connection
        :rtype: psycopg2.extensions.connection
        :raises: KeyError

        """
        with cls._lock:
            cls._ensure_pool_exists(pid)
            return cls._pools[pid].get(session)

    @classmethod
    def get_connection(cls, pid, connection):
        """Return the specified :class:`~queries.pool.Connection` from the
        pool.

        :param str pid: The pool ID
        :param connection: The connection to return for
        :type connection: psycopg2.extensions.connection
        :rtype: queries.pool.Connection
        :raises: KeyError

        """
        with cls._lock:
            # Same KeyError type as before, now with the same message the
            # sibling methods produce for a missing pool.
            cls._ensure_pool_exists(pid)
            return cls._pools[pid].connection_handle(connection)

    @classmethod
    def has_connection(cls, pid, connection):
        """Check to see if a pool has the specified connection

        :param str pid: The pool ID
        :param connection: The connection to check for
        :type connection: psycopg2.extensions.connection
        :rtype: bool
        :raises: KeyError

        """
        with cls._lock:
            cls._ensure_pool_exists(pid)
            return connection in cls._pools[pid]

    @classmethod
    def has_idle_connection(cls, pid):
        """Check to see if a pool has an idle connection

        :param str pid: The pool ID
        :rtype: bool
        :raises: KeyError

        """
        with cls._lock:
            cls._ensure_pool_exists(pid)
            return bool(cls._pools[pid].idle_connections)

    @classmethod
    def is_full(cls, pid):
        """Return a bool indicating if the specified pool is full

        :param str pid: The pool id
        :rtype: bool
        :raises: KeyError

        """
        with cls._lock:
            cls._ensure_pool_exists(pid)
            return cls._pools[pid].is_full

    @classmethod
    def lock(cls, pid, connection, session):
        """Explicitly lock the specified connection in the pool

        :param str pid: The pool id
        :type connection: psycopg2.extensions.connection
        :param connection: The connection to add to the pool
        :param queries.Session session: The session to hold the lock
        :raises: KeyError

        """
        with cls._lock:
            cls._ensure_pool_exists(pid)
            cls._pools[pid].lock(connection, session)

    @classmethod
    def remove(cls, pid):
        """Remove a pool, closing all connections

        :param str pid: The pool ID
        :raises: KeyError

        """
        with cls._lock:
            cls._ensure_pool_exists(pid)
            cls._pools[pid].close()
            del cls._pools[pid]

    @classmethod
    def remove_connection(cls, pid, connection):
        """Remove a connection from the pool, closing it if is open.

        :param str pid: The pool ID
        :param connection: The connection to remove
        :type connection: psycopg2.extensions.connection
        :raises: ConnectionNotFoundError
        :raises: KeyError

        """
        # NOTE(review): unlike most sibling methods this does not take
        # cls._lock; left as-is -- confirm whether that is intentional.
        cls._ensure_pool_exists(pid)
        cls._pools[pid].remove(connection)

    @classmethod
    def set_idle_ttl(cls, pid, ttl):
        """Set the idle TTL for a pool, after which it will be destroyed.

        :param str pid: The pool id
        :param int ttl: The TTL for an idle pool
        :raises: KeyError

        """
        with cls._lock:
            cls._ensure_pool_exists(pid)
            cls._pools[pid].set_idle_ttl(ttl)

    @classmethod
    def set_max_size(cls, pid, size):
        """Set the maximum number of connections for the specified pool

        :param str pid: The pool to set the size for
        :param int size: The maximum number of connections
        :raises: KeyError

        """
        with cls._lock:
            cls._ensure_pool_exists(pid)
            cls._pools[pid].set_max_size(size)

    @classmethod
    def shutdown(cls):
        """Close all connections on in all pools"""
        # Iterate over a copy of the keys so the registry can change safely
        for pid in list(cls._pools):
            cls._pools[pid].shutdown()
        LOGGER.info('Shutdown complete, all pooled connections closed')

    @classmethod
    def size(cls, pid):
        """Return the number of connections in the pool

        :param str pid: The pool id
        :rtype: int
        :raises: KeyError

        """
        with cls._lock:
            cls._ensure_pool_exists(pid)
            return len(cls._pools[pid])

    @classmethod
    def report(cls):
        """Return the state of the all of the registered pools.

        :rtype: dict

        """
        return {
            'timestamp': datetime.datetime.utcnow().isoformat(),
            'process': os.getpid(),
            'pools': {pid: pool.report() for pid, pool in cls._pools.items()}
        }

    @classmethod
    def _ensure_pool_exists(cls, pid):
        """Raise an exception if the pool has yet to be created or has been
        removed.

        :param str pid: The pool ID to check for
        :raises: KeyError

        """
        if pid not in cls._pools:
            raise KeyError('Pool %s has not been created' % pid)

    @classmethod
    def _maybe_remove_pool(cls, pid):
        """If the pool has no open connections, remove it

        :param str pid: The pool id to clean

        """
        if not len(cls._pools[pid]):
            del cls._pools[pid]
class QueriesException(Exception):
    """Base Exception for all other Queries exceptions"""


class ConnectionException(QueriesException):
    """Base for errors about a single connection; stores the value it is
    given as ``cid``.

    """
    def __init__(self, cid):
        self.cid = cid


class PoolException(QueriesException):
    """Base for errors about a pool; stores the value it is given as
    ``pid``.

    """
    def __init__(self, pid):
        self.pid = pid


class PoolConnectionException(PoolException):
    """Base for errors about a connection within a pool; stores both
    ``pid`` and ``cid``.

    """
    def __init__(self, pid, cid):
        # The parent initializer only stores ``pid``
        PoolException.__init__(self, pid)
        self.cid = cid


class ActivePoolError(PoolException):
    """Raised when removing a pool that has active connections"""
    def __str__(self):
        return 'Pool %s has at least one active connection' % self.pid


class ConnectionBusyError(ConnectionException):
    """Raised when trying to lock a connection that is already busy"""
    def __str__(self):
        return 'Connection %s is busy' % self.cid


class ConnectionNotFoundError(PoolConnectionException):
    """Raised if a specific connection is not found in the pool"""
    def __str__(self):
        return 'Connection %s not found in pool %s' % (self.cid, self.pid)


class NoIdleConnectionsError(PoolException):
    """Raised if a pool does not have any idle, open connections"""
    def __str__(self):
        return 'Pool %s has no idle connections' % self.pid


class PoolFullError(PoolException):
    """Raised when adding a connection to a pool that has hit max-size"""
    def __str__(self):
        return 'Pool %s is at its maximum capacity' % self.pid