Source code for stoqlib.lib.threadutils

# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4

##
## Copyright (C) 2013 Async Open Source
##
## This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU Lesser General Public License
## as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., or visit: http://www.gnu.org/.
##
##
## Author(s): Stoq Team <stoq-devel@async.com.br>
##

""" Utilities to work with threading """

import ctypes
import threading

import glib


def terminate_thread(thread):
    """Terminates a python thread from another thread.

    ``SystemExit`` is raised asynchronously inside ``thread`` through the
    CPython C API. Nothing is done when ``thread`` is falsy or is not alive.

    Based on
    http://code.activestate.com/recipes/496960-thread2-killable-threads/

    :param thread: a thread
    :raises ValueError: if the interpreter does not know the thread id
    :raises SystemError: if the exception was set on more than one
        thread state (the request is reverted before raising)
    """
    if not thread:
        return

    # ``isAlive`` was removed in Python 3.9; ``is_alive`` is available since
    # Python 2.6. Fall back to the old spelling for thread-like objects that
    # only provide it.
    is_alive = getattr(thread, 'is_alive', None)
    if is_alive is None:
        is_alive = thread.isAlive
    if not is_alive():
        return

    # Build the id once as an explicit C long and use it for both calls.
    # A bare Python int is marshalled by ctypes as a C int, which can
    # truncate the thread id on 64-bit platforms.
    thread_id = ctypes.c_long(thread.ident)
    exc = ctypes.py_object(SystemExit)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, exc)
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
def threadit(func, *args, **kwargs):
    """Start ``func`` on a new daemon thread and hand the thread back.

    :param func: the callable to run on the new thread
    :param args: positional arguments forwarded to ``func``
    :param kwargs: keyword arguments forwarded to ``func``
    :returns: the :class:`threading.Thread` that was started
    """
    thread_options = dict(target=func, args=args, kwargs=kwargs)
    worker = threading.Thread(**thread_options)
    # The daemon flag has to be set before the thread is started
    worker.daemon = True
    worker.start()
    return worker
def schedule_in_main_thread(func, *args):
    """Schedules a function to be run in the main thread.

    The call is registered as a glib idle callback, so it will run as soon
    as the mainloop schedules it; normally that happens within a few ms.

    :param func: the callable to schedule
    :param args: positional arguments passed along to ``func``
    """
    # NOTE(review): glib idle callbacks are normally invoked again for as
    # long as they return a true value -- confirm that scheduled functions
    # return a false value (e.g. None) if they are meant to run only once.
    idle_call = (func,) + args
    glib.idle_add(*idle_call)