Source code for stoqlib.database.expr

# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4

## Copyright (C) 2013 Async Open Source <>
## All rights reserved
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., or visit:
## Author(s): Stoq Team <>

"""Database expressions.

This contains a list of expressions that are unsupported by Storm.
Most of them are specific to PostgreSQL.
"""

from storm.expr import (Expr, NamedFunc, PrefixExpr, SQL, ComparableExpr,
                        compile as expr_compile, FromExpr, Undef, EXPR,
                        is_safe_token, BinaryOper, SetExpr)

class Age(NamedFunc):
    """Expression for the ``AGE`` SQL function.

    Given two datetimes, tells how much older the first one is than
    the second.
    """
    name = "AGE"
    __slots__ = ()
class Round(NamedFunc):
    """Expression for the ``ROUND`` SQL function.

    Takes two arguments: the numeric value to be rounded and an integer
    with the requested precision.
    """
    name = "ROUND"
    __slots__ = ()
class NullIf(NamedFunc):
    """Expression for the ``NULLIF`` SQL function.

    Yields NULL when the first argument matches the second one.
    For instance, ``NULLIF(x, '')`` is the SQL counterpart of this
    Python expression (reading ``None`` as NULL)::

        x if x != '' else None
    """
    name = "NULLIF"
    __slots__ = ()
class Date(NamedFunc):
    """Expression for ``DATE``: extracts the date part of a timestamp."""
    # FIXME: This is actually an operator
    name = "DATE"
    __slots__ = ()
class DateTrunc(NamedFunc):
    """Expression for ``DATE_TRUNC``: truncates a part of a datetime."""
    name = "DATE_TRUNC"
    __slots__ = ()
class Distinct(NamedFunc):
    """Expression rendered with the ``DISTINCT`` name."""
    # FIXME: This is actually an operator
    name = "DISTINCT"
    __slots__ = ()
class Field(SQL):
    """Raw SQL fragment naming a column as ``<table>.<column>``."""

    def __init__(self, table, column):
        """Build the fragment from the *table* and *column* names."""
        qualified_name = '%s.%s' % (table, column)
        SQL.__init__(self, qualified_name)
class Interval(PrefixExpr):
    """Prefix expression (``INTERVAL``) defining a datetime interval."""
    prefix = "INTERVAL"
    __slots__ = ()
class TransactionTimestamp(NamedFunc):
    """Expression for the ``TRANSACTION_TIMESTAMP`` SQL function.

    Current date and time at the start of the current transaction.
    """
    name = "TRANSACTION_TIMESTAMP"
    __slots__ = ()
    # Dummy attribute, only here because of pylint.
    date = lambda: None  # pylint
class StatementTimestamp(NamedFunc):
    """Expression for the ``STATEMENT_TIMESTAMP`` SQL function.

    Current date and time at the start of the current statement.
    """
    name = "STATEMENT_TIMESTAMP"
    __slots__ = ()
    # Dummy attribute, only here because of pylint.
    date = lambda: None  # pylint
class CharLength(NamedFunc):
    """Expression for ``CHAR_LENGTH``: the size of a string.

    Comparable to ``len()`` in Python.
    """
    name = "CHAR_LENGTH"
    __slots__ = ()
class LPad(NamedFunc):
    """Expression for ``LPAD``.

    Fills the string up to the given length by prepending the fill
    characters.
    """
    name = "LPAD"
    __slots__ = ()
class SplitPart(NamedFunc):
    """Expression for ``split_part``.

    Splits a string on a delimiter and returns the requested field.
    """
    name = "split_part"
    __slots__ = ()
class ArrayAgg(NamedFunc):
    """Expression for the ``array_agg`` SQL function."""
    name = "array_agg"
    __slots__ = ()
class Contains(BinaryOper):
    """Binary operator rendered as ``@>``.

    Compiled by :func:`compile_contains`, which renders the left-hand
    operand as an ``ARRAY[...]`` literal.
    """
    oper = " @> "
    __slots__ = ()
class IsContainedBy(BinaryOper):
    """Binary operator rendered as ``<@``.

    Compiled by :func:`compile_contains`, which renders the left-hand
    operand as an ``ARRAY[...]`` literal.
    """
    oper = " <@ "
    __slots__ = ()
@expr_compile.when(Contains, IsContainedBy)
def compile_contains(expr_compile, expr, state):
    """Compile a :class:`Contains` / :class:`IsContainedBy` expression.

    :param expr_compile: the compiler invoking this handler; it is used
        to compile the operands
    :param expr: the expression being compiled
    :param state: the compilation state, passed through to the compiler
    :returns: the SQL string ``ARRAY[<items>]<oper><expr2>``
    """
    # Only an iterable is currently supported as the first operand:
    # each of its items is compiled and they are wrapped in ARRAY[...].
    elements = [expr_compile(item, state) for item in expr.expr1]
    left = "ARRAY[%s]" % ",".join(elements)
    right = expr_compile(expr.expr2, state)
    return '%s%s%s' % (left, expr.oper, right)
class NotIn(BinaryOper):
    """Binary operator rendered as ``NOT IN``."""
    oper = " NOT IN "
    __slots__ = ()
# NOTE(review): no ``@expr_compile.when(...)`` registration is visible
# next to this handler -- confirm that it is registered with the compiler.
def compile_in(expr_compile, expr, state):
    """Compile a binary operator whose right side must be parenthesized.

    :param expr_compile: the compiler used to compile both operands
    :param expr: expression providing ``expr1``, ``oper`` and ``expr2``
    :param state: the compilation state; its ``precedence`` is reset
        to 0 before the right-hand operand is compiled
    :returns: the SQL string ``<expr1> <oper> (<expr2>)``
    """
    left = expr_compile(expr.expr1, state)
    # We're forcing parenthesis here.
    state.precedence = 0
    right = expr_compile(expr.expr2, state)
    return "%s %s (%s)" % (left, expr.oper, right)
class StoqNormalizeString(NamedFunc):
    """Expression for the ``stoq_normalize_string`` database function.

    It removes accents and other modifiers from a character. This is
    similar to the NFKD normalization in unicode, but it runs inside
    the database.

    Note that this is very slow and should be avoided. In the future
    it will be replaced by fulltext search, which does normalization
    in a cheaper way.
    """
    # See functions.sql
    name = "stoq_normalize_string"
    __slots__ = ()
class Case(ComparableExpr):
    """SQL ``CASE`` expression, similar to Python's if-then-else.

    .. line-block::

        CASE WHEN <condition> THEN <result>
            [WHEN <condition> THEN <result>]
        END
    """
    # FIXME: Support several when clauses.
    __slots__ = ("condition", "result", "else_")
    prefix = "(unknown)"

    def __init__(self, condition, result, else_=None):
        """Store the pieces of the clause.

        :param condition: expression tested by ``WHEN``
        :param result: expression yielded by ``THEN``
        :param else_: optional expression for ``ELSE``; ``None`` means
            that no ``ELSE`` part is present
        """
        self.condition, self.result, self.else_ = condition, result, else_
# NOTE(review): no ``@expr_compile.when(Case)`` registration is visible
# next to this handler -- confirm that it is registered with the compiler.
def compile_case(compile, expr, state):
    """Compile a :class:`Case` expression into SQL.

    :param compile: the compiler invoking this handler. It is used for
        the sub-expressions so that they are compiled by the very same
        (possibly backend-specific) compiler as the rest of the statement
    :param expr: expression providing ``condition``, ``result`` and
        ``else_``
    :param state: the compilation state, passed through to the compiler
    :returns: ``CASE WHEN <condition> THEN <result> [ELSE <else_>] END``
    """
    parts = ["CASE WHEN %s THEN %s" % (compile(expr.condition, state),
                                       compile(expr.result, state))]
    # The ELSE part is optional
    if expr.else_ is not None:
        parts.append(' ELSE ' + compile(expr.else_, state))
    parts.append(' END')
    return ''.join(parts)
class Trim(ComparableExpr):
    """SQL ``TRIM``: remove the longest string containing the given
    characters.
    """
    __slots__ = ("op", "character", "column")
    prefix = "(unknown)"

    def __init__(self, op, character, column):
        """Store the pieces of the ``TRIM(op character FROM column)`` call.

        :param op: text placed verbatim right after ``TRIM(``
        :param character: expression with the characters to remove
        :param column: expression to be trimmed
        """
        self.op, self.character, self.column = op, character, column
# NOTE(review): no ``@expr_compile.when(Trim)`` registration is visible
# next to this handler -- confirm that it is registered with the compiler.
def compile_trim(compile, expr, state):
    """Compile a :class:`Trim` expression into SQL.

    :param compile: the compiler invoking this handler. It is used for
        the sub-expressions so that they are compiled by the very same
        (possibly backend-specific) compiler as the rest of the statement
    :param expr: expression providing ``op``, ``character`` and ``column``
    :param state: the compilation state, passed through to the compiler
    :returns: ``TRIM(<op> <character> FROM <column>)``
    """
    # expr.op is inserted verbatim, only the other two are compiled
    character = compile(expr.character, state)
    column = compile(expr.column, state)
    return "TRIM(%s %s FROM %s)" % (expr.op, character, column)
class Concat(Expr):
    """Concatenates strings together using the ``||`` operator."""
    __slots__ = ("inputs",)
    prefix = "(unknown)"

    def __init__(self, *inputs):
        """Keep every positional argument as an input to be joined."""
        self.inputs = inputs
# NOTE(review): no ``@expr_compile.when(Concat)`` registration is visible
# next to this handler -- confirm that it is registered with the compiler.
def compile_concat(compile, expr, state):
    """Compile a :class:`Concat` expression into SQL.

    :param compile: the compiler invoking this handler. It is used for
        the inputs so that they are compiled by the very same (possibly
        backend-specific) compiler as the rest of the statement
    :param expr: expression providing the ``inputs`` sequence
    :param state: the compilation state, passed through to the compiler
    :returns: the compiled inputs joined by `` || ``
    """
    return " || ".join(compile(input_, state) for input_ in expr.inputs)
class Between(Expr):
    """SQL ``BETWEEN``: check if a value is between start and end."""
    __slots__ = ('value', 'start', 'end')

    def __init__(self, value, start, end):
        """Store the tested *value* and the *start* / *end* bounds."""
        self.value, self.start, self.end = value, start, end
# NOTE(review): no ``@expr_compile.when(Between)`` registration is visible
# next to this handler -- confirm that it is registered with the compiler.
def compile_between(compile, expr, state):
    """Compile a :class:`Between` expression into SQL.

    :param compile: the compiler invoking this handler. It is used for
        the sub-expressions so that they are compiled by the very same
        (possibly backend-specific) compiler as the rest of the statement
    :param expr: expression providing ``value``, ``start`` and ``end``
    :param state: the compilation state, passed through to the compiler
    :returns: `` <value> BETWEEN <start> AND <end> `` (the surrounding
        spaces are part of the result)
    """
    value = compile(expr.value, state)
    start = compile(expr.start, state)
    end = compile(expr.end, state)
    return ' %s BETWEEN %s AND %s ' % (value, start, end)
class GenerateSeries(FromExpr):
    """FROM expression for the ``generate_series`` SQL function."""
    __slots__ = ('start', 'end', 'step')

    def __init__(self, start, end, step=Undef):
        """Store the arguments of the series.

        :param start: first argument of ``generate_series``
        :param end: second argument of ``generate_series``
        :param step: optional third argument; when left as ``Undef``
            it is omitted from the generated SQL
        """
        self.start, self.end, self.step = start, end, step
# NOTE(review): no ``@expr_compile.when(GenerateSeries)`` registration is
# visible next to this handler -- confirm that it is registered with the
# compiler.
def compile_generate_series(compile, expr, state):
    """Compile a :class:`GenerateSeries` expression into SQL.

    :param compile: the compiler invoking this handler. It is used for
        the arguments so that they are compiled by the very same
        (possibly backend-specific) compiler as the rest of the statement
    :param expr: expression providing ``start``, ``end`` and ``step``
    :param state: the compilation state; its ``context`` is switched to
        ``EXPR`` while the arguments are compiled
    :returns: ``generate_series(<start>, <end>[, <step>])``
    """
    arguments = [expr.start, expr.end]
    # The step is optional and only rendered when it was provided
    if expr.step is not Undef:
        arguments.append(expr.step)

    state.push("context", EXPR)
    try:
        compiled = ', '.join(compile(arg, state) for arg in arguments)
    finally:
        # Restore the previous context even if compiling an argument fails
        state.pop()
    return 'generate_series(%s)' % compiled
class UnionAll(SetExpr):
    """Set expression joining results with ``UNION ALL``.

    UNION is to UNION ALL what a Python set is to a list: UNION removes
    duplicates from the resulting rows, while UNION ALL just joins all
    the data. That makes it a little bit faster, but possibly with more
    rows.
    """
    oper = " UNION ALL "
    __slots__ = ()


# Precedence used by the compiler for UnionAll expressions
expr_compile.set_precedence(10, UnionAll)
def is_sql_identifier(identifier):
    """Tell whether *identifier* is usable as a plain SQL identifier.

    :param identifier: the token to check
    :returns: ``False`` when the compiler considers it a reserved word,
        otherwise whatever ``is_safe_token(identifier)`` returns
    """
    if expr_compile.is_reserved_word(identifier):
        return False
    return is_safe_token(identifier)
class Over(ComparableExpr):
    """Window clause: ``<attribute> OVER (...)``.

    Usage::

        Over(attr, [partitions], [order by])

    e.g. considering the query::

        SELECT sale.total_amount OVER (ORDER BY sale.confirm_date DESC)
        FROM sale;

    the window function gets described as::

        Over(Sale.total_amount, [], [Desc(Sale.confirm_date)])
    """
    __slots__ = ('attribute', 'partitions', 'orders')

    def __init__(self, attribute, partitions=None, orders=None):
        """Store the pieces of the clause.

        :param attribute: expression placed before ``OVER``
        :param partitions: optional iterable of expressions for
            ``PARTITION BY``
        :param orders: optional iterable of expressions for ``ORDER BY``
        """
        self.attribute, self.partitions, self.orders = (
            attribute, partitions, orders)
# NOTE(review): no ``@expr_compile.when(Over)`` registration is visible
# next to this handler -- confirm that it is registered with the compiler.
def compile_over(compile, expr, state):
    """Compile an :class:`Over` expression into SQL.

    :param compile: the compiler invoking this handler. It is used for
        the sub-expressions so that they are compiled by the very same
        (possibly backend-specific) compiler as the rest of the statement
    :param expr: expression providing ``attribute``, ``partitions`` and
        ``orders``
    :param state: the compilation state, passed through to the compiler
    :returns: `` <attribute> OVER ([PARTITION BY ... ][ORDER BY ... ])``;
        the leading space and the space closing each clause are part of
        the result
    """
    pieces = [' %s OVER (' % compile(expr.attribute, state)]
    # Both clauses are optional: None and empty sequences are skipped
    if expr.partitions:
        partitions = ', '.join(compile(i, state) for i in expr.partitions)
        pieces.append('PARTITION BY %s ' % partitions)
    if expr.orders:
        orders = ', '.join(compile(i, state) for i in expr.orders)
        pieces.append('ORDER BY %s ' % orders)
    pieces.append(')')
    return ''.join(pieces)