Source code for stoqlib.lib.pluginmanager

# -*- Mode: Python; coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4

##
## Copyright (C) 2007 Async Open Source <http://www.async.com.br>
## All rights reserved
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., or visit: http://www.gnu.org/.
##
## Author(s): Stoq Team <stoq-devel@async.com.br>
##

import atexit
import glob
import hashlib
import io
import logging
import os
import shutil
import sys
import tempfile
from zipfile import ZipFile, BadZipfile, is_zipfile

from kiwi.desktopparser import DesktopParser
from kiwi.component import get_utility, provide_utility
from zope.interface import implementer

from stoqlib.database.exceptions import SQLError
from stoqlib.database.runtime import get_default_store, new_store
from stoqlib.domain.plugin import InstalledPlugin, PluginEgg
from stoqlib.lib.interfaces import IPlugin, IPluginManager
from stoqlib.lib.kiwilibrary import library
from stoqlib.lib.message import error
from stoqlib.lib.osutils import get_system_locale
from stoqlib.lib.settings import get_settings
from stoqlib.lib.translation import stoqlib_gettext as _

log = logging.getLogger(__name__)


[docs]class PluginError(Exception): pass
[docs]class PluginDescription(object): def __init__(self, filename, is_egg=False): config = DesktopParser() if is_egg: self.plugin_path = filename msg = "%s is not a valid egg file" % filename assert is_zipfile(filename), msg with ZipFile(filename, "r") as egg: filename = [f for f in egg.namelist() if f.endswith('plugin')][0] plugin_file = egg.open(filename) config.readfp(plugin_file) else: plugin_path = os.path.dirname(os.path.dirname(filename)) self.plugin_path = plugin_path config.read(filename) self.name = unicode(os.path.basename(filename).split('.')[0]) self.entry = config.get('Plugin', 'Module') self.filename = filename if config.has_option('Plugin', 'Dependencies'): self.dependencies = [ unicode(dependency.strip()) for dependency in config.get('Plugin', 'Dependencies').split(',')] else: self.dependencies = [] if config.has_option('Plugin', 'Replaces'): self.replaces = [ unicode(replace.strip()) for replace in config.get('Plugin', 'Replaces').split(',')] else: self.replaces = [] settings = get_settings() lang = settings.get('user-locale', None) if not lang: lang = get_system_locale() self.long_name = config.get_locale('Plugin', 'Name', lang) self.description = config.get_locale('Plugin', 'Description', lang) @property def dirname(self): return os.path.dirname(self.filename)
@implementer(IPluginManager)
[docs]class PluginManager(object): """A class responsible for administrating plugins This class is responsible for administrating plugins, like, controlling which one is available/installed/actived or not. @important: Never instantialize this class. Always use """ def __init__(self): self._eggs_cache = None self._reload()
[docs] def get_installed_plugins_names(self, store=None): """A list of names of all installed plugins""" return InstalledPlugin.get_plugin_names(store or get_default_store())
# # Properties # @property def egg_plugins_names(self): """A list of names of all plugins installed as eggs""" default_store = get_default_store() return [p.plugin_name for p in default_store.find(PluginEgg)] @property def available_plugins_names(self): """A list of names of all available plugins""" return list(self._plugin_descriptions.keys()) @property def installed_plugins_names(self): """A list of names of all installed plugins as a getter This getter should be avoided, and should be replaced by get_installed_plugins_names(). A more generic implementation of this. """ return self.get_installed_plugins_names() @property def active_plugins_names(self): """A list of names of all active plugins""" return list(self._active_plugins.keys()) # # Private # def _reload(self): self._plugins = {} self._active_plugins = {} self._plugin_descriptions = {} self._create_eggs_cache() self._read_plugin_descriptions() def _create_eggs_cache(self): self._eggs_cache = tempfile.mkdtemp(prefix='stoq', suffix='eggs') log.info("Eggs cache created in %s", self._eggs_cache) # Now extract all eggs from the database and put it where stoq know # how to load them default_store = get_default_store() for plugin_egg in default_store.find(PluginEgg): plugin_name = plugin_egg.plugin_name log.info("Creating egg cache for plugin %r" % (plugin_name, )) egg_filename = '{}.egg'.format(plugin_name) with open(os.path.join(self._eggs_cache, egg_filename), 'wb') as f: f.write(plugin_egg.egg_content) atexit.register( lambda: shutil.rmtree(self._eggs_cache, ignore_errors=True)) def _get_external_plugins_paths(self): # This is the dir containing stoq/kiwi/stoqdrivers/etc checkout = os.path.dirname(library.get_root()) # If there's n foobar plugin on the checkout, it will expand to find: # CHECKOUT/<git_repository>/foobar/foobar.plugin for filename in glob.iglob(os.path.join(checkout, '*', '*', '*.plugin')): # In the example above, the path here is expected to be on # <git_repository>, not on <git_repository>/foobar/ yield os.path.dirname(os.path.dirname(filename)) def _read_plugin_descriptions(self): # Development plugins on the same checkout paths = [os.path.join(library.get_root(), 'plugins')] # Plugins from PluginEgg paths.append(self._eggs_cache) if library.get_resource_exists('stoq', 'plugins'): paths.append(library.get_resource_filename('stoq', 'plugins')) paths.extend(list(self._get_external_plugins_paths())) for path in paths: for filename in glob.iglob(os.path.join(path, '*', '*.plugin')): self._register_plugin_description(filename) for filename in glob.iglob(os.path.join(path, '*.egg')): self._register_plugin_description(filename, is_egg=True) def _register_plugin_description(self, filename, is_egg=False): desc = PluginDescription(filename, is_egg) self._plugin_descriptions[desc.name] = desc def _import_plugin(self, plugin_desc): log.info("Loading plugin %s" % (plugin_desc.name, )) plugin_path = plugin_desc.plugin_path if plugin_path not in sys.path: sys.path.append(plugin_path) # FIXME: Use setuptools entry points when we can __import__(os.path.basename(plugin_desc.dirname), globals(), locals(), [plugin_desc.entry]) assert plugin_desc.name in self._plugins, (plugin_desc.name, self._plugins) # # Public API #
[docs] def download_plugin(self, plugin_name): """Download a plugin from webservice :param plugin_name: the name of the plugin to download :returns: a deferred """ from stoqlib.lib.webservice import WebService default_store = get_default_store() existing_egg = default_store.find(PluginEgg, plugin_name=plugin_name).one() md5sum = existing_egg and existing_egg.egg_md5sum webapi = WebService() r = webapi.download_plugin(plugin_name, md5sum=md5sum) try: response = r.get_response() except Exception as e: return False, _("Failed to do the request: %s" % (e, )) code = response.status_code if code == 204: msg = _("No update needed. The plugin is already up to date.") log.info(msg) return True, msg if code != 200: return_messages = { 400: _("Plugin not available for this stoq version"), 401: _("The instance is not authorized to download the plugin"), 404: _("Plugin does not exist"), 405: _("This instance has not acquired the specified plugin"), } msg = return_messages.get(code, str(code)) log.warning(msg) return False, msg try: with io.BytesIO() as f: f.write(response.content) with ZipFile(f) as egg: if egg.testzip() is not None: raise BadZipfile md5sum = unicode(hashlib.md5(f.getvalue()).hexdigest()) with new_store() as store: existing_egg = store.find(PluginEgg, plugin_name=plugin_name).one() if existing_egg is not None: existing_egg.egg_content = f.getvalue() existing_egg.egg_md5sum = md5sum else: PluginEgg( store=store, plugin_name=plugin_name, egg_md5sum=md5sum, egg_content=f.getvalue(), ) except BadZipfile: return False, _("The downloaded plugin is corrupted") self._reload() return True, _("Plugin download successful")
[docs] def get_plugin(self, plugin_name): """Returns a plugin by it's name :param plugin_name: the plugin's name :returns: the :class:`IPlugin` implementation of the plugin """ if not plugin_name in self._plugin_descriptions: raise PluginError("Plugin %s not found. Available ones are: %s" % ( plugin_name, ', '.join(self.available_plugins_names))) if not plugin_name in self._plugins: self._import_plugin(self._plugin_descriptions[plugin_name]) return self._plugins[plugin_name]
[docs] def get_description_by_name(self, plugin_name): """Returns the plugin's description given a plugin's name :returns: the :class:`PluginDescription` for the plugin """ return self._plugin_descriptions.get(plugin_name)
[docs] def register_plugin(self, plugin): """Registers a plugin on manager This needs to be called by every plugin, or else, the manager won't know it's existence. It's usually a good idea to use :class:`register_plugin` function on plugin code, so the plugin will be registered as soon as it's module gets read by python. :param plugin: the :class:`IPlugin` implementation of the plugin """ if not IPlugin.providedBy(plugin): raise TypeError("The object %s does not implement IPlugin " "interface" % (plugin, )) self._plugins[plugin.name] = plugin
[docs] def activate_plugin(self, plugin_name): """Activates a plugin This will activate the C{plugin}, calling it's C{activate} method and possibly doing some extra logic (e.g. logging). :param important: Always activate a plugin using this method because the manager keeps track of all active plugins. Else you probably will activate the same plugin twice, and that probably won't be good :) :param plugin: the :class:`IPlugin` implementation of the plugin """ if self.is_active(plugin_name): raise PluginError("Plugin %s is already active" % (plugin_name, )) dependencies = self._plugin_descriptions[plugin_name].dependencies for dependency in dependencies: if not self.is_active(dependency): self.activate_plugin(dependency) plugin = self.get_plugin(plugin_name) log.info("Activating plugin %s" % (plugin_name, )) plugin.activate() self._active_plugins[plugin_name] = plugin
[docs] def pre_install_plugin(self, store, plugin_name): """Pre Install Plugin Registers an intention to activate a plugin, that will require further actions to enable when running stoq later, like downloading the plugin from stoq.link. """ if plugin_name in self.installed_plugins_names: raise PluginError("Plugin %s is already enabled." % (plugin_name, )) InstalledPlugin(store=store, plugin_name=plugin_name, plugin_version=None)
[docs] def install_plugin(self, store, plugin_name): """Install and enable a plugin @important: Calling this makes a plugin installed, but, it's your responsability to activate it! :param plugin: the :class:`IPlugin` implementation of the plugin """ # Try to get the plugin first. If it was't registered yet, # PluginError will be raised. plugin = self.get_plugin(plugin_name) if plugin_name in self.installed_plugins_names: raise PluginError("Plugin %s is already enabled." % (plugin_name, )) dependencies = self._plugin_descriptions[plugin_name].dependencies for dependency in dependencies: if not self.is_installed(dependency): self.install_plugin(store, dependency) InstalledPlugin.create(store, plugin_name) # FIXME: We should not be doing this commit here, but by not doing so, # ``` # migration = plugin.get_migration() # ``` # Would not find any plugin (as it uses the default store), to allow # `plugin.get_migration()` to accept a custom store, we would have to # change all the plugins `get_migration` method. # # An alternate solution to this would be to manually set the correct # plugin for `migration`: # # migration._plugin = store.find(InstalledPlugin, # plugin_name=plugin_name).one() # # Along with passing the store to `migration.apply_all_patches()` # # But it will be dirty and will probably be removed once the definitive # solution (change `plugin.get_migration()`) is implemented store.commit(close=False) migration = plugin.get_migration() if migration: try: migration.apply_all_patches() except SQLError as e: # This means a lock could not be obtained. Warn user about this # and let stoq restart, that the schema will be upgraded # correctly error('Não foi possível terminar a instalação do plugin. ' 'Por favor reinicie todas as instancias do Stoq que ' 'estiver executando (%s)' % (e, ))
[docs] def activate_installed_plugins(self): """Activate all installed plugins A helper method to activate all installed plugins in just one call, without having to get and activate one by one. """ available_plugins = self.available_plugins_names installed_plugins = self.installed_plugins_names replace_dict = {} for plugin_name in available_plugins: for replace in self._plugin_descriptions[plugin_name].replaces: replace_list = replace_dict.setdefault(replace, []) replace_list.append(plugin_name) for plugin_name in installed_plugins: if any(will_replace in installed_plugins for will_replace in replace_dict.get(plugin_name, [])): continue if not plugin_name in available_plugins: raise AssertionError( "Plugin %r not found on the system. " "Available plugins: %r" % (plugin_name, available_plugins)) if not self.is_active(plugin_name): self.activate_plugin(plugin_name)
[docs] def is_active(self, plugin_name): """Returns if a plugin with a certain name is active or not. :returns: True if the given plugin name is active, False otherwise. """ return plugin_name in self.active_plugins_names
[docs] def is_any_active(self, plugin_names): """Check if any of the plugin names are active. :param plugin_names: a list of plugin names to check """ return any(self.is_active(name) for name in plugin_names)
[docs] def is_installed(self, plugin_name, store=False): """Returns if a plugin with a certain name is installed or not :returns: True if the given plugin name is active, False otherwise. """ return plugin_name in self.get_installed_plugins_names(store)
[docs]def register_plugin(plugin_class): """Registers a plugin on IPluginManager Just a convenience function that can be added at the end of each plugin class definition to register it on manager. :param plugin_class: class to register, must implement :class:`IPlugin` """ manager = get_plugin_manager() manager.register_plugin(plugin_class())
[docs]def get_plugin_manager(): """Provides and returns the plugin manager @attention: Try to always use this instead of getting the utillity by hand, as that could not have been provided before. :returns: an :class:`PluginManager` instance """ manager = get_utility(IPluginManager, None) if not manager: manager = PluginManager() provide_utility(IPluginManager, manager) return manager