"""
Tornado Session Adapter
Use Queries asynchronously within the Tornado framework.
Example Use:
.. code:: python
class NameListHandler(web.RequestHandler):
def initialize(self):
self.session = queries.TornadoSession(pool_max_size=60)
@gen.coroutine
def get(self):
data = yield self.session.query('SELECT * FROM names')
if data:
self.finish({'names': data.items()})
data.free()
else:
self.set_status(500, 'Error querying the data')
"""
import logging
import socket
import warnings
from tornado import concurrent, ioloop
from psycopg2 import extras, extensions
import psycopg2
from queries import pool, results, session, utils
LOGGER = logging.getLogger(__name__)
DEFAULT_MAX_POOL_SIZE = 25
[docs]class Results(results.Results):
"""A TornadoSession specific :py:class:`queries.Results` class that adds
the :py:meth:`Results.free <queries.tornado_session.Results.free>` method.
The :py:meth:`Results.free <queries.tornado_session.Results.free>` method
**must** be called to free the connection that the results were generated
on. `Results` objects that are not freed will cause the connections to
remain locked and your application will eventually run out of connections
in the pool.
The following examples illustrate the various behaviors that the
::py:class:`queries.Results <queries.tornado_session.Requests>` class
implements:
**Using Results as an Iterator**
.. code:: python
results = yield session.query('SELECT * FROM foo')
for row in results
print row
results.free()
**Accessing an individual row by index**
.. code:: python
results = yield session.query('SELECT * FROM foo')
print results[1] # Access the second row of the results
results.free()
**Casting single row results as a dict**
.. code:: python
results = yield session.query('SELECT * FROM foo LIMIT 1')
print results.as_dict()
results.free()
**Checking to see if a query was successful**
.. code:: python
sql = "UPDATE foo SET bar='baz' WHERE qux='corgie'"
results = yield session.query(sql)
if results:
print 'Success'
results.free()
**Checking the number of rows by using len(Results)**
.. code:: python
results = yield session.query('SELECT * FROM foo')
print '%i rows' % len(results)
results.free()
"""
def __init__(self, cursor, cleanup, fd):
self.cursor = cursor
self._cleanup = cleanup
self._fd = fd
self._freed = False
[docs] def free(self):
"""Release the results and connection lock from the TornadoSession
object. This **must** be called after you finish processing the results
from :py:meth:`TornadoSession.query <queries.TornadoSession.query>` or
:py:meth:`TornadoSession.callproc <queries.TornadoSession.callproc>`
or the connection will not be able to be reused by other asynchronous
requests.
"""
self._freed = True
self._cleanup(self.cursor, self._fd)
def __del__(self):
if not self._freed:
LOGGER.warning('Auto-freeing result on deletion')
self.free()
[docs]class TornadoSession(session.Session):
"""Session class for Tornado asynchronous applications. Uses
:py:func:`tornado.gen.coroutine` to wrap API methods for use in Tornado.
Utilizes connection pooling to ensure that multiple concurrent asynchronous
queries do not block each other. Heavily trafficked services will require
a higher ``max_pool_size`` to allow for greater connection concurrency.
:py:meth:`TornadoSession.query <queries.TornadoSession.query>` and
:py:meth:`TornadoSession.callproc <queries.TornadoSession.callproc>` must
call :py:meth:`Results.free <queries.tornado_session.Results.free>`
:param str uri: PostgreSQL connection URI
:param psycopg2.extensions.cursor: The cursor type to use
:param int pool_idle_ttl: How long idle pools keep connections open
:param int pool_max_size: The maximum size of the pool to use
"""
def __init__(self, uri=session.DEFAULT_URI,
cursor_factory=extras.RealDictCursor,
pool_idle_ttl=pool.DEFAULT_IDLE_TTL,
pool_max_size=DEFAULT_MAX_POOL_SIZE,
io_loop=None):
"""Connect to a PostgreSQL server using the module wide connection and
set the isolation level.
:param str uri: PostgreSQL connection URI
:param psycopg2.extensions.cursor: The cursor type to use
:param int pool_idle_ttl: How long idle pools keep connections open
:param int pool_max_size: The maximum size of the pool to use
:param tornado.ioloop.IOLoop io_loop: IOLoop instance to use
"""
self._connections = dict()
self._cleanup_callback = None
self._cursor_factory = cursor_factory
self._futures = dict()
self._ioloop = io_loop or ioloop.IOLoop.current()
self._pool_manager = pool.PoolManager.instance()
self._pool_max_size = pool_max_size
self._pool_idle_ttl = pool_idle_ttl
self._uri = uri
self._ensure_pool_exists()
def _ensure_pool_exists(self):
"""Create the pool in the pool manager if it does not exist."""
if self.pid not in self._pool_manager:
self._pool_manager.create(self.pid, self._pool_idle_ttl,
self._pool_max_size, self._ioloop.time)
@property
def connection(self):
"""Do not use this directly with Tornado applications
:return:
"""
return None
@property
def cursor(self):
return None
[docs] def callproc(self, name, args=None):
"""Call a stored procedure asynchronously on the server, passing in the
arguments to be passed to the stored procedure, yielding the results
as a :py:class:`Results <queries.tornado_session.Results>` object.
You **must** free the results that are returned by this method to
unlock the connection used to perform the query. Failure to do so
will cause your Tornado application to run out of connections.
:param str name: The stored procedure name
:param list args: An optional list of procedure arguments
:rtype: Results
:raises: queries.DataError
:raises: queries.DatabaseError
:raises: queries.IntegrityError
:raises: queries.InternalError
:raises: queries.InterfaceError
:raises: queries.NotSupportedError
:raises: queries.OperationalError
:raises: queries.ProgrammingError
"""
return self._execute('callproc', name, args)
[docs] def query(self, sql, parameters=None):
"""Issue a query asynchronously on the server, mogrifying the
parameters against the sql statement and yielding the results
as a :py:class:`Results <queries.tornado_session.Results>` object.
You **must** free the results that are returned by this method to
unlock the connection used to perform the query. Failure to do so
will cause your Tornado application to run out of connections.
:param str sql: The SQL statement
:param dict parameters: A dictionary of query parameters
:rtype: Results
:raises: queries.DataError
:raises: queries.DatabaseError
:raises: queries.IntegrityError
:raises: queries.InternalError
:raises: queries.InterfaceError
:raises: queries.NotSupportedError
:raises: queries.OperationalError
:raises: queries.ProgrammingError
"""
return self._execute('execute', sql, parameters)
[docs] def validate(self):
"""Validate the session can connect or has open connections to
PostgreSQL. As of ``1.10.3``
.. deprecated:: 1.10.3
As of 1.10.3, this method only warns about Deprecation
:rtype: bool
"""
warnings.warn(
'All functionality removed from this method', DeprecationWarning)
def _connect(self):
"""Connect to PostgreSQL, either by reusing a connection from the pool
if possible, or by creating the new connection.
:rtype: psycopg2.extensions.connection
:raises: pool.NoIdleConnectionsError
"""
future = concurrent.Future()
# Attempt to get a cached connection from the connection pool
try:
connection = self._pool_manager.get(self.pid, self)
self._connections[connection.fileno()] = connection
future.set_result(connection)
# Add the connection to the IOLoop
self._ioloop.add_handler(connection.fileno(),
self._on_io_events,
ioloop.IOLoop.WRITE)
except pool.NoIdleConnectionsError:
self._create_connection(future)
return future
def _create_connection(self, future):
"""Create a new PostgreSQL connection
:param tornado.concurrent.Future future: future for new conn result
"""
LOGGER.debug('Creating a new connection for %s', self.pid)
# Create a new PostgreSQL connection
kwargs = utils.uri_to_kwargs(self._uri)
try:
connection = self._psycopg2_connect(kwargs)
except (psycopg2.Error, OSError, socket.error) as error:
future.set_exception(error)
return
# Add the connection for use in _poll_connection
fd = connection.fileno()
self._connections[fd] = connection
def on_connected(cf):
"""Invoked by the IOLoop when the future is complete for the
connection
:param Future cf: The future for the initial connection
"""
if cf.exception():
self._cleanup_fd(fd, True)
future.set_exception(cf.exception())
else:
try:
# Add the connection to the pool
LOGGER.debug('Connection established for %s', self.pid)
self._pool_manager.add(self.pid, connection)
except (ValueError, pool.PoolException) as err:
LOGGER.exception('Failed to add %r to the pool', self.pid)
self._cleanup_fd(fd)
future.set_exception(err)
return
self._pool_manager.lock(self.pid, connection, self)
# Added in because psycopg2cffi connects and leaves the
# connection in a weird state: consts.STATUS_DATESTYLE,
# returning from Connection._setup without setting the state
# as const.STATUS_OK
if utils.PYPY:
connection.status = extensions.STATUS_READY
# Register the custom data types
self._register_unicode(connection)
self._register_uuid(connection)
# Set the future result
future.set_result(connection)
# Add a future that fires once connected
self._futures[fd] = concurrent.Future()
self._ioloop.add_future(self._futures[fd], on_connected)
# Add the connection to the IOLoop
self._ioloop.add_handler(connection.fileno(),
self._on_io_events,
ioloop.IOLoop.WRITE)
def _execute(self, method, query, parameters=None):
"""Issue a query asynchronously on the server, mogrifying the
parameters against the sql statement and yielding the results
as a :py:class:`Results <queries.tornado_session.Results>` object.
This function reduces duplicate code for callproc and query by getting
the class attribute for the method passed in as the function to call.
:param str method: The method attribute to use
:param str query: The SQL statement or Stored Procedure name
:param list|dict parameters: A dictionary of query parameters
:rtype: Results
:raises: queries.DataError
:raises: queries.DatabaseError
:raises: queries.IntegrityError
:raises: queries.InternalError
:raises: queries.InterfaceError
:raises: queries.NotSupportedError
:raises: queries.OperationalError
:raises: queries.ProgrammingError
"""
future = concurrent.Future()
def on_connected(cf):
"""Invoked by the future returned by self._connect"""
if cf.exception():
future.set_exception(cf.exception())
return
# Get the psycopg2 connection object and cursor
conn = cf.result()
cursor = self._get_cursor(conn)
def completed(qf):
"""Invoked by the IOLoop when the future has completed"""
if qf.exception():
self._incr_exceptions(conn)
err = qf.exception()
LOGGER.debug('Cleaning cursor due to exception: %r', err)
self._exec_cleanup(cursor, conn.fileno())
future.set_exception(err)
else:
self._incr_executions(conn)
value = Results(cursor, self._exec_cleanup, conn.fileno())
future.set_result(value)
# Setup a callback to wait on the query result
self._futures[conn.fileno()] = concurrent.Future()
# Add the future to the IOLoop
self._ioloop.add_future(self._futures[conn.fileno()],
completed)
# Get the cursor, execute the query
func = getattr(cursor, method)
try:
func(query, parameters)
except Exception as error:
future.set_exception(error)
# Ensure the pool exists for the connection
self._ensure_pool_exists()
# Grab a connection to PostgreSQL
self._ioloop.add_future(self._connect(), on_connected)
# Return the future for the query result
return future
def _exec_cleanup(self, cursor, fd):
"""Close the cursor, remove any references to the fd in internal state
and remove the fd from the ioloop.
:param psycopg2.extensions.cursor cursor: The cursor to close
:param int fd: The connection file descriptor
"""
LOGGER.debug('Closing cursor and cleaning %s', fd)
try:
cursor.close()
except (psycopg2.Error, psycopg2.Warning) as error:
LOGGER.debug('Error closing the cursor: %s', error)
self._cleanup_fd(fd)
# If the cleanup callback exists, remove it
if self._cleanup_callback:
self._ioloop.remove_timeout(self._cleanup_callback)
# Create a new cleanup callback to clean the pool of idle connections
self._cleanup_callback = self._ioloop.add_timeout(
self._ioloop.time() + self._pool_idle_ttl + 1,
self._pool_manager.clean, self.pid)
def _cleanup_fd(self, fd, close=False):
"""Ensure the socket socket is removed from the IOLoop, the
connection stack, and futures stack.
:param int fd: The fd # to cleanup
"""
self._ioloop.remove_handler(fd)
if fd in self._connections:
try:
self._pool_manager.free(self.pid, self._connections[fd])
except pool.ConnectionNotFoundError:
pass
if close:
self._connections[fd].close()
del self._connections[fd]
if fd in self._futures:
del self._futures[fd]
def _incr_exceptions(self, conn):
"""Increment the number of exceptions for the current connection.
:param psycopg2.extensions.connection conn: the psycopg2 connection
"""
self._pool_manager.get_connection(self.pid, conn).exceptions += 1
def _incr_executions(self, conn):
"""Increment the number of executions for the current connection.
:param psycopg2.extensions.connection conn: the psycopg2 connection
"""
self._pool_manager.get_connection(self.pid, conn).executions += 1
def _on_io_events(self, fd=None, _events=None):
"""Invoked by Tornado's IOLoop when there are events for the fd
:param int fd: The file descriptor for the event
:param int _events: The events raised
"""
if fd not in self._connections:
LOGGER.warning('Received IO event for non-existing connection')
return
self._poll_connection(fd)
def _poll_connection(self, fd):
"""Check with psycopg2 to see what action to take. If the state is
POLL_OK, we should have a pending callback for that fd.
:param int fd: The socket fd for the postgresql connection
"""
try:
state = self._connections[fd].poll()
except (OSError, socket.error) as error:
self._ioloop.remove_handler(fd)
if fd in self._futures and not self._futures[fd].done():
self._futures[fd].set_exception(
psycopg2.OperationalError('Connection error (%s)' % error)
)
except (psycopg2.Error, psycopg2.Warning) as error:
if fd in self._futures and not self._futures[fd].done():
self._futures[fd].set_exception(error)
else:
if state == extensions.POLL_OK:
if fd in self._futures and not self._futures[fd].done():
self._futures[fd].set_result(True)
elif state == extensions.POLL_WRITE:
self._ioloop.update_handler(fd, ioloop.IOLoop.WRITE)
elif state == extensions.POLL_READ:
self._ioloop.update_handler(fd, ioloop.IOLoop.READ)
elif state == extensions.POLL_ERROR:
self._ioloop.remove_handler(fd)
if fd in self._futures and not self._futures[fd].done():
self._futures[fd].set_exception(
psycopg2.Error('Poll Error'))
def _psycopg2_connect(self, kwargs):
"""Return a psycopg2 connection for the specified kwargs. Extend for
use in async session adapters.
:param dict kwargs: Keyword connection args
:rtype: psycopg2.extensions.connection
"""
kwargs['async'] = True
return psycopg2.connect(**kwargs)