Source code for stoqlib.lib.threadutils
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
##
## Copyright (C) 2013 Async Open Source
##
## This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU Lesser General Public License
## as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., or visit: http://www.gnu.org/.
##
##
## Author(s): Stoq Team <stoq-devel@async.com.br>
##
""" Utilities to work with threading """
import ctypes
import threading
import glib
def terminate_thread(thread):
    """Terminates a python thread from another thread.

    Asks the interpreter, through ``PyThreadState_SetAsyncExc``, to raise
    :exc:`SystemExit` asynchronously inside *thread*. Nothing is done when
    *thread* is false-ish or is not alive.

    :param thread: a thread
    :raises ValueError: if the call reports that no thread state was
        modified (nonexistent thread id)
    :raises SystemError: if the call reports that more than one thread
        state was modified; the pending exception is reverted first
    """
    # From http://code.activestate.com/recipes/496960-thread2-killable-threads/
    if not thread:
        return

    # isAlive() is the legacy spelling; prefer is_alive() when available so
    # this works with thread objects exposing either name.
    is_alive = getattr(thread, 'is_alive', None) or thread.isAlive
    if not is_alive():
        return

    # Wrap the id once so that BOTH calls below pass it as a C long.
    # A bare Python int would go through ctypes' default int conversion,
    # which may not preserve a wide thread id.
    thread_id = ctypes.c_long(thread.ident)
    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

    res = set_async_exc(thread_id, ctypes.py_object(SystemExit))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        set_async_exc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
def threadit(func, *args, **kwargs):
    """Start *func* in a separate daemon thread and return that thread.

    :param func: the callable to run in the new thread
    :param args: positional arguments forwarded to *func*
    :param kwargs: keyword arguments forwarded to *func*
    :returns: the already started :class:`threading.Thread`
    """
    worker = threading.Thread(
        target=func,
        args=args,
        kwargs=kwargs,
    )
    # Mark as daemon before starting; the flag is set on the thread object
    # exactly as the caller will see it on the returned value.
    worker.daemon = True
    worker.start()
    return worker
def schedule_in_main_thread(func, *args):
    """Schedules a function to be run in the main thread.

    The call is handed to ``glib.idle_add``, so it will run as soon as
    the mainloop schedules it; normally that happens within a few ms.

    :param func: the callable to schedule
    :param args: positional arguments forwarded to *func*
    """
    # NOTE(review): glib idle callbacks are normally invoked again while
    # they return a true value -- confirm callers pass a func that returns
    # a false value when it must run only once.
    idle_call = (func,) + args
    glib.idle_add(*idle_call)