#
# Copyright (c) 2006, 2007 Canonical
#
# Written by Gustavo Niemeyer <gustavo@niemeyer.net>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from bisect import insort_left, bisect_left
import weakref
import sys
from storm.exceptions import PropertyPathError
from storm.info import get_obj_info, get_cls_info
from storm.expr import Column, Undef
from storm.variables import (
Variable, VariableFactory, BoolVariable, IntVariable, FloatVariable,
DecimalVariable, RawStrVariable, UnicodeVariable, DateTimeVariable,
DateVariable, TimeVariable, TimeDeltaVariable, UUIDVariable,
PickleVariable, JSONVariable, ListVariable, EnumVariable)
__all__ = ["Property", "SimpleProperty",
"Bool", "Int", "Float", "Decimal", "RawStr", "Unicode",
"DateTime", "Date", "Time", "TimeDelta", "UUID", "Enum",
"Pickle", "JSON", "List", "PropertyRegistry"]
class Property(object):
def __init__(self, name=None, primary=False,
variable_class=Variable, variable_kwargs={}):
self._name = name
self._primary = primary
self._variable_class = variable_class
self._variable_kwargs = variable_kwargs
def __get__(self, obj, cls=None):
if obj is None:
return self._get_column(cls)
obj_info = get_obj_info(obj)
if cls is None:
# Don't get obj.__class__ because we don't trust it
# (might be proxied or whatever).
cls = obj_info.cls_info.cls
column = self._get_column(cls)
return obj_info.variables[column].get()
def __set__(self, obj, value):
obj_info = get_obj_info(obj)
# Don't get obj.__class__ because we don't trust it
# (might be proxied or whatever).
column = self._get_column(obj_info.cls_info.cls)
obj_info.variables[column].set(value)
def __delete__(self, obj):
obj_info = get_obj_info(obj)
# Don't get obj.__class__ because we don't trust it
# (might be proxied or whatever).
column = self._get_column(obj_info.cls_info.cls)
obj_info.variables[column].delete()
def _detect_attr_name(self, used_cls):
self_id = id(self)
for cls in used_cls.__mro__:
for attr, prop in cls.__dict__.items():
if id(prop) == self_id:
return attr
raise RuntimeError("Property used in an unknown class")
def _get_column(self, cls):
# Cache per-class column values in the class itself, to avoid
# holding a strong reference to it here, and thus rendering
# classes uncollectable in certain situations (e.g. subclasses
# where the property is stored in the base).
try:
# Use class dictionary explicitly to get sensible
# results on subclasses.
column = cls.__dict__["_storm_columns"].get(self)
except KeyError:
cls._storm_columns = {}
column = None
if column is None:
attr = self._detect_attr_name(cls)
if self._name is None:
name = attr
else:
name = self._name
column = PropertyColumn(self, cls, attr, name, self._primary,
self._variable_class,
self._variable_kwargs)
cls._storm_columns[self] = column
return column
class PropertyColumn(Column):
def __init__(self, prop, cls, attr, name, primary,
variable_class, variable_kwargs):
Column.__init__(self, name, cls, primary,
VariableFactory(variable_class, column=self,
validator_attribute=attr,
**variable_kwargs))
self.cls = cls # Used by references
# Copy attributes from the property to avoid one additional
# function call on each access.
for attr in ["__get__", "__set__", "__delete__"]:
setattr(self, attr, getattr(prop, attr))
class SimpleProperty(Property):
variable_class = None
def __init__(self, name=None, primary=False, **kwargs):
kwargs["value"] = kwargs.pop("default", Undef)
kwargs["value_factory"] = kwargs.pop("default_factory", Undef)
Property.__init__(self, name, primary, self.variable_class, kwargs)
class Bool(SimpleProperty):
variable_class = BoolVariable
class Int(SimpleProperty):
variable_class = IntVariable
class Float(SimpleProperty):
variable_class = FloatVariable
class Decimal(SimpleProperty):
variable_class = DecimalVariable
class RawStr(SimpleProperty):
variable_class = RawStrVariable
# OBSOLETE RawStr was Chars in 0.9. This will die soon.
Chars = RawStr
class Unicode(SimpleProperty):
variable_class = UnicodeVariable
class DateTime(SimpleProperty):
variable_class = DateTimeVariable
class Date(SimpleProperty):
variable_class = DateVariable
class Time(SimpleProperty):
variable_class = TimeVariable
class TimeDelta(SimpleProperty):
variable_class = TimeDeltaVariable
class UUID(SimpleProperty):
variable_class = UUIDVariable
class Pickle(SimpleProperty):
variable_class = PickleVariable
class JSON(SimpleProperty):
variable_class = JSONVariable
class List(SimpleProperty):
variable_class = ListVariable
def __init__(self, name=None, **kwargs):
if "default" in kwargs:
raise ValueError("'default' not allowed for List. "
"Use 'default_factory' instead.")
type = kwargs.pop("type", None)
if type is None:
type = Property()
kwargs["item_factory"] = VariableFactory(type._variable_class,
**type._variable_kwargs)
SimpleProperty.__init__(self, name, **kwargs)
class Enum(SimpleProperty):
"""Enumeration property, allowing used values to differ from stored ones.
For instance::
class Class(Storm):
prop = Enum(map={"one": 1, "two": 2})
obj.prop = "one"
assert obj.prop == "one"
obj.prop = 1 # Raises error.
Another example::
class Class(Storm):
prop = Enum(map={"one": 1, "two": 2}, set_map={"um": 1})
obj.prop = "um"
assert obj.prop is "one"
obj.prop = "one" # Raises error.
"""
variable_class = EnumVariable
def __init__(self, name=None, primary=False, **kwargs):
set_map = dict(kwargs.pop("map"))
get_map = dict((value, key) for key, value in set_map.items())
if "set_map" in kwargs:
set_map = dict(kwargs.pop("set_map"))
kwargs["get_map"] = get_map
kwargs["set_map"] = set_map
SimpleProperty.__init__(self, name, primary, **kwargs)
class PropertyRegistry(object):
"""
An object which remembers the Storm properties specified on
classes, and is able to translate names to these properties.
"""
def __init__(self):
self._properties = []
def get(self, name, namespace=None):
"""Translate a property name path to the actual property.
This method accepts a property name like C{"id"} or C{"Class.id"}
or C{"module.path.Class.id"}, and tries to find a unique
class/property with the given name.
When the C{namespace} argument is given, the registry will be
able to disambiguate names by choosing the one that is closer
to the given namespace. For instance C{get("Class.id", "a.b.c")}
will choose C{a.Class.id} rather than C{d.Class.id}.
"""
key = ".".join(reversed(name.split(".")))+"."
i = bisect_left(self._properties, (key,))
l = len(self._properties)
best_props = []
if namespace is None:
while i < l and self._properties[i][0].startswith(key):
path, prop_ref = self._properties[i]
prop = prop_ref()
if prop is not None:
best_props.append((path, prop))
i += 1
else:
namespace_parts = ("." + namespace).split(".")
best_path_info = (0, sys.maxint)
while i < l and self._properties[i][0].startswith(key):
path, prop_ref = self._properties[i]
prop = prop_ref()
if prop is None:
i += 1
continue
path_parts = path.split(".")
path_parts.reverse()
common_prefix = 0
for part, ns_part in zip(path_parts, namespace_parts):
if part == ns_part:
common_prefix += 1
else:
break
path_info = (-common_prefix, len(path_parts)-common_prefix)
if path_info < best_path_info:
best_path_info = path_info
best_props = [(path, prop)]
elif path_info == best_path_info:
best_props.append((path, prop))
i += 1
if not best_props:
raise PropertyPathError("Path '%s' matches no known property."
% name)
elif len(best_props) > 1:
paths = [".".join(reversed(path.split(".")[:-1]))
for path, prop in best_props]
raise PropertyPathError("Path '%s' matches multiple "
"properties: %s" %
(name, ", ".join(paths)))
return best_props[0][1]
def add_class(self, cls):
"""Register properties of C{cls} so that they may be found by C{get()}.
"""
suffix = cls.__module__.split(".")
suffix.append(cls.__name__)
suffix.reverse()
suffix = ".%s." % ".".join(suffix)
cls_info = get_cls_info(cls)
for attr in cls_info.attributes:
prop = cls_info.attributes[attr]
prop_ref = weakref.KeyedRef(prop, self._remove, None)
pair = (attr+suffix, prop_ref)
prop_ref.key = pair
insort_left(self._properties, pair)
def add_property(self, cls, prop, attr_name):
"""Register property of C{cls} so that it may be found by C{get()}.
"""
suffix = cls.__module__.split(".")
suffix.append(cls.__name__)
suffix.reverse()
suffix = ".%s." % ".".join(suffix)
prop_ref = weakref.KeyedRef(prop, self._remove, None)
pair = (attr_name+suffix, prop_ref)
prop_ref.key = pair
insort_left(self._properties, pair)
def clear(self):
"""Clean up all properties in the registry.
Used by tests.
"""
del self._properties[:]
def _remove(self, ref):
self._properties.remove(ref.key)
class PropertyPublisherMeta(type):
"""A metaclass that associates subclasses with Storm L{PropertyRegistry}s.
"""
def __init__(self, name, bases, dict):
if not hasattr(self, "_storm_property_registry"):
self._storm_property_registry = PropertyRegistry()
elif hasattr(self, "__storm_table__"):
self._storm_property_registry.add_class(self)