"""
Connection Pooling
"""
import datetime
import logging
import os
import threading
import time
import weakref
import psycopg2
LOGGER = logging.getLogger(__name__)
DEFAULT_IDLE_TTL = 60
DEFAULT_MAX_SIZE = int(os.environ.get('QUERIES_MAX_POOL_SIZE', 1))
[docs]class Connection(object):
"""Contains the handle to the connection, the current state of the
connection and methods for manipulating the state of the connection.
"""
_lock = threading.Lock()
def __init__(self, handle):
self.handle = handle
self.used_by = None
self.executions = 0
self.exceptions = 0
[docs] def close(self):
"""Close the connection
:raises: ConnectionBusyError
"""
LOGGER.debug('Connection %s closing', self.id)
if self.busy and not self.closed:
raise ConnectionBusyError(self)
with self._lock:
if not self.handle.closed:
try:
self.handle.close()
except psycopg2.InterfaceError as error:
LOGGER.error('Error closing socket: %s', error)
@property
def closed(self):
"""Return if the psycopg2 connection is closed.
:rtype: bool
"""
return self.handle.closed != 0
@property
def busy(self):
"""Return if the connection is currently executing a query or is locked
by a session that still exists.
:rtype: bool
"""
if self.handle.isexecuting():
return True
elif self.used_by is None:
return False
return not self.used_by() is None
@property
def executing(self):
"""Return if the connection is currently executing a query
:rtype: bool
"""
return self.handle.isexecuting()
[docs] def free(self):
"""Remove the lock on the connection if the connection is not active
:raises: ConnectionBusyError
"""
LOGGER.debug('Connection %s freeing', self.id)
if self.handle.isexecuting():
raise ConnectionBusyError(self)
with self._lock:
self.used_by = None
LOGGER.debug('Connection %s freed', self.id)
@property
def id(self):
"""Return id of the psycopg2 connection object
:rtype: int
"""
return id(self.handle)
[docs] def lock(self, session):
"""Lock the connection, ensuring that it is not busy and storing
a weakref for the session.
:param queries.Session session: The session to lock the connection with
:raises: ConnectionBusyError
"""
if self.busy:
raise ConnectionBusyError(self)
with self._lock:
self.used_by = weakref.ref(session)
LOGGER.debug('Connection %s locked', self.id)
@property
def locked(self):
"""Return if the connection is currently exclusively locked
:rtype: bool
"""
return self.used_by is not None
[docs]class Pool(object):
"""A connection pool for gaining access to and managing connections"""
_lock = threading.Lock()
idle_start = None
idle_ttl = DEFAULT_IDLE_TTL
max_size = DEFAULT_MAX_SIZE
def __init__(self,
pool_id,
idle_ttl=DEFAULT_IDLE_TTL,
max_size=DEFAULT_MAX_SIZE,
time_method=None):
self.connections = {}
self._id = pool_id
self.idle_ttl = idle_ttl
self.max_size = max_size
self.time_method = time_method or time.time
def __contains__(self, connection):
"""Return True if the pool contains the connection"""
return id(connection) in self.connections
def __len__(self):
"""Return the number of connections in the pool"""
return len(self.connections)
[docs] def add(self, connection):
"""Add a new connection to the pool
:param connection: The connection to add to the pool
:type connection: psycopg2.extensions.connection
:raises: PoolFullError
"""
if id(connection) in self.connections:
raise ValueError('Connection already exists in pool')
if len(self.connections) == self.max_size:
LOGGER.warning('Race condition found when adding new connection')
try:
connection.close()
except (psycopg2.Error, psycopg2.Warning) as error:
LOGGER.error('Error closing the conn that cant be used: %s',
error)
raise PoolFullError(self)
with self._lock:
self.connections[id(connection)] = Connection(connection)
LOGGER.debug('Pool %s added connection %s', self.id, id(connection))
@property
def busy_connections(self):
"""Return a list of active/busy connections
:rtype: list
"""
return [c for c in self.connections.values()
if c.busy and not c.closed]
[docs] def clean(self):
"""Clean the pool by removing any closed connections and if the pool's
idle has exceeded its idle TTL, remove all connections.
"""
LOGGER.debug('Cleaning the pool')
for connection in [self.connections[k] for k in self.connections if
self.connections[k].closed]:
LOGGER.debug('Removing %s', connection.id)
self.remove(connection.handle)
if self.idle_duration > self.idle_ttl:
self.close()
LOGGER.debug('Pool %s cleaned', self.id)
[docs] def close(self):
"""Close the pool by closing and removing all of the connections"""
for cid in list(self.connections.keys()):
self.remove(self.connections[cid].handle)
LOGGER.debug('Pool %s closed', self.id)
@property
def closed_connections(self):
"""Return a list of closed connections
:rtype: list
"""
return [c for c in self.connections.values() if c.closed]
[docs] def connection_handle(self, connection):
"""Return a connection object for the given psycopg2 connection
:param connection: The connection to return a parent for
:type connection: psycopg2.extensions.connection
:rtype: Connection
"""
return self.connections[id(connection)]
@property
def executing_connections(self):
"""Return a list of connections actively executing queries
:rtype: list
"""
return [c for c in self.connections.values() if c.executing]
[docs] def free(self, connection):
"""Free the connection from use by the session that was using it.
:param connection: The connection to free
:type connection: psycopg2.extensions.connection
:raises: ConnectionNotFoundError
"""
LOGGER.debug('Pool %s freeing connection %s', self.id, id(connection))
try:
self.connection_handle(connection).free()
except KeyError:
raise ConnectionNotFoundError(self.id, id(connection))
if self.idle_connections == list(self.connections.values()):
with self._lock:
self.idle_start = self.time_method()
LOGGER.debug('Pool %s freed connection %s', self.id, id(connection))
[docs] def get(self, session):
"""Return an idle connection and assign the session to the connection
:param queries.Session session: The session to assign
:rtype: psycopg2.extensions.connection
:raises: NoIdleConnectionsError
"""
idle = self.idle_connections
if idle:
connection = idle.pop(0)
connection.lock(session)
if self.idle_start:
with self._lock:
self.idle_start = None
return connection.handle
raise NoIdleConnectionsError(self.id)
@property
def id(self):
"""Return the ID for this pool
:rtype: str
"""
return self._id
@property
def idle_connections(self):
"""Return a list of idle connections
:rtype: list
"""
return [c for c in self.connections.values()
if not c.busy and not c.closed]
@property
def idle_duration(self):
"""Return the number of seconds that the pool has had no active
connections.
:rtype: float
"""
if self.idle_start is None:
return 0
return self.time_method() - self.idle_start
@property
def is_full(self):
"""Return True if there are no more open slots for connections.
:rtype: bool
"""
return len(self.connections) >= self.max_size
[docs] def lock(self, connection, session):
"""Explicitly lock the specified connection
:type connection: psycopg2.extensions.connection
:param connection: The connection to lock
:param queries.Session session: The session to hold the lock
"""
cid = id(connection)
try:
self.connection_handle(connection).lock(session)
except KeyError:
raise ConnectionNotFoundError(self.id, cid)
else:
if self.idle_start:
with self._lock:
self.idle_start = None
LOGGER.debug('Pool %s locked connection %s', self.id, cid)
@property
def locked_connections(self):
"""Return a list of all locked connections
:rtype: list
"""
return [c for c in self.connections.values() if c.locked]
[docs] def remove(self, connection):
"""Remove the connection from the pool
:param connection: The connection to remove
:type connection: psycopg2.extensions.connection
:raises: ConnectionNotFoundError
:raises: ConnectionBusyError
"""
cid = id(connection)
if cid not in self.connections:
raise ConnectionNotFoundError(self.id, cid)
self.connection_handle(connection).close()
with self._lock:
del self.connections[cid]
LOGGER.debug('Pool %s removed connection %s', self.id, cid)
[docs] def report(self):
"""Return a report about the pool state and configuration.
:rtype: dict
"""
return {
'connections': {
'busy': len(self.busy_connections),
'closed': len(self.closed_connections),
'executing': len(self.executing_connections),
'idle': len(self.idle_connections),
'locked': len(self.busy_connections)
},
'exceptions': sum([c.exceptions
for c in self.connections.values()]),
'executions': sum([c.executions
for c in self.connections.values()]),
'full': self.is_full,
'idle': {
'duration': self.idle_duration,
'ttl': self.idle_ttl
},
'max_size': self.max_size
}
[docs] def shutdown(self):
"""Forcefully shutdown the entire pool, closing all non-executing
connections.
:raises: ConnectionBusyError
"""
with self._lock:
for cid in list(self.connections.keys()):
if self.connections[cid].executing:
raise ConnectionBusyError(cid)
if self.connections[cid].locked:
self.connections[cid].free()
self.connections[cid].close()
del self.connections[cid]
[docs] def set_idle_ttl(self, ttl):
"""Set the idle ttl
:param int ttl: The TTL when idle
"""
with self._lock:
self.idle_ttl = ttl
[docs] def set_max_size(self, size):
"""Set the maximum number of connections
:param int size: The maximum number of connections
"""
with self._lock:
self.max_size = size
[docs]class PoolManager(object):
"""The connection pool object implements behavior around connections and
their use in queries.Session objects.
We carry a pool id instead of the connection URI so that we will not be
carrying the URI in memory, creating a possible security issue.
"""
_lock = threading.Lock()
_pools = {}
def __contains__(self, pid):
"""Returns True if the pool exists
:param str pid: The pool id to check for
:rtype: bool
"""
return pid in self.__class__._pools
[docs] @classmethod
def instance(cls):
"""Only allow a single PoolManager instance to exist, returning the
handle for it.
:rtype: PoolManager
"""
if not hasattr(cls, '_instance'):
with cls._lock:
cls._instance = cls()
return cls._instance
[docs] @classmethod
def add(cls, pid, connection):
"""Add a new connection and session to a pool.
:param str pid: The pool id
:type connection: psycopg2.extensions.connection
:param connection: The connection to add to the pool
"""
with cls._lock:
cls._ensure_pool_exists(pid)
cls._pools[pid].add(connection)
[docs] @classmethod
def clean(cls, pid):
"""Clean the specified pool, removing any closed connections or
stale locks.
:param str pid: The pool id to clean
"""
with cls._lock:
try:
cls._ensure_pool_exists(pid)
except KeyError:
LOGGER.debug('Pool clean invoked against missing pool %s', pid)
return
cls._pools[pid].clean()
cls._maybe_remove_pool(pid)
[docs] @classmethod
def create(cls, pid, idle_ttl=DEFAULT_IDLE_TTL, max_size=DEFAULT_MAX_SIZE,
time_method=None):
"""Create a new pool, with the ability to pass in values to override
the default idle TTL and the default maximum size.
A pool's idle TTL defines the amount of time that a pool can be open
without any sessions before it is removed.
A pool's max size defines the maximum number of connections that can
be added to the pool to prevent unbounded open connections.
:param str pid: The pool ID
:param int idle_ttl: Time in seconds for the idle TTL
:param int max_size: The maximum pool size
:param callable time_method: Override the use of :py:meth:`time.time`
method for time values.
:raises: KeyError
"""
if pid in cls._pools:
raise KeyError('Pool %s already exists' % pid)
with cls._lock:
LOGGER.debug("Creating Pool: %s (%i/%i)", pid, idle_ttl, max_size)
cls._pools[pid] = Pool(pid, idle_ttl, max_size, time_method)
[docs] @classmethod
def free(cls, pid, connection):
"""Free a connection that was locked by a session
:param str pid: The pool ID
:param connection: The connection to remove
:type connection: psycopg2.extensions.connection
"""
with cls._lock:
LOGGER.debug('Freeing %s from pool %s', id(connection), pid)
cls._ensure_pool_exists(pid)
cls._pools[pid].free(connection)
[docs] @classmethod
def get(cls, pid, session):
"""Get an idle, unused connection from the pool. Once a connection has
been retrieved, it will be marked as in-use until it is freed.
:param str pid: The pool ID
:param queries.Session session: The session to assign to the connection
:rtype: psycopg2.extensions.connection
"""
with cls._lock:
cls._ensure_pool_exists(pid)
return cls._pools[pid].get(session)
[docs] @classmethod
def get_connection(cls, pid, connection):
"""Return the specified :class:`~queries.pool.Connection` from the
pool.
:param str pid: The pool ID
:param connection: The connection to return for
:type connection: psycopg2.extensions.connection
:rtype: queries.pool.Connection
"""
with cls._lock:
return cls._pools[pid].connection_handle(connection)
[docs] @classmethod
def has_connection(cls, pid, connection):
"""Check to see if a pool has the specified connection
:param str pid: The pool ID
:param connection: The connection to check for
:type connection: psycopg2.extensions.connection
:rtype: bool
"""
with cls._lock:
cls._ensure_pool_exists(pid)
return connection in cls._pools[pid]
[docs] @classmethod
def has_idle_connection(cls, pid):
"""Check to see if a pool has an idle connection
:param str pid: The pool ID
:rtype: bool
"""
with cls._lock:
cls._ensure_pool_exists(pid)
return bool(cls._pools[pid].idle_connections)
[docs] @classmethod
def is_full(cls, pid):
"""Return a bool indicating if the specified pool is full
:param str pid: The pool id
:rtype: bool
"""
with cls._lock:
cls._ensure_pool_exists(pid)
return cls._pools[pid].is_full
[docs] @classmethod
def lock(cls, pid, connection, session):
"""Explicitly lock the specified connection in the pool
:param str pid: The pool id
:type connection: psycopg2.extensions.connection
:param connection: The connection to add to the pool
:param queries.Session session: The session to hold the lock
"""
with cls._lock:
cls._ensure_pool_exists(pid)
cls._pools[pid].lock(connection, session)
[docs] @classmethod
def remove(cls, pid):
"""Remove a pool, closing all connections
:param str pid: The pool ID
"""
with cls._lock:
cls._ensure_pool_exists(pid)
cls._pools[pid].close()
del cls._pools[pid]
[docs] @classmethod
def remove_connection(cls, pid, connection):
"""Remove a connection from the pool, closing it if is open.
:param str pid: The pool ID
:param connection: The connection to remove
:type connection: psycopg2.extensions.connection
:raises: ConnectionNotFoundError
"""
cls._ensure_pool_exists(pid)
cls._pools[pid].remove(connection)
[docs] @classmethod
def set_idle_ttl(cls, pid, ttl):
"""Set the idle TTL for a pool, after which it will be destroyed.
:param str pid: The pool id
:param int ttl: The TTL for an idle pool
"""
with cls._lock:
cls._ensure_pool_exists(pid)
cls._pools[pid].set_idle_ttl(ttl)
[docs] @classmethod
def set_max_size(cls, pid, size):
"""Set the maximum number of connections for the specified pool
:param str pid: The pool to set the size for
:param int size: The maximum number of connections
"""
with cls._lock:
cls._ensure_pool_exists(pid)
cls._pools[pid].set_max_size(size)
[docs] @classmethod
def shutdown(cls):
"""Close all connections on in all pools"""
for pid in list(cls._pools.keys()):
cls._pools[pid].shutdown()
LOGGER.info('Shutdown complete, all pooled connections closed')
[docs] @classmethod
def size(cls, pid):
"""Return the number of connections in the pool
:param str pid: The pool id
:rtype int
"""
with cls._lock:
cls._ensure_pool_exists(pid)
return len(cls._pools[pid])
[docs] @classmethod
def report(cls):
"""Return the state of the all of the registered pools.
:rtype: dict
"""
return {
'timestamp': datetime.datetime.utcnow().isoformat(),
'process': os.getpid(),
'pools': dict([(i, p.report()) for i, p in cls._pools.items()])
}
@classmethod
def _ensure_pool_exists(cls, pid):
"""Raise an exception if the pool has yet to be created or has been
removed.
:param str pid: The pool ID to check for
:raises: KeyError
"""
if pid not in cls._pools:
raise KeyError('Pool %s has not been created' % pid)
@classmethod
def _maybe_remove_pool(cls, pid):
"""If the pool has no open connections, remove it
:param str pid: The pool id to clean
"""
if not len(cls._pools[pid]):
del cls._pools[pid]
class QueriesException(Exception):
"""Base Exception for all other Queries exceptions"""
pass
class ConnectionException(QueriesException):
def __init__(self, cid):
self.cid = cid
class PoolException(QueriesException):
def __init__(self, pid):
self.pid = pid
class PoolConnectionException(PoolException):
def __init__(self, pid, cid):
self.pid = pid
self.cid = cid
class ActivePoolError(PoolException):
"""Raised when removing a pool that has active connections"""
def __str__(self):
return 'Pool %s has at least one active connection' % self.pid
class ConnectionBusyError(ConnectionException):
"""Raised when trying to lock a connection that is already busy"""
def __str__(self):
return 'Connection %s is busy' % self.cid
class ConnectionNotFoundError(PoolConnectionException):
"""Raised if a specific connection is not found in the pool"""
def __str__(self):
return 'Connection %s not found in pool %s' % (self.cid, self.pid)
class NoIdleConnectionsError(PoolException):
"""Raised if a pool does not have any idle, open connections"""
def __str__(self):
return 'Pool %s has no idle connections' % self.pid
class PoolFullError(PoolException):
"""Raised when adding a connection to a pool that has hit max-size"""
def __str__(self):
return 'Pool %s is at its maximum capacity' % self.pid