首页 文章

我们能否使用多个sparksessions来访问两个不同的Hive服务器

提问于
浏览
3

我有一个方案来比较来自两个独立的远程配置单元服务器的两个不同的表源和目标,我们能够使用两个 SparkSessions ,就像我在下面尝试的那样: -

val spark = SparkSession.builder().master("local")
  .appName("spark remote")
  .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false")
  .config("javax.jdo.option.ConnectionUserName", "hiveroot")
  .config("javax.jdo.option.ConnectionPassword", "hivepassword")
  .config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
  .config("hive.metastore.uris", "thrift://192.168.175.160:9083")
  .enableHiveSupport()
  .getOrCreate()

SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

val sparkdestination = SparkSession.builder()
  .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.42:3306/metastore?useSSL=false")
  .config("javax.jdo.option.ConnectionUserName", "hiveroot")
  .config("javax.jdo.option.ConnectionPassword", "hivepassword")
  .config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
  .config("hive.metastore.uris", "thrift://192.168.175.42:9083")
  .enableHiveSupport()
  .getOrCreate()

我尝试使用 SparkSession.clearActiveSession() and SparkSession.clearDefaultSession() ,但它不起作用,抛出以下错误:

Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

有没有其他方法我们可以使用多个 SparkSessionsSparkContext 访问两个配置单元表 .

谢谢

2 回答

  • 3

    SparkSession getOrCreate method

    说明哪一点

    获取现有[[SparkSession]],或者,如果没有,则根据此构建器中设置的选项创建新的[[SparkSession]] . 此方法首先检查是否存在有效的线程局部SparkSession,如果是,则返回该值 . 然后检查是否存在有效的全局默认SparkSession,如果是,则返回该值 . 如果不存在有效的全局默认SparkSession,则该方法将创建新的SparkSession并将新创建的SparkSession指定为全局默认值 . 如果返回现有SparkSession,则此构建器中指定的配置选项将应用于现有SparkSession .

    这就是它返回第一次会话及其配置的原因 .

    请浏览docs以找出创建会话的其他方法..


    我正在研究<2火花版 . 所以我不确定如何创建新的会话与配置冲突完全..

    但是,这是有用的测试用例,即SparkSessionBuilderSuite.scala做那个 - DIY ..

    该测试用例中的示例方法

    test("use session from active thread session and propagate config options") {
        val defaultSession = SparkSession.builder().getOrCreate()
        val activeSession = defaultSession.newSession()
        SparkSession.setActiveSession(activeSession)
        val session = SparkSession.builder().config("spark-config2", "a").getOrCreate()
    
        assert(activeSession != defaultSession)
        assert(session == activeSession)
        assert(session.conf.get("spark-config2") == "a")
        assert(session.sessionState.conf == SQLConf.get)
        assert(SQLConf.get.getConfString("spark-config2") == "a")
        SparkSession.clearActiveSession()
    
        assert(SparkSession.builder().getOrCreate() == defaultSession)
        SparkSession.clearDefaultSession()
      }
    
  • 1

    我使用这种方式并使用Spark 2.1完美地工作

    val sc = SparkSession.builder()
                 .config("hive.metastore.uris", "thrift://dbsyz1111:10000")
                 .enableHiveSupport()
                 .getOrCreate()
    
    // Createdataframe 1 from by reading the data from hive table of metstore 1
    val dataframe_1 = sc.sql("select * from <SourcetbaleofMetaStore_1>")
    
    // Resetting the existing Spark Contexts
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()
    
    //Initialize Spark session2 with Hive Metastore 2
    val spc2 = SparkSession.builder()
                   .config("hive.metastore.uris", "thrift://dbsyz2222:10004")
                   .enableHiveSupport()
                   .getOrCreate()
    
    // Load dataframe 2 of spark context 1 into a new dataframe of spark context2, By getting schema and data by converting to rdd  API
    val dataframe_2 = spc2.createDataFrame(dataframe_1.rdd, dataframe_1.schema)
    
    dataframe_2.write.mode("Append").saveAsTable(<targettableNameofMetastore_2>)
    

相关问题