Importing a CSV file (containing empty strings and duplicates) into DynamoDB

I have a CSV file that I'm trying to import into Amazon DynamoDB. So I uploaded it to S3, spun up an EMR cluster, and created an external table like this:

hive> CREATE EXTERNAL TABLE s3_table_myitems (colA BIGINT, colB STRING, colC STRING, colD DOUBLE, colE DOUBLE, colF STRING, colG STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES ('serialization.null.format'='""')
    STORED AS TEXTFILE
    LOCATION 's3://bucketname/dirname/'
    TBLPROPERTIES ('skip.header.line.count'='1');
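
A quick smoke test (purely illustrative) confirms that the SerDe parses the file and skips the header row as expected:

hive> SELECT * FROM s3_table_myitems LIMIT 5;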

Any column in the CSV may be empty, but DynamoDB can't handle empty strings ("com.amazonaws.AmazonServiceException: One or more parameter values were invalid: An AttributeValue may not contain an empty string").

Here's what Amazon says:

We will consider this optional "ignore empty string" behavior in a future release. ... As a workaround, you can ... transform the empty attribute values into NULLs. For example, you can ... use a more complex SELECT expression that will turn empty strings into something else, including setting them to NULL.

So this is what I came up with, but it's ugly:

hive> INSERT INTO TABLE ddb_tbl_myitems
    SELECT
    regexp_replace(colA, '^$', 'NULL'),
    regexp_replace(colB, '^$', 'NULL'),
    regexp_replace(colC, '^$', 'NULL'),
    regexp_replace(colD, '^$', 'NULL'),
    regexp_replace(colE, '^$', 'NULL'),
    regexp_replace(colF, '^$', 'NULL'),
    regexp_replace(colG, '^$', 'NULL')
    FROM s3_table_myitems;

Is there a better solution to the overall problem (short of preprocessing the CSV), or at least a better SELECT syntax?


Edit: It turned out I also had to deal with duplicates ("com.amazonaws.AmazonServiceException: Provided list of item keys contains duplicates").

For posterity, here's what I ended up with. I'd love to hear of a better way, whether for aesthetics or for performance. The task sounded simple ("importing a CSV file into DynamoDB"), but it has taken hours so far :P

# source
hive> CREATE EXTERNAL TABLE s3_table_myitems (colA STRING, colB STRING, colC DOUBLE, colD DOUBLE, colE STRING, colF STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES ('serialization.null.format'='""')
    STORED AS TEXTFILE
    LOCATION 's3://bucketname/dirname/'
    TBLPROPERTIES ('skip.header.line.count'='1');

# destination
hive> CREATE EXTERNAL TABLE ddb_tbl_myitems (colA STRING, colB STRING, colC DOUBLE, colD DOUBLE, colE STRING, colF STRING)
    STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
    TBLPROPERTIES ("dynamodb.table.name" = "myitems",
        "dynamodb.column.mapping" = "colA:colA,colB:colB,colC:colC,colD:colD,colE:colE,colF:colF");

# remove dupes - http://stackoverflow.com/a/34165762/594211
hive> CREATE TABLE tbl_myitems_deduped AS
    SELECT colA, min(colB) AS colB, min(colC) AS colC, min(colD) AS colD, min(colE) AS colE, min(colF) AS colF
    FROM (SELECT colA, colB, colC, colD, colE, colF, rank() OVER
        (PARTITION BY colA ORDER BY colB, colC, colD, colE, colF)
        AS col_rank FROM s3_table_myitems) t
    WHERE t.col_rank = 1
    GROUP BY colA;
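
# (illustrative sanity check, not part of the original flow: list any keys that still appear more than once)
hive> SELECT colA, COUNT(*) AS cnt
    FROM tbl_myitems_deduped
    GROUP BY colA
    HAVING COUNT(*) > 1;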

# replace empty strings with placeholder 'NULL'
hive> CREATE TABLE tbl_myitems_noempty AS
    SELECT colA,
    regexp_replace(colB, '^$', 'NULL') AS colB,
    regexp_replace(colC, '^$', 'NULL') AS colC,
    regexp_replace(colD, '^$', 'NULL') AS colD,
    regexp_replace(colE, '^$', 'NULL') AS colE,
    regexp_replace(colF, '^$', 'NULL') AS colF
    FROM tbl_myitems_deduped
    WHERE LENGTH(colA) > 0;

# ...other preprocessing here...

# insert to DB
hive> INSERT INTO TABLE ddb_tbl_myitems
    SELECT * FROM tbl_myitems_noempty;

Note: colA is the partition key.
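
As a final, purely illustrative check, comparing row counts confirms the load went through; note that selecting from the DynamoDB-backed table scans DynamoDB and consumes read capacity:

hive> SELECT COUNT(*) FROM tbl_myitems_noempty;
hive> SELECT COUNT(*) FROM ddb_tbl_myitems;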

1 Answer

You can add an additional table property to the CREATE TABLE statement to treat any specified character as a null value.

    TBLPROPERTIES('serialization.null.format'='');
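
For example, applied to the question's source table it might look like this (a sketch only; the question already sets 'serialization.null.format' in SERDEPROPERTIES for OpenCSVSerde, so whether the table-level property changes behavior is worth verifying):

hive> CREATE EXTERNAL TABLE s3_table_myitems (colA STRING, colB STRING, colC DOUBLE, colD DOUBLE, colE STRING, colF STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    STORED AS TEXTFILE
    LOCATION 's3://bucketname/dirname/'
    TBLPROPERTIES ('skip.header.line.count'='1',
        'serialization.null.format'='');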
    
