首页 文章

在USQL中编写自定义提取程序以跳过包含编码问题的行

提问于
浏览
2

我有一大堆数据,涵盖了几百个文件 . 显然,它主要是UTF-8,但显然有些字符无效) . 根据https://msdn.microsoft.com/en-us/library/azure/mt764098.aspx,如果存在编码错误,则无论将silent标志设置为true(都只是跳过错误的行),都会发生运行时错误 .

因此,我需要编写一个自定义提取器 . 我在https://blogs.msdn.microsoft.com/data_otaku/2016/10/27/a-fixed-width-extractor-for-azure-data-lake-analytics/编写了一个很大程度上简化版本的例子,因为它只需要一行,用分隔符拆分它,然后只返回try块中的值 . 如果有任何例外,我只是处理它们并继续前进 .

不幸的是,我在USQL脚本本身实际上引用了这个提取器 . 当我按照上面链接的指导,它建议在另一个程序集中编写逻辑,构建它,在ADLS数据库/程序集中注册它,然后通过脚本顶部的 REFERENCE ASSEMBLY MyExtractors; 包含它(因为这是使用的命名空间) . 在下面的Using语句中,我用 USING new SimpleExtractor(); 调用它 . 如果我这样做,在针对 type or namespace cannot be found 的ADLS服务运行脚本时出现错误 . 另外,如果我试图更精确并在using语句中使用 USING new MyExtractors.SimpleExtractor(); ,它会产生相同的错误,引用上面的USING语句 .

然后我在https://azure.microsoft.com/en-us/documentation/articles/data-lake-analytics-u-sql-develop-user-defined-operators/的旧源中找到了其他文档,它描述了在代码隐藏文件中做同样的事情 . 我删除了单独的程序集并将逻辑复制到该文件中的类中 . 步骤#6中的示例未显示任何 REFERENCE ASSEMBLY 语句,但再次,当我运行它时,我收到 type or namespace name cannot be found 的错误 .

查看最新的发行说明,希望这里有些东西过时,我唯一看到的是如果我使用 USING 语句,我需要在自定义代码的程序集之前引用自定义代码的程序集(如第一次尝试)实际使用它,我是 .

任何人都可以提供一些关于如何在USQL中正确引用UDO的指导,或者指示如何让运行时处理静默编码异常(并且只是跳过它们)?

以下是我的逻辑在提取器本身中的样子:

using System.Collections.Generic;
using System.IO;
using System.Text;
using Microsoft.Analytics.Interfaces;

namespace Utilities
{
    [SqlUserDefinedExtractor(AtomicFileProcessing = true)]
    public class ModifiedTextExtractor : IExtractor
    {
        //Contains the row
        private readonly Encoding _encoding;
        private readonly byte[] _row_delim;
        private readonly char _col_delim;

        public ModifiedTextExtractor()
        {
            _encoding = Encoding.UTF8;
            _row_delim = _encoding.GetBytes("\r\n");
            _col_delim = '\t';
        }

        public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
        {
            //Read the input line by line
            foreach (var current in input.Split(_row_delim))
            {
                using (var reader = new StreamReader(current, this._encoding))
                {
                    var line = reader.ReadToEnd().Trim();

                    //If there are any single or double quotes in the line, escape them
                    line = line.Replace(@"""", @"\""");

                    var count = 0;

                    //Split the input by the column delimiter
                    var parts = line.Split(_col_delim);

                    foreach (var part in parts)
                    {
                        output.Set<string>(count, part);
                        count += 1;
                    }
                }
                yield return output.AsReadOnly();
            }
        }
    }
}

以及我如何尝试在USQL语句中使用它(在将其注册为程序集后)的片段:

REFERENCE ASSEMBLY [Utilities];

CREATE VIEW MyView AS ...
USING new Utilities.ModifiedTextExtractor();

谢谢!

2 回答

  • 2

    您遇到的问题是VIEWs无法引用自定义代码 . 在U-SQL中,所有对象都需要包含它们的上下文规范(例如它们体内引用的程序集)(这使得对象更加自包含,并避免了在对象的用户不知情的情况下拉出可能很长的依赖项的问题) .

    您需要做的是将VIEW转换为表值函数:

    CREATE FUNCTION MyFunct(/* optional parameters */) RETURNS @res AS
    BEGIN
      REFERENCE ASSEMBLY [Utilities];
      @res = EXTRACT ... USING new Utilities.ModifiedTextExtractor();
    END;
    

    然后按如下方式调用该函数(注意,您需要在 SELECT 语句中提供行集别名):

    @data = SELECT ... FROM MyFunct() AS f WHERE ...;
    

    或者如果您不想应用投影或过滤器:

    @data = MyFunct();
    

    像视图一样,将内联表值函数 .

  • 0

    解决此问题的另一种方法是使用支持拒绝行的Azure SQL数据仓库和Polybase .

    1)在ADW上为外部文件创建外部表:

    CREATE EXTERNAL TABLE ext.mycsv (
        colA INT NOT NULL,
        colB INT NOT NULL,
        colC INT NOT NULL
    )
    WITH (
        DATA_SOURCE = eds_mycsv,
        LOCATION = N'/myblobstorage/',
        FILE_FORMAT = eff_csv,
        REJECT_TYPE = VALUE,
        REJECT_VALUE = 1
    )
    

    外部表可以指向sinlge文件或目录(多个文件具有与我的示例中相同的结构) . reject_value为1将允许一行失败而不会使整个作业失败 . 这也可以是一个百分比,即“允许3%的行失败而不会使整个负载失败 . 该语句还将为您提供有关失败行的信息 . 阅读有关 REJECT_TYPEREJECT_VALUE here的更多信息 .

    关于ADW的另一个好处是它可以在你不使用它时暂停 .

    2)在ADW中创建一个内部表来实现它,例如

    CREATE TABLE dbo.mycsv
    WITH 
    (   
        CLUSTERED COLUMNSTORE INDEX,
        DISTRIBUTION = ROUND_ROBIN
    )
    AS
    SELECT * FROM ext.mycsv;
    

    3)使用U-SQL在Azure Data Lake Analytics(ADLA)中创建外部表,使用外部数据源“查询其所在的数据”,即在仓库中 .

    // Create external table which is in SQL DW
    CREATE EXTERNAL TABLE IF NOT EXISTS adlaExt.mycsv
    (
        colA        int,
        colB        int,
        colC        int
    )
    FROM ds_adw LOCATION "dbo.mycsv";
    

    4)在U-SQL中查询外部表,例如:

    // Query external table
    @e =
        SELECT *
        FROM dbo.mycsv;
    
    
    // Join with internal table
    @q =
        SELECT a.*, b.someColumn
        FROM @e AS a
                INNER JOIN
                    dbo.someOtherTable AS b
                ON a.colA == b.n_colA;
    
    
    // Output it
    OUTPUT @q TO "/output/output.csv"
    USING Outputters.Csv();
    

    可以选择将其导入ADLA . Jorg Klein在ADLA here的联合查询设置上有一篇很棒的博客文章 .

    恕我直言,这比使用本机Azure组件创建自定义extactor更安全 . Polybase尚不支持ADLA,但几乎肯定会在未来的某个时刻支持,此时可以简化设计 .

相关问题