首页 文章

使用Zeppelin Spark 2.0和Pyspark连接到AWS Redshift

提问于
浏览
1

我需要在Zeppelin中将Redshift数据读入数据帧 . 在过去的几个月里,我一直在AWS上通过Zeppelin使用Spark 2.0来成功打开csv和json S3文件 .

我以前能够使用以下代码在Spark EM 1.6R(可能是1.6.1)上从Zeppelin连接到Redshift,使用此代码:

%pyspark

from pyspark.sql import SQLContext, Row
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

#Load the data
aquery = "(SELECT serial_number, min(date_time) min_date_time from schema.table where serial_number in ('abcdefg','1234567') group by serial_number) as minDates"

dfMinDates = sqlContext.read.format('jdbc').options(url='jdbc:postgresql://dadadadaaaredshift.amazonaws.com:5439/idw?tcpKeepAlive=true&ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory?user=user&password=password', dbtable=aquery).load()
dfMinDates.show()

它起作用了 . 那是2016年的夏天 .

从那时起我就不需要它,现在AWS拥有Spark 2.0 .

新的语法是

myDF = spark.read.jdbc是这样的:

%pyspark

aquery = "(SELECT serial_number, min(date_time) min_date_time from schema.table where serial_number in ('abcdefg','1234567') group by serial_number) as minDates"

dfMinDates = spark.read.jdbc("jdbc:postgresql://dadadadaaaredshift.amazonaws.com:5439/idw?tcpKeepAlive=true&ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory?user=user&password=password", dbtable=aquery).load()
dfMinDates.show()

但我得到这个错误:

Py4JJavaError:调用o119.jdbc时发生错误 . :java.sql.SQLException:在org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils $$ anonfun $ 2.apply(JdbcUtils . )java.sql.DriverManager.getDriver(DriverManager.java:315)上没有合适的驱动程序 . scala:54)在org.apache的scala.Option.getOrElse(Option.scala:121)的org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils $$ anonfun $ 2.apply(JdbcUtils.scala:54) .spark.sql.execution.datasources.jdbc.JdbcUtils $ .createConnectionFactory(JdbcUtils.scala:53)org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD $ .resolveTable(JDBCRDD.scala:123)at org位于org.apache.spark.sql的org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:237)的.apache.spark.sql.execution.datasources.jdbc.JDBCRelation . (JDBCRelation.scala:117) . Data.merameReader.jdbc(DataFrameReader.scala:159)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.j) ava:43)at java.lang.reflect.Method.invoke(Method.java:498)at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) )py4j.Gateway.invoke(Gateway.java:280)at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)at py4j.commands.CallCommand.execute(CallCommand.java:79)py4j.GatewayConnection.run (GatewayConnection.java:211)java.lang.Thread.run(Thread.java:745)(,Py4JJavaError(调用o119.jdbc时发生错误 . \ n',JavaObject id = o121),)

我研究了Spark 2.0文档,发现了这个:

JDBC驱动程序类必须对客户端会话和所有执行程序上的原始类加载器可见 . 这是因为Java的DriverManager类进行了安全检查,导致它忽略了当打开连接时原始类加载器不可见的所有驱动程序 . 一种方便的方法是修改所有工作节点上的compute_classpath.sh以包含驱动程序JAR .

我不知道如何实现这一点,并从各种帖子,一些博客和stackoverflow中的一些帖子做了更多的阅读,并发现:

spark.driver.extraClassPath = org.postgresql.Driver

我在Zeppelin的Interpreter设置页面中这样做了,但我仍然得到了同样的错误 .

我试图添加一个Postgres解释器,我不确定我做得对(因为我不确定是否将它放在Spark解释器或Python解释器中),我选择了Spark解释器 . 现在,Postgres解释器也具有与Spark解释器相同的设置,这可能无关紧要,但我仍然得到相同的错误 .

在Spark 1.6中,我只是不记得经历所有这些麻烦 .

作为一个实验,我用Spark 1.6.2启动了一个EMR集群,并尝试了以前工作的旧代码,并得到了与上面相同的错误!

Zeppelin网站有Postgres覆盖,但他们的信息看起来像代码而不是如何设置解释器,所以我不知道如何使用它 .

我没有想法和参考 .

任何建议都非常感谢!

1 回答

相关问题