首页 文章

函数调用超时

提问于
浏览
218

我正在调用Python中的一个函数,我知道它可能会停止并迫使我重新启动脚本 .

如何调用该函数或我将其包装成什么,以便如果它花费的时间超过5秒,脚本会取消它并执行其他操作?

13 回答

  • 0

    优秀,易用且可靠的PyPi项目 timeout-decoratorhttps://pypi.org/project/timeout-decorator/

    installation

    pip install timeout-decorator
    

    Usage

    import time
    import timeout_decorator
    
    @timeout_decorator.timeout(5)
    def mytest():
        print "Start"
        for i in range(1,10):
            time.sleep(1)
            print "%d seconds have passed" % i
    
    if __name__ == '__main__':
        mytest()
    
  • 55

    如果在UNIX上运行,则可以使用signal包:

    In [1]: import signal
    
    # Register an handler for the timeout
    In [2]: def handler(signum, frame):
       ...:     print "Forever is over!"
       ...:     raise Exception("end of time")
       ...: 
    
    # This function *may* run for an indetermined time...
    In [3]: def loop_forever():
       ...:     import time
       ...:     while 1:
       ...:         print "sec"
       ...:         time.sleep(1)
       ...:         
       ...:         
    
    # Register the signal function handler
    In [4]: signal.signal(signal.SIGALRM, handler)
    Out[4]: 0
    
    # Define a timeout for your function
    In [5]: signal.alarm(10)
    Out[5]: 0
    
    In [6]: try:
       ...:     loop_forever()
       ...: except Exception, exc: 
       ...:     print exc
       ....: 
    sec
    sec
    sec
    sec
    sec
    sec
    sec
    sec
    Forever is over!
    end of time
    
    # Cancel the timer if the function returned before timeout
    # (ok, mine won't but yours maybe will :)
    In [7]: signal.alarm(0)
    Out[7]: 0
    

    调用 alarm.alarm(10) 后10秒,调用处理程序 . 这引发了一个例外,您可以从常规Python代码中截取 .

    这个模块不适合线程(但那么,谁呢?)

    Note that 因为我们在超时发生时引发异常,它可能最终被捕获并在函数内被忽略,例如一个这样的函数:

    def loop_forever():
        while 1:
            print 'sec'
            try:
                time.sleep(10)
            except:
                continue
    
  • 0

    您可以使用 multiprocessing.Process 来做到这一点 .

    Code

    import multiprocessing
    import time
    
    # bar
    def bar():
        for i in range(100):
            print "Tick"
            time.sleep(1)
    
    if __name__ == '__main__':
        # Start bar as a process
        p = multiprocessing.Process(target=bar)
        p.start()
    
        # Wait for 10 seconds or until process finishes
        p.join(10)
    
        # If thread is still active
        if p.is_alive():
            print "running... let's kill it..."
    
            # Terminate
            p.terminate()
            p.join()
    
  • 44

    如何调用该函数或将其包装在哪里,以便在脚本取消时间超过5秒的情况下将其取消?

    我发布了gist,用装饰器和 threading.Timer 解决了这个问题/问题 . 这是故障 .

    导入和设置兼容性

    它使用Python 2和3进行了测试 . 它也应该在Unix / Linux和Windows下运行 .

    首先是进口 . 无论Python版本如何,这些都试图保持代码一致:

    from __future__ import print_function
    import sys
    import threading
    from time import sleep
    try:
        import thread
    except ImportError:
        import _thread as thread
    

    使用版本无关的代码:

    try:
        range, _print = xrange, print
        def print(*args, **kwargs): 
            flush = kwargs.pop('flush', False)
            _print(*args, **kwargs)
            if flush:
                kwargs.get('file', sys.stdout).flush()            
    except NameError:
        pass
    

    现在我们从标准库中导入了我们的功能 .

    exit_after装饰者

    接下来我们需要一个函数来终止子线程中的 main()

    def quit_function(fn_name):
        # print to stderr, unbuffered in Python 2.
        print('{0} took too long'.format(fn_name), file=sys.stderr)
        sys.stderr.flush() # Python 3 stderr is likely buffered.
        thread.interrupt_main() # raises KeyboardInterrupt
    

    这是装饰者本身:

    def exit_after(s):
        '''
        use as decorator to exit process if 
        function takes longer than s seconds
        '''
        def outer(fn):
            def inner(*args, **kwargs):
                timer = threading.Timer(s, quit_function, args=[fn.__name__])
                timer.start()
                try:
                    result = fn(*args, **kwargs)
                finally:
                    timer.cancel()
                return result
            return inner
        return outer
    

    用法

    这里的用法直接回答了你关于5秒后退出的问题!:

    @exit_after(5)
    def countdown(n):
        print('countdown started', flush=True)
        for i in range(n, -1, -1):
            print(i, end=', ', flush=True)
            sleep(1)
        print('countdown finished')
    

    演示:

    >>> countdown(3)
    countdown started
    3, 2, 1, 0, countdown finished
    >>> countdown(10)
    countdown started
    10, 9, 8, 7, 6, countdown took too long
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "<stdin>", line 11, in inner
      File "<stdin>", line 6, in countdown
    KeyboardInterrupt
    

    第二个函数调用不会完成,而是进程应该以traceback退出!

    KeyboardInterrupt并不总是停止睡眠线程

    请注意,Windows上的Python 2上的键盘中断并不会总是打扰睡眠,例如:

    @exit_after(1)
    def sleep10():
        sleep(10)
        print('slept 10 seconds')
    
    >>> sleep10()
    sleep10 took too long         # Note that it hangs here about 9 more seconds
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "<stdin>", line 11, in inner
      File "<stdin>", line 3, in sleep10
    KeyboardInterrupt
    

    也不可能中断在扩展中运行的代码,除非它明确检查 PyErr_CheckSignals() ,参见Cython, Python and KeyboardInterrupt ignored

    在任何情况下,我都会避免睡一个线程超过一秒 - 这是处理器时间的一个因素 .

    我如何调用该函数或将其包装在哪里,以便如果它花费的时间超过5秒,脚本会取消它并执行其他操作?

    要捕获它并执行其他操作,您可以捕获KeyboardInterrupt .

    >>> try:
    ...     countdown(10)
    ... except KeyboardInterrupt:
    ...     print('do something else')
    ... 
    countdown started
    10, 9, 8, 7, 6, countdown took too long
    do something else
    
  • 1

    我有一个不同的提议,它是一个纯函数(使用与线程建议相同的API)并且似乎工作正常(基于此线程的建议)

    def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
        import signal
    
        class TimeoutError(Exception):
            pass
    
        def handler(signum, frame):
            raise TimeoutError()
    
        # set the timeout handler
        signal.signal(signal.SIGALRM, handler) 
        signal.alarm(timeout_duration)
        try:
            result = func(*args, **kwargs)
        except TimeoutError as exc:
            result = default
        finally:
            signal.alarm(0)
    
        return result
    
  • 11

    在单元测试中搜索超时调用时,我遇到了这个线程 . 我没有在答案或第三方包中找到任何简单的内容,所以我在下面编写了装饰器,你可以直接进入代码:

    import multiprocessing.pool
    import functools
    
    def timeout(max_timeout):
        """Timeout decorator, parameter in seconds."""
        def timeout_decorator(item):
            """Wrap the original function."""
            @functools.wraps(item)
            def func_wrapper(*args, **kwargs):
                """Closure for function."""
                pool = multiprocessing.pool.ThreadPool(processes=1)
                async_result = pool.apply_async(item, args, kwargs)
                # raises a TimeoutError if execution exceeds max_timeout
                return async_result.get(max_timeout)
            return func_wrapper
        return timeout_decorator
    

    然后就这么简单来超时测试或任何你喜欢的功能:

    @timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
    def test_base_regression(self):
        ...
    
  • 181

    有很多建议,但没有使用concurrent.futures,我认为这是最清晰的处理方式 .

    from concurrent.futures import ProcessPoolExecutor
    
    # Warning: this does not terminate function if timeout
    def timeout_five(fnc, *args, **kwargs):
        with ProcessPoolExecutor() as p:
            f = p.submit(fnc, *args, **kwargs)
            return f.result(timeout=5)
    

    超级简单的阅读和维护 .

    我们创建一个池,提交一个进程,然后等待最多5秒钟,然后提出一个TimeoutError,你可以捕获并处理你需要的东西 .

    原生于python 3.2并向后移植到2.7(pip install期货) .

    在线程和进程之间切换就像用 ThreadPoolExecutor 替换 ProcessPoolExecutor 一样简单 .

    如果你想在超时时终止进程,我建议你查看Pebble .

  • 116

    在pypi上找到的 stopit 包似乎可以很好地处理超时 .

    我喜欢 @stopit.threading_timeoutable 装饰器,它为装饰函数添加一个 timeout 参数,它可以满足你的期望,它会停止这个功能 .

    在pypi上查看:https://pypi.python.org/pypi/stopit

  • 28
    #!/usr/bin/python2
    import sys, subprocess, threading
    proc = subprocess.Popen(sys.argv[2:])
    timer = threading.Timer(float(sys.argv[1]), proc.terminate)
    timer.start()
    proc.wait()
    timer.cancel()
    exit(proc.returncode)
    
  • 2

    我需要 nestable 定时中断(SIGALARM可以被time.sleep阻塞(基于线程的方法无法做到) . 我最终复制并从这里轻轻修改代码:http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

    代码本身:

    #!/usr/bin/python
    
    # lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
    
    
    """alarm.py: Permits multiple SIGALRM events to be queued.
    
    Uses a `heapq` to store the objects to be called when an alarm signal is
    raised, so that the next alarm is always at the top of the heap.
    """
    
    import heapq
    import signal
    from time import time
    
    __version__ = '$Revision: 2539 $'.split()[1]
    
    alarmlist = []
    
    __new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
    __next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
    __set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))
    
    
    class TimeoutError(Exception):
        def __init__(self, message, id_=None):
            self.message = message
            self.id_ = id_
    
    
    class Timeout:
        ''' id_ allows for nested timeouts. '''
        def __init__(self, id_=None, seconds=1, error_message='Timeout'):
            self.seconds = seconds
            self.error_message = error_message
            self.id_ = id_
        def handle_timeout(self):
            raise TimeoutError(self.error_message, self.id_)
        def __enter__(self):
            self.this_alarm = alarm(self.seconds, self.handle_timeout)
        def __exit__(self, type, value, traceback):
            try:
                cancel(self.this_alarm) 
            except ValueError:
                pass
    
    
    def __clear_alarm():
        """Clear an existing alarm.
    
        If the alarm signal was set to a callable other than our own, queue the
        previous alarm settings.
        """
        oldsec = signal.alarm(0)
        oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
        if oldsec > 0 and oldfunc != __alarm_handler:
            heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))
    
    
    def __alarm_handler(*zargs):
        """Handle an alarm by calling any due heap entries and resetting the alarm.
    
        Note that multiple heap entries might get called, especially if calling an
        entry takes a lot of time.
        """
        try:
            nextt = __next_alarm()
            while nextt is not None and nextt <= 0:
                (tm, func, args, keys) = heapq.heappop(alarmlist)
                func(*args, **keys)
                nextt = __next_alarm()
        finally:
            if alarmlist: __set_alarm()
    
    
    def alarm(sec, func, *args, **keys):
        """Set an alarm.
    
        When the alarm is raised in `sec` seconds, the handler will call `func`,
        passing `args` and `keys`. Return the heap entry (which is just a big
        tuple), so that it can be cancelled by calling `cancel()`.
        """
        __clear_alarm()
        try:
            newalarm = __new_alarm(sec, func, args, keys)
            heapq.heappush(alarmlist, newalarm)
            return newalarm
        finally:
            __set_alarm()
    
    
    def cancel(alarm):
        """Cancel an alarm by passing the heap entry returned by `alarm()`.
    
        It is an error to try to cancel an alarm which has already occurred.
        """
        __clear_alarm()
        try:
            alarmlist.remove(alarm)
            heapq.heapify(alarmlist)
        finally:
            if alarmlist: __set_alarm()
    

    和一个用法示例:

    import alarm
    from time import sleep
    
    try:
        with alarm.Timeout(id_='a', seconds=5):
            try:
                with alarm.Timeout(id_='b', seconds=2):
                    sleep(3)
            except alarm.TimeoutError as e:
                print 'raised', e.id_
            sleep(30)
    except alarm.TimeoutError as e:
        print 'raised', e.id_
    else:
        print 'nope.'
    
  • 13

    这是对给定的基于线程的解决方案的略微改进 .

    以下代码支持 exceptions

    def runFunctionCatchExceptions(func, *args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception, message:
            return ["exception", message]
    
        return ["RESULT", result]
    
    
    def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
        import threading
        class InterruptableThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = default
            def run(self):
                self.result = runFunctionCatchExceptions(func, *args, **kwargs)
        it = InterruptableThread()
        it.start()
        it.join(timeout_duration)
        if it.isAlive():
            return default
    
        if it.result[0] == "exception":
            raise it.result[1]
    
        return it.result[1]
    

    使用5秒超时调用它:

    result = timeout(remote_calculate, (myarg,), timeout_duration=5)
    
  • 3

    我们可以使用相同的信号 . 我认为以下示例对您有用 . 这个很与线程相比简单 .

    import signal
    
    def timeout(signum, frame):
        raise myException
    
    #this is an infinite loop, never ending under normal circumstances
    def main():
        print 'Starting Main ',
        while 1:
            print 'in main ',
    
    #SIGALRM is only usable on a unix platform
    signal.signal(signal.SIGALRM, timeout)
    
    #change 5 to however many seconds you need
    signal.alarm(5)
    
    try:
        main()
    except myException:
        print "whoops"
    
  • 0

    timeout-decorator 不要't work on windows system as , windows didn' t支持 signal .

    如果您在Windows系统中使用timeout-decorator,您将获得以下内容

    AttributeError: module 'signal' has no attribute 'SIGALRM'
    

    有人建议使用 use_signals=False ,但对我没用 .

    作者@bitranox创建了以下包:

    pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
    

    代码示例:

    import time
    from wrapt_timeout_decorator import *
    
    @timeout(5)
    def mytest(message):
        print(message)
        for i in range(1,10):
            time.sleep(1)
            print('{} seconds have passed'.format(i))
    
    def main():
        mytest('starting')
    
    
    if __name__ == '__main__':
        main()
    

    给出以下例外:

    TimeoutError: Function mytest timed out after 5 seconds
    

相关问题