Source code for stoqlib.database.runtime

# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4

##
## Copyright (C) 2005-2013 Async Open Source
##
## This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU Lesser General Public License
## as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., or visit: http://www.gnu.org/.
##
##
## Author(s): Stoq Team <stoq-devel@async.com.br>
##
""" Runtime routines for applications"""

from collections import namedtuple
import logging
import sys
import warnings
import weakref
import os

from kiwi.component import get_utility, provide_utility
from storm import Undef
from storm.expr import SQL, Avg
from storm.info import get_obj_info
from storm.store import Store, ResultSet
from storm.tracer import trace

from stoqlib.database.exceptions import InterfaceError, OperationalError
from stoqlib.database.interfaces import (
    ICurrentBranch,
    ICurrentBranchStation, ICurrentUser)
from stoqlib.database.expr import is_sql_identifier
from stoqlib.database.orm import ORMObject
from stoqlib.database.properties import Identifier
from stoqlib.database.settings import db_settings
from stoqlib.database.viewable import Viewable
from stoqlib.exceptions import DatabaseError, LoginError
from stoqlib.lib.decorators import public
from stoqlib.lib.interfaces import IAppInfo
from stoqlib.lib.message import error, yesno
from stoqlib.lib.translation import stoqlib_gettext
from stoqlib.net.socketutils import get_hostname

_ = stoqlib_gettext
log = logging.getLogger(__name__)

#: the default store, considered read-only in Stoq
_default_store = None

#: list of global stores used by the application,
#: should not be used by anything except autoreload_object()
_stores = weakref.WeakSet()


[docs]def autoreload_object(obj, obj_store=False): """Autoreload object in any other existing store. This will go through every open store and see if the object is alive in the store. If it is, it will be marked for autoreload the next time its used. :param obj_store: if we should also autoreload the current store of the object """ for store in _stores: if not obj_store and Store.of(obj) is store: continue alive = store._alive.get((obj.__class__, (obj.id,))) if alive: # Just to make sure its not modified before reloading it, otherwise, # we would lose the changes assert not store._is_dirty(get_obj_info(obj)) store.autoreload(alive)
[docs]class StoqlibResultSet(ResultSet): # FIXME: Remove. See bug 4985 def __nonzero__(self): warnings.warn("use self.is_empty()", DeprecationWarning, stacklevel=2) return not self.is_empty()
[docs] def avg(self, attribute): # ResultSet.avg() is not used because storm returns it as a float return self._aggregate(Avg, attribute)
[docs] def set_viewable(self, viewable): """Configures this result set to load the results as instances of the given viewable. :param viewable: A :class:`Viewable <stoqlib.database.viewable.Viewable>` """ self._viewable = viewable # ResultSet needs this to create the query correctly self._tables = viewable.tables if viewable.group_by: self.group_by(*viewable.group_by) if viewable.having: self.having(viewable.having)
def _load_viewable(self, values): """Converts the result of this result set into an instance of the configured viewable. """ instance = self._viewable() # This will be removed later instance._store = self._store identifiers = [] for attr, value in zip(self._viewable.cls_attributes, values): if type(value) is Identifier: identifiers.append(value) setattr(instance, attr, value) branch = getattr(instance, 'branch', None) if branch: for i in identifiers: i.prefix = branch.acronym or '' return instance def _load_objects(self, result, values): # Overwrite the default _load_objects so we can convert the results to # viewable instances (if necessary) values = super(StoqlibResultSet, self)._load_objects(result, values) if hasattr(self, '_viewable'): values = self._load_viewable(values) return values
[docs] def find(self, *args, **kwargs): # We only need this workaround if we are querying a viewable and the # viewable has a group_by workaround_needed = hasattr(self, '_viewable') and self._group_by is not Undef if workaround_needed: # Storm is not letting us call store.find(Viewable, args1).find(args2), # but it should be possible, since that the same as writing # store.find(Viewable, And(args1, args2)) group_by = self._group_by[:] self._group_by = Undef resultset = super(StoqlibResultSet, self).find(*args, **kwargs) resultset._group_by = group_by self._group_by = group_by return resultset return super(StoqlibResultSet, self).find(*args, **kwargs)
def _load_fast_object(self, named_tuples, values): objects = [] values_start = values_end = 0 for nt in named_tuples: if nt is None: # This means its an single expression values_end += 1 objects.append(values[values_start]) else: values_end += len(nt._fields) objects.append(nt(*values[values_start:values_end])) values_start = values_end if self._find_spec.is_tuple: return tuple(objects) else: return objects[0]
[docs] def fast_iter(self): # First build all named tuples named_tuples = [] for is_expr, info in self._find_spec._cls_spec_info: if is_expr: named_tuples.append(None) else: named_tuples.append(namedtuple(info.cls.__name__, [i.name for i in info.columns])) is_viewable = hasattr(self, '_viewable') # Then interate over the results bypassing storm object creation for values in self._store._connection.execute(self._get_select()): value = self._load_fast_object(named_tuples, values) if is_viewable: value = self._load_viewable(value) yield value
[docs]class StoqlibStore(Store): """The Stoqlib Store. This is the Stoqlib API to access a database. It represents more or less a database transaction, after modifying an object you need to either :meth:`.commit` or :meth:`.rollback` the store. The primary way of querying object from a store is via the :meth:`.find` method, but you can also use :meth:`.Store.get` if you know the id of the object. find returns a ResultSet, see the Storm documentation for information about that. Objects needs to be added to a store. This can either be done via :meth:`StoqlibStore.add` or passing in the store parameter to a ORMObject/Domain object. If you want to delete an object you use :meth:`StoqlibStore.remove` You normally create a store using :func:`.new_store`, it needs to be :meth:`close` when you're done or a database connection will be leaked. See also: `storm manual <https://storm.canonical.com/Manual>`__ `storm tutorial <https://storm.canonical.com/Tutorial>`__ :attribute retval: The return value of a operation this transaction is covering. Usually a domain object that was modified. By default it's ``True``, but can be set to ``False`` to do a rollback instead of a commit on :meth:`stoqlib.api.StoqApi.trans` """ _result_set_factory = StoqlibResultSet def __init__(self, database=None, cache=None): """ Creates a new store :param database: the database to connect to or ``None`` :param cache: storm cache to use or ``None`` """ self._committing = False self._savepoints = [] self._pending_count = [0] self.retval = True self.obsolete = False if database is None: database = get_default_store().get_database() Store.__init__(self, database=database, cache=cache) _stores.add(self) trace('transaction_create', self) self._setup_application_name() def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): rv = self.retval if exc_type is None else False self.committed = self.confirm(commit=rv) self.close() def _set_dirty(self, obj_info): # Store calls _set_dirty when any object inside it gets modified. # We use this to count if any change happened inside the actual savepoint self._pending_count[-1] += 1 super(StoqlibStore, self)._set_dirty(obj_info)
[docs] def find(self, cls_spec, *args, **kwargs): # Overwrite the default find method so we can support querying our own # viewables. If the cls_spec is a Viewable, we first get the real # cls_spec from the viewable and after the query is executed, the # results will be converted in instances of the viewable viewable = None if not isinstance(cls_spec, tuple): try: is_viewable = issubclass(cls_spec, Viewable) except TypeError: is_viewable = False if is_viewable: args = list(args) viewable = cls_spec # Get the actual class spec for the viewable cls_spec = viewable.cls_spec if viewable.clause: args.append(viewable.clause) # kwargs are based on the properties of the viewable. We need to convert # it to the properties of the real tables. if viewable and kwargs: for key in kwargs.copy(): args.append(getattr(viewable, key) == kwargs.pop(key)) resultset = super(StoqlibStore, self).find(cls_spec, *args, **kwargs) if viewable: resultset.set_viewable(viewable) return resultset
[docs] def get_lock_database_query(self): """ Fetch a database query that needs to be executed to lock the database, suitable for applying migration patches. :returns: a database query in string form """ res = self.execute( "SELECT tablename FROM pg_tables WHERE schemaname = 'public'") tables = ', '.join([i[0] for i in res.get_all()]) res.close() if not tables: return '' return 'LOCK TABLE %s IN ACCESS EXCLUSIVE MODE NOWAIT;' % tables
[docs] def lock_database(self): """Tries to lock the database. Raises an DatabaseError if the locking has failed (ie, other clients are using the database). """ try: # Locking requires a transaction to work, but this conection does # not begin one explicitly self.execute('BEGIN TRANSACTION') self.execute(self.get_lock_database_query()) except OperationalError as e: raise DatabaseError("ERROR: Could not obtain lock: %s" % (e, ))
[docs] def unlock_database(self): """Unlock a previously locked database.""" self.execute('ROLLBACK')
[docs] def table_exists(self, table_name): """Check if a table exists :param table_name: name of the table to check for :returns: ``True`` if the table exists """ res = self.execute( SQL("SELECT COUNT(relname) FROM pg_class WHERE relname = ?", # FIXME: Figure out why this is not comming as unicode (unicode(table_name), ))) return res.get_one()[0]
[docs] def list_references(self, column): """Returns a list of columns that reference the givem column This will return a list of tuples (source table, source column, dest table, dest column, update, delete) where: - source table and column: The column that reference the given column - dest table and column: The referenced column (the same as the given column argument) - update : The ON UPDATE action for the reference. 'a' for 'NO ACTION', 'c' for CASCADE - delete: The same as update. """ table_name = unicode(column.cls.__storm_table__) column_name = unicode(column.name) query = """ SELECT DISTINCT src_pg_class.relname AS srctable, src_pg_attribute.attname AS srccol, ref_pg_class.relname AS reftable, ref_pg_attribute.attname AS refcol, pg_constraint.confupdtype, pg_constraint.confdeltype FROM pg_constraint JOIN pg_class AS src_pg_class ON src_pg_class.oid = pg_constraint.conrelid JOIN pg_class AS ref_pg_class ON ref_pg_class.oid = pg_constraint.confrelid JOIN pg_attribute AS src_pg_attribute ON src_pg_class.oid = src_pg_attribute.attrelid JOIN pg_attribute AS ref_pg_attribute ON ref_pg_class.oid = ref_pg_attribute.attrelid, generate_series(0,10) pos(n) WHERE contype = 'f' AND ref_pg_class.relname = ? AND ref_pg_attribute.attname = ? AND src_pg_attribute.attnum = pg_constraint.conkey[n] AND ref_pg_attribute.attnum = pg_constraint.confkey[n] AND NOT src_pg_attribute.attisdropped AND NOT ref_pg_attribute.attisdropped ORDER BY src_pg_class.relname, src_pg_attribute.attname """ return self.execute(query, (table_name, column_name)).get_all()
[docs] def quote_query(self, query, args=()): """Prepare a query for executing it. This is suitable for serializing a query to disk so we can pass it in to a database command line tool. It basically just escaped the arguments and generates a query that can be executed :param query: the database query, a string :param args: args that are to be escaped. :returns: database statement """ cursor = self._connection.build_raw_cursor() # mogrify is only available in psycopg2 stmt = cursor.mogrify(query, args) cursor.close() return stmt
[docs] def maybe_remove(self, obj): """Maybe remove an object from the database This will depend on the parameter SYNCHRONIZED_MODE. When working with synchronized databases, we should be very carefull when removing objects, since they will not be removed from the remote database (at least until we fix bug 5581) """ from stoqlib.lib.parameters import sysparam if not sysparam.get_bool('SYNCHRONIZED_MODE'): self.remove(obj)
[docs] def get_pending_count(self): """Get the quantity of pending changes Every time :meth:`.add_created_object`, :meth:`.add_deleted_object` or :meth:`.add_modified_object` gets called, this will increase by 1. Note that this is in sync with savepoints so, if before doing a savepoint there was 10 pending changes, then 2 more are done, when rolling back to it it will be 10 again. The same applies to a full rollback where this will go to 0. """ return sum(self._pending_count)
@public(since="1.5.0")
[docs] def commit(self, close=False): """Commits a database. This needs to be done to submit the actually inserts to the database. :param close: If ``True``, the store will also be closed after committed. """ self._check_obsolete() self._committing = True # the cache will be cleared when commiting, so store them here # and autoreload them after commit touched_objs = [] for obj_info in self._cache.get_cached(): obj = obj_info.get_obj() if obj is not None: touched_objs.append(obj) super(StoqlibStore, self).commit() trace('transaction_commit', self) self._pending_count = [0] self._savepoints = [] # Reload objects on all other opened stores for obj in touched_objs: autoreload_object(obj) if close: self.close() self._committing = False
[docs] def flush(self): """Flush the transaction to the database This will transform all modifications done on domain objs in an sql command and execute them on the database. Note that this will execute the sql on the transaction, but only will be commited when :meth:`.commit` is called. """ super(StoqlibStore, self).flush() # We only call 'before-commited' when flush is being called by commit if not self._committing: return # We need to block implicit flushes here since if a lot of objects are # updated at once, and those objects fetch other objects from the # databas during the before-commited hook, the store.get call would # trigger another flush and that would end up in an maximum recursion # depth error. self.block_implicit_flushes() for obj_info in self._cache.get_cached(): obj_info.event.emit("before-commited") self.unblock_implicit_flushes() # If objs got dirty when calling the hooks, flush again if self._dirty: self.flush()
@public(since="1.5.0")
[docs] def rollback(self, name=None, close=True): """Rollback the transaction :param name: If supplied limit changes to the last savepoint :param close: If ``True``, the connection will also be closed and will not be available for use anymore. If False, only a rollback is done and it will still be possible to use it for other queries. """ self._check_obsolete() if name: self.rollback_to_savepoint(name) else: super(StoqlibStore, self).rollback() # If we rollback completely, we need to clear all savepoints self._savepoints = [] self._pending_count = [0] # Rolling back resets the application name. self._setup_application_name() # sqlobject closes the connection after a rollback if close: self.close()
@public(since="1.5.0")
[docs] def close(self): """Close the store. Closes the socket that represents that database connection, this needs to be called when you finished using the store. """ trace('transaction_close', self) self._check_obsolete() super(StoqlibStore, self).close() self.obsolete = True
@public(since="1.5.0")
[docs] def fetch(self, obj): """Fetches an existing object in the context of this store. This is useful to 'move' an object from one store to another. :param obj: object to fetch :returns: the object in the context of this store """ self._check_obsolete() if obj is None: return None if isinstance(obj, Viewable): return self.find(type(obj), id=obj.id).one() elif isinstance(obj, ORMObject): return self.get(type(obj), obj.id) else: raise TypeError("obj must be a ORMObject or a Viewable, not %r" % (obj, ))
[docs] def remove(self, obj): """Remove an objet from the store The associated row will be deleted from the database. """ # Overwrite store.remove so we can emit our own event for when the # object is goin to be deleted (but before anything is actually modified) obj_info = get_obj_info(obj) obj_info.event.emit("before-removed") super(StoqlibStore, self).remove(obj)
[docs] def savepoint(self, name): """Creates a database savepoint. This can be rolled back to using :meth:`.rollback_to_savepoint`. :param name: name of the savepoint """ self._check_obsolete() if not is_sql_identifier(name): raise ValueError("Invalid savepoint name: %r" % name) self.execute('SAVEPOINT %s' % name) self._savepoints.append(name) self._pending_count.append(0)
[docs] def rollback_to_savepoint(self, name): """Rollsback the store to a previous savepoint that was saved using :meth:`.savepoint` :param name: savepoint to move back to """ self._check_obsolete() if not is_sql_identifier(name): raise ValueError("Invalid savepoint name: %r" % name) if not name in self._savepoints: raise ValueError("Unknown savepoint: %r" % name) self.execute('ROLLBACK TO SAVEPOINT %s' % name) for savepoint in reversed(self._savepoints[:]): self._savepoints.remove(savepoint) self._pending_count.pop() if savepoint == name: break # Objects may have changed in this transaction. # Make sure to autorelad the original values after the rollback for obj_info in self._cache.get_cached(): self.autoreload(obj_info.get_obj())
[docs] def savepoint_exists(self, name): """Checks if the given savepoint's name exists :param name: the name of the savepoint :returns: ``True`` if the savepoint exists on this store, ``False`` otherwise. """ return name in self._savepoints
[docs] def confirm(self, commit): """Encapsulated method for committing/aborting changes in models. :param commit: True for commit, False for rollback :returns: True if it was committed, False otherwise """ # Allow False/None if commit: self.commit() else: self.rollback(close=False) return commit
# # Private # def _setup_application_name(self): """Sets a friendly name for postgres connection This name will appear when selecting from pg_stat_activity, for instance, and will allow to better debug the queries (specially when there is a deadlock) """ try: appinfo = get_utility(IAppInfo) except Exception: appname = 'stoq' else: appname = appinfo.get('name') or 'stoq' self.execute("SET application_name = '%s - %s - %s'" % ( (appname.lower(), get_hostname(), os.getpid()))) def _check_obsolete(self): if self.obsolete: raise InterfaceError("This transaction has already been closed")
[docs]def get_default_store(): """This function returns the default/primary store. Notice that this store is considered read-only inside Stoqlib applications. Only transactions can modify objects and should be created using new_store(). This store should not be closed, it will only close when we the application is shutdown. :returns: default store """ if _default_store is None: set_default_store(db_settings.create_store()) # We intentionally leave this open, it's the default # store and should only be closed when we close the # application return _default_store
[docs]def set_default_store(store): """This sets a new default store and closes the existing one if any. This is only called during Startup and should not be used elsewhere :param store: the new store to set """ global _default_store if store is None and _default_store is not None: _default_store.close() _default_store = store
[docs]def new_store(): """ Create a new transaction. :returns: a transaction """ log.debug('Creating a new transaction in %s()' % sys._getframe(1).f_code.co_name) return StoqlibStore()
# # User methods # def _register_branch_station(caller_store, station_name): import gtk from stoqlib.lib.parameters import sysparam if not sysparam.get_bool('DEMO_MODE'): fmt = _(u"The computer '%s' is not registered to the Stoq " u"server at %s.\n\n" u"Do you want to register it " u"(requires administrator access) ?") if not yesno(fmt % (station_name, db_settings.address), gtk.RESPONSE_YES, _(u"Register computer"), _(u"Quit")): raise SystemExit from stoqlib.gui.utils.login import LoginHelper h = LoginHelper(username="admin") try: user = h.validate_user() except LoginError as e: error(str(e)) if not user: error(_("Must login as 'admin'")) from stoqlib.domain.station import BranchStation with new_store() as store: branch = sysparam.get_object(store, 'MAIN_COMPANY') station = BranchStation.create(store, branch=branch, name=station_name) return caller_store.fetch(station)
[docs]def set_current_branch_station(store, station_name): """Registers the current station and the branch of the station as the current branch for the system :param store: a store :param station_name: name of the station to register """ # This is called from stoq-daemon, which doesn't know about Branch yet from stoqlib.lib.parameters import sysparam from stoqlib.domain.person import Branch Branch # pylint: disable=W0104 if station_name is None: station_name = get_hostname() station_name = unicode(station_name) from stoqlib.domain.station import BranchStation station = store.find(BranchStation, name=station_name).one() if station is None: station = _register_branch_station(store, station_name) if not station.is_active: error(_("The computer <u>%s</u> is not active in Stoq") % station_name, _("To solve this, open the administrator application " "and re-activate this computer.")) provide_utility(ICurrentBranchStation, station, replace=True) main_company = sysparam.get_object(store, 'MAIN_COMPANY') if not station.branch and main_company: with new_store() as commit_store: commit_station = commit_store.fetch(station) commit_station.branch = commit_store.fetch(main_company) # The station may still not be associated with a branch when creating an # empty database if station.branch: provide_utility(ICurrentBranch, station.branch, replace=True)
@public(since="1.5.0")
[docs]def get_current_user(store): """Fetch the user which is currently logged into the system or None None means that there are no utilities available which in turn should only happens during startup, for example when creating a new database or running the migration script, at that point no users are logged in :param store: a store :returns: currently logged in user or None :rtype: a LoginUser or ``None`` """ user = get_utility(ICurrentUser, None) if user is not None: return store.fetch(user)
@public(since="1.5.0")
[docs]def get_current_branch(store): """Fetches the current branch company. :param store: a store :returns: the current branch :rtype: a branch or ``None`` """ branch = get_utility(ICurrentBranch, None) if branch is not None: return store.fetch(branch)
@public(since="1.5.0")
[docs]def get_current_station(store): """Fetches the current station (computer) which we are running on :param store: a store :param: current station :rtype: BranchStation or ``None`` """ station = get_utility(ICurrentBranchStation, None) if station is not None: return store.fetch(station)