首页 文章

Hangfire ContinueWith有多个来源

提问于
浏览
1

我正在咀嚼使用 ContinueWith 完成以下任务的方法:

  • Originating App提交(非并发)作业

  • 作业启动,进行一些处理,在其他地方调用Web服务

  • 工作"completes"(静止)

  • Web Service在完成内部处理后提交"continuation"作业(可能是30分钟,可能是2天)

  • 以某种方式调用Jobs.ContinueWith?

  • 作业最终完成,原始作业标记为完成

我遇到的是太多活动部件 . Originating App是一款C#/ MVC应用程序 . 用户完成他的工作并最终提交一个长时间运行的工作来执行 . 作业处理器(C#库)执行一些工作,然后调用JAVA SOAP endpoints ,提供初始处理的结果 . JAVA SOAP endpoints 调用COTS应用程序来完成处理,然后通过“我完成”回调作业 .

如您所见,我没有明确的方法来执行以下操作:

var parentId = _jobs.Enqueue<MyJob>(x => x.StartExecution(job.Id));
_jobs.ContinueWith<JAVA_ENDPOINT>(parentId, x => x.JAVA_EXECUTION(job.Id));  // this part is not in my control!
_jobs.ContinueWith<MyJob>(parentId, x => x.ContinueExecution(job.Id));

我确实有一个REST服务(POST),我正在使用它作为开始工作的唯一方法 . 基本上,传入一个格式良好的有效负载(JSON),控制器从IoC容器中选择Job对象,决定它是哪种类型的工作(AdHoc,Recurring,Continuation等)然后执行适当的Hangfire调用以进入Enqueue它 . JAVA Endpoint也可以轻松调用此REST服务 .

[HttpPost]
public string Post()
{
    // safety checks removed for brevity...
    var command = new MinimumCommandModel(Request.Content.ReadAsStringAsync().Result);
    return GetPostPipeline().Handle(command).Id;
}

private static IRequestHandler<MinimumCommandModel, MinimumResultModel> GetPostPipeline()
{
    return new MediatorPipeline<MinimumCommandModel, MinimumResultModel>
        (new QueuePostMediator()
            , new IPreRequestHandler<MinimumCommandModel>[]
            {
                new PreJobLogger(),
                new PreJobExistsValidator(),
                new PreJobPropertiesValidator()
            }
            , new IPostRequestHandler<MinimumCommandModel, MinimumResultModel>[]
            {
                new PostJobLogger()
            }
        );
}

QueuePostMediator处理作业类型的细节(AdHoc等) . 我现在正在尝试编写延续处理程序,并且如何解决这个问题有点困难 . 我当然不想在Hangfire之外做任何阻止操作 . 当他们最初没有与原始作业的parentId连接时,我不确定如何“开始”另一个作业作为原始作业的延续 .

基本上,如果我可以,从工作内部,把工作搁置,直到外部刺激告诉篝火继续工作,我会是金 . 我还没有破解如何实现这个目标 .

思考?想法?

1 回答

  • 1

    好 . 我找到了一个让这个工作的黑客 .

    我正在使用策略模式来运行每个作业的不同部分 . 我有一个名为Handoff的JobStatus,现在这样做:

    public class Processing : BaseJobExecutor<PayloadModel>, IJobExecutor<PayloadModel>
    {
        public Processing(JobPingPong job) : base(job, JobStatus.Processing) {}
    
        public void Handle()
        {
            JobInfo.JobStatus = JobStatus.ExtProcessing;
            JobInfo.HangfireParentJobId = JobInfo.HangfireJobId;
            Payload.PostToQueueText(@"http://localhost:8080/api/clone");
    
            // Pause the current job (this is the parent job) so the outside web service has a chance to complete...
            var enqueuedIn = new TimeSpan(0, 6, 0, 0);  // 6 hours out...
            JobPutOnHold(JobInfo.HangfireJobId, enqueuedIn);
    
            // The next status to be executed upon hydration...
            JobInfo.JobStatus = JobStatus.Complete;
            Job.CachePut();
    
            // Signal the job executor that this job is "done" due to an outside process needing to run...
            JobInfo.JobStatus = JobStatus.Handoff;
        }
    }
    public void JobPutOnHold(string jobId, TimeSpan enqueuedIn)
    {
        var jobClient = new BackgroundJobClient();
        jobClient.ChangeState(jobId, new ScheduledState(enqueuedIn));
    }
    

    现在,在策略 Actuator 中我可以这样做:

    public string Execute(IServerFilter jobContext, IJobCancellationToken cancellationToken)
    {
        while (Payload.JobInfo.JobStatus != JobStatus.Done)
        {
            cancellationToken?.ThrowIfCancellationRequested();
            var jobStrategy = new JobExecutorStrategy<TPayload>(Executors);
            Payload = jobStrategy.Execute(Payload);
    
            if (Payload.JobInfo.JobStatus == JobStatus.Handoff)
                break;
        }
        return PayloadAsString;
    }
    

    作业的第二部分与第一部分相同,但是从具有ExtComplete状态的外部服务进入,允许作业根据外部世界(存储在DB中)的结果执行后处理 . 像这样:

    public class ExtComplete : BaseJobExecutor<PayloadModel>, IJobExecutor<PayloadModel>
    {
        public ExtComplete(JobPingPong job) : base(job, JobStatus.ExtComplete) { }
    
        public void Handle()
        {
            // do post processing here...
            Payload.Tokens = null;
            JobInfo.JobStatus = JobStatus.Complete;
            if (JobInfo.HangfireJobId != JobContext.JobId || JobInfo.HangfireParentJobId == JobInfo.HangfireJobId)
            {
                JobInfo.HangfireParentJobId = JobInfo.HangfireJobId;
                JobInfo.HangfireJobId = JobContext.JobId;
            }
    
            // Enqueue the previous (parent) job so it can complete...
            JobExecuteNow(JobInfo.HangfireParentJobId);
        }
    }
    public void JobExecuteNow(string jobId)
    {
        var enqueuedIn = new TimeSpan(0, 0, 0, 15);
        var jobClient = new BackgroundJobClient();
        jobClient.ChangeState(jobId, new ScheduledState(enqueuedIn));
    }
    

    最终,时间将由配置驱动,但是现在我将其设置为让第一个作业在15秒内执行 .

    我遇到的唯一挑战是在进行任何处理之前,进入的作业有效负载是原始有效负载 . 这就是为什么你看到上面的“缓存” . 当作业重新启动时,我检查是否存在该Hangfire JobId的缓存,如果存在,则从缓存加载最后一个已知有效负载,然后允许执行程序继续其快乐方式 .

    到目前为止工作得很好 .

    注意:我仍在尝试学习如何在Hangfire中更改/注入命令链和状态对象,以使其更适合于hangfire . 我们有一份工作可以拨打十几个外线电话 . 目前,运行大约需要12个小时 .

相关问题