# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
##
## Copyright (C) 2007-2013 Async Open Source <http://www.async.com.br>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., or visit: http://www.gnu.org/.
##
## Author(s): Stoq Team <stoq-devel@async.com.br>
"""
Kiwi integration for Stoq/Storm
"""
import re
import threading
import Queue
import glib
import gobject
from kiwi.python import Settable
from kiwi.utils import gsignal
from storm import Undef
from storm.database import Connection, convert_param_marks
from storm.expr import compile, And, Or, Like, Not, Alias, State, Lower
from storm.tracer import trace
import psycopg2
import psycopg2.extensions
from stoqlib.database.expr import Date, StoqNormalizeString
from stoqlib.database.interfaces import ISearchFilter
from stoqlib.database.settings import db_settings
from stoqlib.database.viewable import Viewable
class QueryState(object):
    """Base class for the state objects produced by search filters.

    :ivar filter: the search filter this state was created for
    """

    def __init__(self, search_filter):
        """
        Build a state bound to *search_filter*.

        :param search_filter: search filter this query state is associated with
        :type search_filter: :class:`SearchFilter`
        """
        self.filter = search_filter
class NumberQueryState(QueryState):
    """State of a filter that holds a single number.

    :cvar EQUALS: mode selecting values equal to ``value``
    :cvar DIFFERENT: mode selecting values different from ``value``
    :ivar value: number
    :ivar mode: one of :attr:`EQUALS` or :attr:`DIFFERENT`
    """

    EQUALS, DIFFERENT = range(2)

    def __init__(self, filter, value, mode=EQUALS):
        super(NumberQueryState, self).__init__(filter)
        self.value = value
        self.mode = mode

    def __repr__(self):
        shown = (self.value,)
        return '<NumberQueryState value=%r>' % shown
class NumberIntervalQueryState(QueryState):
    """State of a filter that holds a numeric interval.

    :ivar start: number, lower bound of the interval
    :ivar end: number, upper bound of the interval
    """

    def __init__(self, filter, start, end):
        super(NumberIntervalQueryState, self).__init__(filter)
        self.start, self.end = start, end

    def __repr__(self):
        bounds = (self.start, self.end)
        return '<NumberIntervalQueryState start=%r end=%r>' % bounds
class StringQueryState(QueryState):
    """State of a filter that holds a piece of text.

    :cvar CONTAINS_EXACTLY: match fields containing the text as a whole
    :cvar IDENTICAL_TO: match fields equal to the text
    :cvar NOT_CONTAINS: match fields containing none of the words
    :cvar CONTAINS_ALL: match fields containing every word (the default)
    :ivar text: string
    :ivar mode: one of the mode constants above
    """

    CONTAINS_EXACTLY, IDENTICAL_TO, NOT_CONTAINS, CONTAINS_ALL = range(4)

    def __init__(self, filter, text, mode=CONTAINS_ALL):
        super(StringQueryState, self).__init__(filter)
        self.text = text
        self.mode = mode

    def __repr__(self):
        shown = (self.text,)
        return '<StringQueryState text=%r>' % shown
class DateQueryState(QueryState):
    """State of a filter that holds a single date.

    :ivar date: date
    """

    def __init__(self, filter, date):
        super(DateQueryState, self).__init__(filter)
        self.date = date

    def __repr__(self):
        shown = (self.date,)
        return '<DateQueryState date=%r>' % shown
class DateIntervalQueryState(QueryState):
    """State of a filter that holds a date interval.

    :ivar start: start of interval
    :ivar end: end of interval
    """

    def __init__(self, filter, start, end):
        super(DateIntervalQueryState, self).__init__(filter)
        self.start, self.end = start, end

    def __repr__(self):
        bounds = (self.start, self.end)
        return '<DateIntervalQueryState start=%r, end=%r>' % bounds
class BoolQueryState(QueryState):
    """State of a filter that holds a boolean-like value.

    :ivar value: value of the query state
    """

    def __init__(self, filter, value):
        QueryState.__init__(self, filter)
        self.value = value

    def __repr__(self):
        # The right-hand side must be a real 1-tuple: with a bare
        # ``(self.value)`` a tuple value would be unpacked as the format
        # arguments and either raise TypeError or be shown incorrectly.
        return '<BoolQueryState value=%r>' % (self.value,)
class MultiQueryState(QueryState):
    """Query state for objects.

    :ivar values: the values held by this state
    """

    def __init__(self, filter, values):
        super(MultiQueryState, self).__init__(filter)
        self.values = values

    def __repr__(self):
        shown = (self.values,)
        return '<MultiQueryState values=%r>' % shown
class AsyncResultSet(object):
    """Resultset returned by :class:`AsyncQueryOperation`.

    This should perform exactly like a
    :class:`stoqlib.database.runtime.StoqlibResultSet`. Attributes
    that are not defined here are forwarded to it.

    The original resultset can be accessed by :attr:`.resultset`
    """

    def __init__(self, resultset, result):
        """
        :param resultset: the original
            :class:`stoqlib.database.runtime.StoqlibResultSet`. It will
            be used mostly to help constructing the objects on iteration
        :param result: the :class:`storm.database.Result` queried by
            the query operation
        """
        self.resultset = resultset
        self._result = result

    def __iter__(self):
        """Yield one loaded object per row of the underlying result."""
        for values in self._result:
            yield self.resultset._load_objects(self._result, values)

    def __len__(self):
        """Return the ``rowcount`` reported by the underlying result."""
        return self._result.rowcount

    def __getattr__(self, attr):
        """Forward unknown attributes to the original resultset.

        :raises AttributeError: if :attr:`resultset` itself is not set
        """
        # __getattr__ only runs when normal lookup fails. If 'resultset'
        # itself is missing (instance created without __init__), reading
        # self.resultset below would re-enter this method forever.
        if attr == 'resultset':
            raise AttributeError(attr)
        return getattr(self.resultset, attr)
class AsyncQueryOperation(gobject.GObject):
    """A query that is executed on a separate psycopg2 connection.

    The operation starts as ``STATUS_WAITING``. :meth:`execute` moves it
    to ``STATUS_EXECUTING`` and then ``STATUS_FINISHED``, after which the
    *finish* signal is emitted. :meth:`cancel` moves it to
    ``STATUS_CANCELLED`` at any point.
    """

    STATUS_WAITING, STATUS_EXECUTING, STATUS_FINISHED, STATUS_CANCELLED = range(4)

    gsignal('finish')

    def __init__(self, store, resultset, expr):
        """
        :param store: database store
        :param resultset: resultset that will be used to construct
          the result from.
        :param expr: query expression to execute
        """
        gobject.GObject.__init__(self)

        self.status = self.STATUS_WAITING
        self.resultset = resultset
        self.expr = expr

        self._conn = store._connection
        # Filled in by execute()
        self._async_conn = None
        self._async_cursor = None
        self._statement = None
        self._parameters = None

    #
    #  Public API
    #

    def execute(self, async_conn):
        """Executes a query within an asynchronous psycopg2 connection

        Does nothing if the operation was already cancelled.

        :param async_conn: the connection used to create the cursor
          that runs the query
        """
        if self.status == self.STATUS_CANCELLED:
            return
        self.status = self.STATUS_EXECUTING

        # Async variant of Connection.execute() in storm/database.py
        compile_state = State()
        sql = convert_param_marks(compile(self.expr, compile_state), "?", "%s")

        self._async_cursor = async_conn.cursor()
        self._async_conn = async_conn

        # This is postgres specific, see storm/databases/postgres.py
        self._statement = sql.encode('utf-8')
        self._parameters = tuple(
            Connection.to_database(compile_state.parameters))

        trace_args = (self._conn, self._async_cursor,
                      self._statement, self._parameters)
        trace("connection_raw_execute", *trace_args)
        self._async_cursor.execute(self._statement, self._parameters)

        # Another thread may have cancelled this while the cursor was
        # executing. In that case nobody is interested in the result
        # anymore, so only a non-cancelled operation is marked as finished.
        if self.status != self.STATUS_CANCELLED:
            self.status = self.STATUS_FINISHED
            glib.idle_add(self._on_finish)

    def get_result(self):
        """Get operation result.

        Note that this can only be called when the *finish* signal
        has been emitted.

        :returns: a :class:`AsyncResultSet` containing the result
        """
        assert self.status == self.STATUS_FINISHED

        trace_args = (self._conn, self._async_cursor,
                      self._statement, self._parameters)
        trace("connection_raw_execute_success", *trace_args)

        storm_result = self._conn.result_factory(self._conn,
                                                 self._async_cursor)
        return AsyncResultSet(self.resultset, storm_result)

    def cancel(self):
        """Cancel the operation scheduling"""
        self.status = self.STATUS_CANCELLED

    #
    #  Private
    #

    def _on_finish(self):
        # Idle callback scheduled by execute(); a cancellation may have
        # arrived between scheduling and running it.
        if self.status != self.STATUS_CANCELLED:
            self.emit('finish')


gobject.type_register(AsyncQueryOperation)
class _OperationExecuter(threading.Thread):
_SINGLETON = None
def __init__(self):
super(_OperationExecuter, self).__init__()
self._conn = psycopg2.connect(db_settings.get_store_dsn())
self._queue = Queue.Queue()
@classmethod
def get_instance(cls):
if cls._SINGLETON is None:
cls._SINGLETON = cls()
cls._SINGLETON.daemon = True
cls._SINGLETON.start()
return cls._SINGLETON
def run(self):
while True:
operation = self._queue.get()
operation.execute(self._conn)
self._queue.task_done()
def schedule(self, operation):
assert isinstance(operation, AsyncQueryOperation)
self._queue.put(operation)
class QueryExecuter(object):
    """
    A QueryExecuter is responsible for taking the state (as in QueryState)
    objects from search filters and construct a query.
    The query is constructed using storm.

    The search limit starts as ``-1``; only limits greater than zero are
    applied to the resultset.
    """

    def __init__(self, store=None):
        self._columns = {}
        self._limit = -1
        self.store = store
        self.search_spec = None
        self._query_callbacks = []
        self._filter_query_callbacks = {}
        self._query = self._default_query
        self.post_result = None
        self._operation_executer = _OperationExecuter.get_instance()

    # Public API

    def search(self, states=None, resultset=None, limit=None):
        """
        Execute a search.

        :param resultset: resultset to use, if ``None`` we will
          just execute a normal store.find() on the search_spec set in
          .set_search_spec()
        :param states: an iterable of query states or ``None``
        :param limit: use this limit instead of the one defined by set_limit()
        :returns: the filtered (and possibly limited) resultset
        """
        return self._prepare_resultset(states, resultset, limit)

    def search_async(self, states=None, resultset=None, limit=None):
        """
        Execute a search asynchronously.

        The query runs on the psycopg2 connection owned by the shared
        :class:`_OperationExecuter` worker thread.

        This method returns an operation for which a signal **finish** is
        emitted when the query has finished executing. In that callback,
        :meth:`.AsyncQueryOperation.get_result` should be called, eg:

        >>> from stoqlib.api import api
        >>> from stoqlib.domain.person import Person

        >>> default_store = api.get_default_store()
        >>> resultset = default_store.find(Person)

        >>> qe = QueryExecuter(store=default_store)
        >>> operation = qe.search_async(resultset=resultset)

        >>> def finished(operation, loop):
        ...     operation.get_result()
        ...     # use result
        ...     loop.quit()

        Create a loop for testing

        >>> loop = glib.MainLoop()
        >>> sig_id = operation.connect('finish', finished, loop)
        >>> loop.run()

        :param states: an iterable of query states or ``None``
        :param resultset: a resultset or ``None``
        :param limit: use this limit instead of the one defined by set_limit()
        :returns: a query operation
        """
        resultset = self._prepare_resultset(states, resultset, limit)
        operation = AsyncQueryOperation(self.store,
                                        resultset,
                                        resultset._get_select())
        self._operation_executer.schedule(operation)
        return operation

    def set_limit(self, limit):
        """
        Set the maximum number of result items to return in a search query.

        :param limit: the new limit; values that are not greater than
          zero mean that no limit is applied
        """
        self._limit = limit

    def get_limit(self):
        """Return the limit set by :meth:`set_limit`."""
        return self._limit

    def set_filter_columns(self, search_filter, columns, use_having=False):
        """Set what columns should be filtered for the search_filter

        :param search_filter: the filter the columns belong to; columns
          can be set only once per filter
        :param columns: Should be a list of column names or properties to be
          used in the query. If they are column names (strings), we will call
          getattr on the search_spec to get the property for the query
          construction.
        :param use_having: if ``True``, the query built for this filter is
          applied as a *having* clause instead of a *where* clause
        """
        # NOTE: search_filter is deliberately not required to provide
        # ISearchFilter here; the original check was a no-op.
        assert search_filter not in self._columns
        self._columns[search_filter] = (columns, use_having)

    def set_search_spec(self, search_spec):
        """
        Sets the Storm search_spec for this executer

        :param search_spec: a Storm search_spec
        """
        self.search_spec = search_spec

    def add_query_callback(self, callback):
        """
        Adds a generic query callback

        :param callback: a callable, it is called with all the states
          given to the search and may return a query
        :raises TypeError: if *callback* is not callable
        """
        if not callable(callback):
            raise TypeError
        self._query_callbacks.append(callback)

    def add_filter_query_callback(self, search_filter, callback,
                                  use_having=False):
        """
        Adds a query callback for the filter search_filter

        :param search_filter: a search filter
        :param callback: a callable, it is called with the state of
          *search_filter* and may return a query
        :param use_having: if ``True``, the returned query is applied as a
          *having* clause instead of a *where* clause
        :raises TypeError: if *search_filter* does not provide
          ISearchFilter or *callback* is not callable
        """
        if not ISearchFilter.providedBy(search_filter):
            raise TypeError
        if not callable(callback):
            raise TypeError
        callbacks = self._filter_query_callbacks.setdefault(search_filter, [])
        callbacks.append((callback, use_having))

    def set_query(self, callback):
        """
        Overrides the default query mechanism.

        :param callback: a callable which takes the store as its only
          argument and returns a resultset, or ``None`` to restore the
          default query
        :raises TypeError: if *callback* is neither ``None`` nor callable
        """
        if callback is None:
            callback = self._default_query
        elif not callable(callback):
            raise TypeError
        self._query = callback

    def get_post_result(self, result):
        """Run the search_spec's post search query for *result*.

        :param result: the resultset handed to
          ``search_spec.post_search_callback``
        :returns: a :class:`Settable` mapping each description returned by
          the callback to the matching value of the executed query
        """
        descs, query = self.search_spec.post_search_callback(result)
        # This should not be present in the query, since post_search_callback
        # should only use aggregate functions.
        query.order_by = Undef
        query.group_by = Undef
        values = self.store.execute(query).get_one()
        assert len(descs) == len(values), (descs, values)
        return Settable(**dict(zip(descs, values)))

    def get_ordered_result(self, result, attribute):
        """Order *result* by *attribute*.

        :param result: the resultset to order
        :param attribute: what to order by; when the search_spec is a
          :class:`Viewable` a string is replaced by the search_spec
          attribute of that name
        :returns: the ordered resultset
        """
        if issubclass(self.search_spec, Viewable):
            # sorting viewables is not supported with strings, since that
            # viewables can query more than one search_spec at once, and each
            # search_spec may have columns with the same name.
            if isinstance(attribute, str):
                attribute = getattr(self.search_spec, attribute)

        return result.order_by(attribute)

    def parse_states(self, states):
        """Parses the state given and return a tuple where the first element is
        the queries that should be used, and the second is a 'having' that
        should be used with the query.

        :param states: an iterable of query states or ``None``
        :returns: ``(queries, having)`` as two lists, or ``(None, None)``
          when *states* is ``None``
        :raises ValueError: if no search_spec was set, or if a state's
          filter has neither columns nor a callback while the default
          query is in use and no generic query callback was added
        """
        if states is None:
            return None, None

        search_spec = self.search_spec
        if search_spec is None:
            raise ValueError("search_spec cannot be None")

        queries = []
        having = []
        for state in states:
            search_filter = state.filter
            assert state.filter

            # Column query
            if search_filter in self._columns:
                columns, use_having = self._columns[search_filter]
                query = self._construct_state_query(search_spec, state,
                                                    columns)
                if query and use_having:
                    having.append(query)
                elif query:
                    queries.append(query)
            # Custom per filter/state query.
            elif search_filter in self._filter_query_callbacks:
                callbacks = self._filter_query_callbacks[search_filter]
                for callback, use_having in callbacks:
                    query = callback(state)
                    if query and use_having:
                        having.append(query)
                    elif query:
                        queries.append(query)
            else:
                if (self._query == self._default_query and
                        not self._query_callbacks):
                    raise ValueError(
                        "You need to add a search column or a query callback "
                        "for filter %s" % (search_filter))

        # Generic callbacks receive every state at once
        for callback in self._query_callbacks:
            query = callback(states)
            if query:
                queries.append(query)

        return queries, having

    # Private API

    def _default_query(self, store):
        return store.find(self.search_spec)

    def _prepare_resultset(self, states, resultset, limit):
        # Shared by search() and search_async(): build the base resultset,
        # apply the filter states and then the limit.
        if resultset is None:
            resultset = self._query(self.store)
        resultset = self._parse_states(resultset, states)
        limit = limit or self._limit
        if limit > 0:
            resultset.config(limit=limit)
        return resultset

    def _parse_states(self, resultset, states):
        # Restrict resultset with the where/having queries built from states
        queries, having = self.parse_states(states)
        if queries:
            resultset = resultset.find(And(*queries))
        if having:
            resultset = resultset.having(And(*having))

        return resultset

    def _construct_state_query(self, search_spec, state, columns):
        # Build one query per column for the state and OR them together.
        # Returns None when no column produced a query.
        queries = []
        for column in columns:
            query = None
            if isinstance(column, str):
                table_field = getattr(search_spec, column)
            else:
                table_field = column

            # Query on the aliased expression, not on the alias itself
            if isinstance(table_field, Alias):
                table_field = table_field.expr

            if isinstance(state, NumberQueryState):
                query = self._parse_number_state(state, table_field)
            elif isinstance(state, NumberIntervalQueryState):
                query = self._parse_number_interval_state(state, table_field)
            elif isinstance(state, StringQueryState):
                query = self._parse_string_state(state, table_field)
            elif isinstance(state, DateQueryState):
                query = self._parse_date_state(state, table_field)
            elif isinstance(state, DateIntervalQueryState):
                query = self._parse_date_interval_state(state, table_field)
            elif isinstance(state, BoolQueryState):
                query = self._parse_bool_state(state, table_field)
            elif isinstance(state, MultiQueryState):
                query = self._parse_multi_query_state(state, table_field)
            else:
                raise NotImplementedError(state.__class__.__name__)
            if query:
                queries.append(query)
        if queries:
            return Or(*queries)

    def _parse_number_state(self, state, table_field):
        if state.value is None:
            return
        if state.mode == NumberQueryState.EQUALS:
            return table_field == state.value
        elif state.mode == NumberQueryState.DIFFERENT:
            return table_field != state.value
        else:
            raise AssertionError

    def _parse_number_interval_state(self, state, table_field):
        # Compare against None instead of relying on truthiness: 0 is a
        # valid bound and must not be dropped from the query.
        queries = []
        if state.start is not None:
            queries.append(table_field >= state.start)
        if state.end is not None:
            queries.append(table_field <= state.end)
        if queries:
            return And(*queries)

    def _parse_string_state(self, state, table_field):
        if not state.text.strip():
            return

        def _like(value):
            return Like(StoqNormalizeString(table_field),
                        StoqNormalizeString(u'%%%s%%' % value.lower()),
                        case_sensitive=False)

        if state.mode == StringQueryState.CONTAINS_ALL:
            queries = [_like(word)
                       for word in re.split('[ \n\r]', state.text) if word]
            retval = And(*queries)
        elif state.mode == StringQueryState.IDENTICAL_TO:
            retval = Lower(table_field) == state.text.lower()
        elif state.mode == StringQueryState.CONTAINS_EXACTLY:
            retval = (_like(state.text.lower()))
        elif state.mode == StringQueryState.NOT_CONTAINS:
            queries = [Not(_like(word))
                       for word in state.text.split(' ') if word]
            retval = And(*queries)
        else:  # pragma nocoverage
            raise AssertionError

        return retval

    def _parse_date_state(self, state, table_field):
        if state.date:
            return Date(table_field) == Date(state.date)

    def _parse_date_interval_state(self, state, table_field):
        queries = []
        if state.start:
            queries.append(Date(table_field) >= Date(state.start))
        if state.end:
            queries.append(Date(table_field) <= Date(state.end))
        if queries:
            return And(*queries)

    def _parse_bool_state(self, state, table_field):
        return table_field == state.value

    def _parse_multi_query_state(self, state, table_field):
        return table_field.is_in(state.values)