I am new to PySpark in Jupyter Notebook and AWS SageMaker. I know how to write SQL statements to answer my question. This code snippet should:

1. Pull the methods of disposition of death available in my dataset (CDC death data, in CSV)
2. Count the frequency of each disposition by year

I am able to run the SQL statement against the same dataset in a MySQL database, but as soon as I add the query to my PySpark code I get a ParseException. Please see the error below.

How do I fix this error? And if I wanted to create a graph/chart from the output, how would I do that? Thanks.

df.registerTempTable("data")
methods = sqlContext.sql("\
    SELECT current_data_year AS Year, \
        CASE method_of_disposition \
            WHEN 'C' THEN 'Cremation' \
            WHEN 'B' THEN 'Burial' \
            WHEN 'D' THEN 'D' \
            WHEN 'E' THEN 'E' \
            WHEN 'O' THEN 'O' \
            WHEN 'R' THEN 'R' \
            WHEN 'U' THEN 'Unknown' \
            END AS 'Method of Disposition', \
        COUNT(method_of_disposition) AS Count \
        FROM data \
        GROUP BY current_data_year, method_of_disposition\
    ").show()

SAMPLE MYSQL OUTPUT

+----+-------------------+-------+
|Year|MethodofDisposition|  Count|
+----+-------------------+-------+
|   0|               null|     10|
|2005|              Other|   2199|
|2005|           Donation|   4795|
|2005|                  E|  21247|
|2005|     RemovedFromUSA|  31954|
|2005|          Cremation| 350018|
|2005|             Burial| 553202|
|2005|            Unknown|1489091|
|2006|              Other|   2252|
|2006|           Donation|   6883|
|2006|                  E|  23412|
|2006|     RemovedFromUSA|  40870|
|2006|          Cremation| 423282|
|2006|             Burial| 667169|
|2006|            Unknown|1266857|
|2007|              Other|   3119|
|2007|           Donation|   8719|
|2007|                  E|  26139|
|2007|     RemovedFromUSA|  41411|
|2007|          Cremation| 472220|
|2007|             Burial| 725666|
|2007|            Unknown|1151069|
|2008|              Other|   5511|
|2008|           Donation|  10981|
|2008|                  E|  31913|
|2008|     RemovedFromUSA|  44713|
|2008|          Cremation| 579827|
|2008|             Burial| 866384|
|2008|            Unknown| 937482|
|2009|              Other|   3688|
|2009|           Donation|  12011|
|2009|                  E|  30344|
|2009|     RemovedFromUSA|  45451|
|2009|          Cremation| 599202|
|2009|             Burial| 802305|
|2009|            Unknown| 948218|
|2010|              Other|   3782|
|2010|           Donation|  15208|
|2010|                  E|  32807|
|2010|     RemovedFromUSA|  47899|
|2010|          Cremation| 706224|
|2010|            Unknown| 760192|
|2010|             Burial| 906430|
|2011|              Other|   5169|
|2011|           Donation|  17450|
|2011|                  E|  33847|
|2011|     RemovedFromUSA|  47199|
|2011|            Unknown| 685325|
|2011|          Cremation| 780480|
|2011|             Burial| 950372|
|2012|              Other|   6649|
|2012|           Donation|  20790|
|2012|                  E|  35110|
|2012|     RemovedFromUSA|  52896|
|2012|            Unknown| 440569|
|2012|          Cremation| 898222|
|2012|             Burial|1093628|
|2013|              Other|   6962|
|2013|           Donation|  21653|
|2013|                  E|  36949|
|2013|     RemovedFromUSA|  53678|
|2013|            Unknown| 395080|
|2013|          Cremation| 973768|
|2013|             Burial|1113362|
|2014|              Other|   7871|
|2014|           Donation|  24004|
|2014|                  E|  39321|
|2014|     RemovedFromUSA|  59884|
|2014|            Unknown| 242963|
|2014|          Cremation|1094292|
|2014|             Burial|1162836|
|2015|              Other|  11729|
|2015|           Donation|  27870|
|2015|                  E|  40880|
|2015|     RemovedFromUSA|  71744|
|2015|            Unknown|  74050|
|2015|          Cremation|1244297|
|2015|             Burial|1247628|
+----+-------------------+-------+

ERROR MESSAGE

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

~/anaconda3/envs/python3/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:

Py4JJavaError: An error occurred while calling o19.sql.
: org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input ''Method of Disposition'' expecting {<EOF>, ',', 'FROM', 'WHERE', 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'LATERAL', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 1, pos 213)

== SQL ==
SELECT current_data_year AS Year, CASE method_of_disposition WHEN 'C' THEN 'Cremation' WHEN 'B' THEN 'Burial' WHEN 'D' THEN 'D' WHEN 'E' THEN 'E' WHEN 'O' THEN 'O' WHEN 'R' THEN 'R' WHEN 'U' THEN 'Unknown' END AS 'Method of Disposition', COUNT(method_of_disposition) AS Count FROM data GROUP BY current_data_year, method_of_disposition
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^

    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
    at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:68)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:632)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)


During handling of the above exception, another exception occurred:

ParseException                            Traceback (most recent call last)
<ipython-input-7-f99c8a5b941c> in <module>()
      1 #Grouping and counting Cremation vs Burial by Year
      2 df.registerTempTable("data")
----> 3 sqlContext.sql("SELECT current_data_year AS Year, CASE method_of_disposition WHEN 'C' THEN 'Cremation' WHEN 'B' THEN 'Burial' WHEN 'D' THEN 'D' WHEN 'E' THEN 'E' WHEN 'O' THEN 'O' WHEN 'R' THEN 'R' WHEN 'U' THEN 'Unknown' END AS 'Method of Disposition', COUNT(method_of_disposition) AS Count FROM data GROUP BY current_data_year, method_of_disposition").show()

~/anaconda3/envs/python3/lib/python3.6/site-packages/pyspark/sql/context.py in sql(self, sqlQuery)
    382         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    383         """
--> 384         return self.sparkSession.sql(sqlQuery)
    385 
    386     @since(1.0)

~/anaconda3/envs/python3/lib/python3.6/site-packages/pyspark/sql/session.py in sql(self, sqlQuery)
    601         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    602         """
--> 603         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    604 
    605     @since(2.0)

~/anaconda3/envs/python3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

~/anaconda3/envs/python3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     72             if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
---> 73                 raise ParseException(s.split(': ', 1)[1], stackTrace)
     74             if s.startswith('org.apache.spark.sql.streaming.StreamingQueryException: '):
     75                 raise StreamingQueryException(s.split(': ', 1)[1], stackTrace)

ParseException: "\nextraneous input ''Method of Disposition'' expecting {<EOF>, ',', 'FROM', 'WHERE', 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'LATERAL', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 1, pos 213)\n\n== SQL ==\nSELECT current_data_year AS Year, CASE method_of_disposition WHEN 'C' THEN 'Cremation' WHEN 'B' THEN 'Burial' WHEN 'D' THEN 'D' WHEN 'E' THEN 'E' WHEN 'O' THEN 'O' WHEN 'R' THEN 'R' WHEN 'U' THEN 'Unknown' END AS 'Method of Disposition', COUNT(method_of_disposition) AS Count FROM data GROUP BY current_data_year, method_of_disposition\n---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^\n"
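
A note that may help narrow down the ParseException: in Spark SQL, single quotes create string literals, so 'Method of Disposition' after END AS is parsed as a stray string rather than a column alias; identifiers that contain spaces are quoted with backticks instead. Below is a minimal sketch of the same query with only that change (and with the DataFrame kept in methods before calling .show(), since .show() itself returns None). It has not been tested against the actual CDC dataset.

# Sketch: same query, but the alias is quoted with backticks instead of single quotes.
# createOrReplaceTempView is the newer name for registerTempTable in Spark 2.x.
df.createOrReplaceTempView("data")

methods = sqlContext.sql("""
    SELECT current_data_year AS Year,
           CASE method_of_disposition
               WHEN 'C' THEN 'Cremation'
               WHEN 'B' THEN 'Burial'
               WHEN 'D' THEN 'D'
               WHEN 'E' THEN 'E'
               WHEN 'O' THEN 'O'
               WHEN 'R' THEN 'R'
               WHEN 'U' THEN 'Unknown'
           END AS `Method of Disposition`,
           COUNT(method_of_disposition) AS Count
    FROM data
    GROUP BY current_data_year, method_of_disposition
""")
methods.show()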

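For the chart/graph part of the question, one minimal sketch (assuming the query result is kept as the Spark DataFrame methods from the snippet above, and that matplotlib is installed in the notebook environment) is to collect the small aggregated result into pandas and plot it:

# Sketch only: methods is assumed to be the aggregated Spark DataFrame above.
# In Jupyter, run %matplotlib inline first so the figure renders in the notebook.
import matplotlib.pyplot as plt

pdf = methods.toPandas()   # the aggregate is tiny, so collecting it to the driver is safe

# Reshape so each disposition method becomes its own line across the years.
pivot = pdf.pivot(index="Year", columns="Method of Disposition", values="Count")
pivot.plot(marker="o", figsize=(10, 6))
plt.ylabel("Number of deaths")
plt.title("Method of disposition by year")
plt.show()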