
Python SDK raises an unexplained exception while deploying U-SQL jobs to Azure


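I am using the Python SDK's adlaJobClient to submit jobs to Azure. I build about 30 dynamic U-SQL scripts with Jinja2, collect them in a list, and then push them to Azure one by one with adlaJobClient. The problem is that after a random number of successful deployments, Python throws an exception in the program console without any further explanation. Azure shows no failed U-SQL job either. When I manually run the same dynamically generated U-SQL query on which execution stopped, it runs fine in Azure. The console output with the Python error is shown below, followed by the submission loop.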

***** starting query number **** 24
Job is not yet done, waiting for 3 seconds. Current state: Compiling
Job is not yet done, waiting for 3 seconds. Current state: Compiling
Job is not yet done, waiting for 3 seconds. Current state: Compiling
Job is not yet done, waiting for 3 seconds. Current state: starting
Job is not yet done, waiting for 3 seconds. Current state: starting
An exception has occurred, use %tb to see the full traceback.
SystemExit: 1
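
# Submission loop: adlaJobClient, adla (the account name) and sql_query_list are created earlier
import time
import uuid

from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

for index, script in enumerate(sql_query_list):
    jobId = str(uuid.uuid4())

    jobResult = adlaJobClient.job.create(adla, jobId, JobInformation(name='Submit ADLA Job ' + jobId, type='USql', properties=USqlJobProperties(script=script)))

    try:
        # Poll every 3 seconds until the job reaches the "ended" state
        while jobResult.state != JobState.ended:
            print('Job is not yet done, waiting for 3 seconds. Current state: ' + jobResult.state.value)
            time.sleep(3)
            jobResult = adlaJobClient.job.get(adla, jobId)

        print(' ******* JOB ID  ********', jobId)
        print("****QUERY no FINISHED *****", index)
        print('**** JOB ID RESULT: ****** ' + jobResult.result.value)

    except Exception as e:
        # Report first, then re-raise; a raise placed before the prints would make them unreachable
        print("xxxxxx JOB SUBMISSION TO ADLA FAILED xxxxxxx")
        print(e)
        raise ValueError("Job submission to ADLA failed") from e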

1 Answer

  • 0

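    Option A: Log in to the Portal manually

    The easiest way to inspect a failed job is to log in to the Azure Portal (http://portal.azure.com), navigate to the Data Lake Analytics account, and click "View all jobs". From the job list you can open a job and view its output, including the specific error message. (Read on for an automated approach.)

    Option B: Automate it in Python

    You can get the job properties, which contain the error messages, via job_result.properties.errors (the code below reads them from job_result.error_message). The code samples below pretty-print any U-SQL job errors and raise an exception carrying those details.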

    Parsing the error message:

    import yaml  # PyYAML, used to dump the remaining attributes

    def get_pretty_error(error_message_obj):
        """
        Returns a string describing the USQL error.
        error_message_obj can be obtained via `job_result.error_message[0]`
        """
        # Work on a copy so the pops below do not mutate the SDK object
        err_info = dict(error_message_obj.__dict__)
        error_msgs = "=" * 80 + "\n"
        error_msgs += "=" * 6 + " ERROR: {}".format(err_info.pop("description", None)) + "\n"
        error_msgs += "=" * 80 + "\n"
        # "details" may be missing or None, so fall back to an empty string
        details = err_info.pop("details", None) or ""
        error_msgs += details.replace("\\r\\n", "\n").replace("\\n", "\n").replace("\\t", "\t").rstrip() + "...\n"
        error_msgs += "=" * 80 + "\n"
        error_msgs += "Message:    {}\n".format(err_info.pop("message", None))
        error_msgs += "Severity:   {}\n".format(str(err_info.pop("severity", None)).upper())
        error_msgs += "Resolution: {}\n".format(err_info.pop("resolution", None))
        inner = err_info.pop("inner_error", None)
        for key in ["end_offset", "line_number", "start_offset", "source", "additional_properties"]:
            # ignore (don't print these)
            err_info.pop(key, None)
        err_info = {x: y for x, y in err_info.items() if y}  # Remove empty keys
        error_msgs += "Addl. Info:\n\t{}\n".format(
            yaml.dump(err_info,
                      default_flow_style=True
                      ).replace("\\t", "\t").replace("\\n", "\n").replace("\n", "\n\t"))
        if inner:
            # If there's an inner error, append its message as well, recursively
            error_msgs += get_pretty_error(inner)
        return error_msgs
    

    Using it while waiting for the job:

    import time

    from azure.mgmt.datalake.analytics.job.models import JobState

    def wait_for_usql_job(adlaJobClient, adla_account, job_id):
        """Wait for completion, on error raise an exception (with specific details)"""
        print("Waiting for job ID '{}'".format(job_id))
        job_result = adlaJobClient.job.get(adla_account, job_id)
        while job_result.state != JobState.ended:
            print('Job is not yet done, waiting for 3 seconds. Current state: ' + job_result.state.value)
            time.sleep(3)
            job_result = adlaJobClient.job.get(adla_account, job_id)
        # Read the timings without mutating the properties object;
        # each label is paired with the matching attribute
        job_properties = job_result.properties.__dict__
        detail_msg = (
            "\tCompilation Time: {}\n".format(job_properties.get("total_compilation_time")) +
            "\tQueued Time:      {}\n".format(job_properties.get("total_queued_time")) +
            "\tExecution Time:   {}\n".format(job_properties.get("total_running_time")))
        print('Job completed with result: {}\n{}'
              .format(job_result.result.value, detail_msg))
        if job_result.result.value == "Failed":
            error_msgs = ""
            for error in job_result.error_message or []:
                # Loop through errors and concatenate error messages
                error_msgs += get_pretty_error(error)
            raise Exception("Job execution failed for job_id '{}':\n{}"
                            .format(job_id, error_msgs))
        # "Succeeded" or any other non-failed result is returned to the caller
        return job_result.result.value
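
To connect this back to the loop in the question, here is a minimal sketch of a driver that runs every script and records failures instead of stopping at the first one. The names `run_all`, `submit` and `wait` are hypothetical and not part of the SDK: `submit` stands for whatever wraps `adlaJobClient.job.create`, and `wait` for a function such as `wait_for_usql_job` above. The sketch also names `SystemExit` explicitly, because `SystemExit` derives from `BaseException` and an `except Exception` clause therefore never sees it. Where the `SystemExit: 1` in the question's output originates is not something the posted output shows, so treat this as a way to surface the failure rather than an explanation of it.

```python
import uuid


def run_all(scripts, submit, wait):
    """Run every script; return a list of (index, job_id, error) for failures.

    `submit(job_id, script)` and `wait(job_id)` are hypothetical callables
    supplied by the caller, for example thin wrappers around
    adlaJobClient.job.create and wait_for_usql_job.
    """
    failures = []
    for index, script in enumerate(scripts):
        job_id = str(uuid.uuid4())
        try:
            submit(job_id, script)
            wait(job_id)
        except (Exception, SystemExit) as exc:
            # SystemExit is a BaseException, so it has to be named explicitly;
            # KeyboardInterrupt is deliberately left uncaught.
            failures.append((index, job_id, exc))
            print("Query {} (job {}) failed: {!r}".format(index, job_id, exc))
    return failures


if __name__ == "__main__":
    # Stand-in callables so the sketch runs without an Azure account.
    def fake_submit(job_id, script):
        if "bad" in script:
            raise SystemExit(1)

    def fake_wait(job_id):
        return "Succeeded"

    for index, job_id, exc in run_all(["good", "bad", "good"], fake_submit, fake_wait):
        print(index, type(exc).__name__)
```

Collecting the failures this way keeps the remaining scripts running and leaves the query index and job ID available for a lookup in the Portal afterwards.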
    

    Notes:

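    • Because these classes are not documented online, I used the __dict__ attribute to explore all the attributes of the job-properties and error-info objects, dropped anything blank or not needed, and printed the rest.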

    • You could instead rewrite this code to access those attributes explicitly, without using __dict__.

    • Any error may contain an inner error, which is why I wrote get_pretty_error() as its own function - that way it can call itself recursively.
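
As a sketch of the last two notes, the formatter can read the attributes explicitly with `getattr` and recurse into `inner_error`. The attribute names are the ones popped in `get_pretty_error` above. The function name `format_error` and the `SimpleNamespace` stand-ins are illustrative only and are not SDK objects.

```python
from types import SimpleNamespace


def format_error(error, depth=0):
    """Format one error object and, recursively, its inner errors."""
    indent = "  " * depth
    lines = []
    for label, attr in [("Error", "description"), ("Message", "message"),
                        ("Severity", "severity"), ("Resolution", "resolution")]:
        value = getattr(error, attr, None)
        if value:  # skip missing or empty attributes
            lines.append("{}{}: {}".format(indent, label, value))
    inner = getattr(error, "inner_error", None)
    if inner is not None:
        # Recurse one level deeper for the nested error
        lines.append(format_error(inner, depth + 1))
    return "\n".join(lines)


if __name__ == "__main__":
    # Stand-in objects that mimic the attributes used above
    inner = SimpleNamespace(description="inner failure", message="column not found")
    outer = SimpleNamespace(description="outer failure", severity="Error",
                            inner_error=inner)
    print(format_error(outer))
```

Using `getattr` with a default leaves the error object untouched and tolerates attributes that are absent, at the cost of listing the fields of interest by hand.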
