首页 文章

将某些工作流程执行优先于其他工作流程

提问于
浏览
3

我一直在使用amazon swf的流程框架,我希望能够运行优先级工作流程执行和正常的工作流程执行 . 如果存在优先级任务,则活动应在正常优先级任务之前获取优先级任务 . 完成此任务的最佳方法是什么?

我认为以下可能会有效,但我想知道是否有更好/推荐的方法 .

  • 我将为活动定义两个活动工作者和两个活动列表 . 一个优先级列表和一个普通列表 . 每个工作人员将使用相同的活动类 .

  • 两个工作程序将在同一主机(ec2实例)上运行 .

  • 在工作流程上,我将定义两个方法:startNormalWorkflow和startHighWorkflow . 在startHighWorkflow方法中,我可以使用ActivitySchedulingOptions将任务放在高优先级列表中 .

这种方法的问题在于无法保证在正常任务之前安排高优先级任务 .

2 回答

  • 1

    这是一个很好的问题,它让我抓了一会儿 .

    当然,有不止一种方法可以给这只猫提供皮肤,并且存在许多有效的解决方案 . 我在这里集中讨论了我能想到的最简单的可能性,即在单个工作流程中按优先级顺序执行任务 .

    场景如下:我定义了一个活动工作者,提供两个任务列表, default_tasksurgent_tasks ,具有一个简单的逻辑:

    • 如果 urgent_tasks 列表中有待处理的任务,则从那里选择一个,

    • 否则,从 default_tasks 中选择一项任务

    • 执行所选的任何任务 .

    问题是如何检查是否有任何高优先级的任务正在等待处理? CountPendingActivityTasks API来救援!

    我知道你使用Flow进行开发 . 我的例子是使用 boto.swf.layer2 编写的,因为Python对于原型设计来说更容易 - 但是这个想法保持不变,并且可以扩展到具有高优先级和低优先级工作流程执行的更复杂的场景 .

    因此,要使用boto.swf完成上述操作,请按照下列步骤操作:

    Export credentials to the environment

    $ export AWS_ACCESS_KEY_ID=your access key
    $ export AWS_SECRET_ACCESS_KEY= your secret key
    

    Get the code snippets

    为方便起见,您可以从github分叉:

    $ git clone git@github.com:oozie/stackoverflow.git
    $ cd stackoverflow/amazon-swf/priority_tasks/
    

    要引导域和工作流:

    # domain_setup.py 
    import boto.swf.layer2 as swf
    
    DOMAIN = 'stackoverflow'
    VERSION = '1.0'
    
    swf.Domain(name=DOMAIN).register()
    swf.ActivityType(domain=DOMAIN, name='SomeActivity', version=VERSION, task_list='default_tasks').register()
    swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
    

    决策者实施:

    # decider.py
    import boto.swf.layer2 as swf
    
    DOMAIN = 'stackoverflow'
    ACTIVITY = 'SomeActivity'
    VERSION = '1.0'
    
    class MyWorkflowDecider(swf.Decider):
    
        domain = DOMAIN
        task_list = 'default_tasks'
        version = VERSION
    
        def run(self):
            history = self.poll()
            print history
            if 'events' in history:
                # Get a list of non-decision events to see what event came in last.
                workflow_events = [e for e in history['events']
                                   if not e['eventType'].startswith('Decision')]
    
                decisions = swf.Layer1Decisions()
    
                last_event = workflow_events[-1]
                last_event_type = last_event['eventType']
    
                if last_event_type == 'WorkflowExecutionStarted':
                    # At the start, get the worker to fetch the first assignment.
                    decisions.schedule_activity_task(ACTIVITY+'1', ACTIVITY, VERSION, task_list='default_tasks')
                    decisions.schedule_activity_task(ACTIVITY+'2', ACTIVITY, VERSION, task_list='urgent_tasks')
                    decisions.schedule_activity_task(ACTIVITY+'3', ACTIVITY, VERSION, task_list='default_tasks')
                    decisions.schedule_activity_task(ACTIVITY+'4', ACTIVITY, VERSION, task_list='urgent_tasks')
                    decisions.schedule_activity_task(ACTIVITY+'5', ACTIVITY, VERSION, task_list='default_tasks')
                elif last_event_type == 'ActivityTaskCompleted':
                    # Complete workflow execution after 5 completed activities.
                    closed_activity_count = sum(1 for wf_event in workflow_events if wf_event.get('eventType') == 'ActivityTaskCompleted')
                    if closed_activity_count == 5:
                        decisions.complete_workflow_execution()
    
                self.complete(decisions=decisions)
                return True
    

    优先考虑 Worker 实施:

    # worker.py
    import boto.swf.layer2 as swf
    
    DOMAIN = 'stackoverflow'
    VERSION = '1.0'
    
    class PrioritizingWorker(swf.ActivityWorker):
    
        domain = DOMAIN
        version = VERSION
    
        def run(self):
    
            urgent_task_count = swf.Domain(name=DOMAIN).count_pending_activity_tasks('urgent_tasks').get('count', 0)
            if urgent_task_count > 0:
                self.task_list = 'urgent_tasks'
            else:
                self.task_list = 'default_tasks'
            activity_task = self.poll()
    
            if 'activityId' in activity_task:
                print urgent_task_count, 'urgent tasks in the queue. Executing ' + activity_task.get('activityId')
                self.complete()
                return True
    

    Run the workflow from three instances of an interactive Python shell

    运行决策程序:

    $ python -i decider.py
    >>> while MyWorkflowDecider().run(): pass
    ...
    

    开始执行:

    $ python -i decider.py 
    >>> swf.WorkflowType(domain='stackoverflow', name='MyWorkflow', version='1.0', task_list='default_tasks').start()
    

    最后,启动工作人员并在执行任务时观察任务:

    $ python -i worker.py 
    >>> while PrioritizingWorker().run(): pass
    ... 
    2 urgent tasks in the queue. Executing SomeActivity2
    1 urgent tasks in the queue. Executing SomeActivity4
    0 urgent tasks in the queue. Executing SomeActivity5
    0 urgent tasks in the queue. Executing SomeActivity1
    0 urgent tasks in the queue. Executing SomeActivity3
    
  • 2

    事实证明,使用您必须首先检查的单独任务列表不能很好地工作 .

    有几个问题 .

    首先,count API不能可靠地更新 . 因此,即使队列中有紧急任务,您也可能获得0个任务 .

    其次,如果没有可用任务,则轮询任务的调用会挂起 . 因此,当您轮询非紧急任务时,这将“坚持”2分钟,或直到您有非紧急任务要做 .

    因此,这可能会导致工作流程中出现各种问题 .

    为此,SWF必须实现一个轮询API,该API可以从任务列表列表中返回第一个任务 . 那会更容易 .

相关问题