我一直在尝试从 Couchbase 读取数据,但由于身份验证问题始终无法读取。
import com.couchbase.client.java.document.JsonDocument
import org.apache.spark.sql.SparkSession
import com.couchbase.spark._
/**
 * Minimal key/value read example: builds a local SparkSession configured for
 * Couchbase, fetches documents by id through the connector and prints them on
 * the driver.
 */
object SparkRead {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    // The SparkSession is the main entry point into spark
    val spark = SparkSession
      .builder()
      .appName("KeyValueExample")
      .master("local[*]") // use the JVM as the master, great for testing
      .config("spark.couchbase.nodes", "***********") // connect to couchbase on hostname
      .config("spark.couchbase.bucket.beer-sample", "") // open the beer-sample bucket with empty password
      // NOTE(review): username/password authentication presumably needs a connector
      // release that supports it — verify against the spark-connector version in the build.
      .config("spark.couchbase.username", "couchdb")
      .config("spark.couchbase.password", "******")
      // NOTE(review): the CouchbaseEnvironment dump in the error log still reports
      // connectTimeout=5000, kvTimeout=2500 and socketConnectTimeout=1000, so these
      // three keys may not be picked up — confirm the property names the connector reads.
      .config("spark.couchbase.connectTimeout", "30000")
      .config("spark.couchbase.kvTimeout", "10000")
      .config("spark.couchbase.socketConnect", "10000")
      .getOrCreate()

    try {
      // NOTE(review): "airline_10123" looks like a travel-sample document id —
      // verify that it exists in the beer-sample bucket opened above.
      spark.sparkContext
        .couchbaseGet[JsonDocument](Seq("airline_10123")) // Load documents from couchbase
        .collect() // collect all data from the spark workers
        .foreach(println) // print each document content
    } finally {
      // Always release the Spark context, even when the read fails
      // (for example on an authentication error).
      spark.stop()
    }
  }
}
Below is the Build File
// sbt build definition for the Kafka / Spark / Couchbase read-write example.
name := "KafkaSparkCouchReadWrite"

organization := "my.clairvoyant"

version := "1.0.0-SNAPSHOT"

scalaVersion := "2.11.11"

// Single Spark version shared by every Spark module so they cannot drift apart
// (previously core/sql/streaming were 2.1.0 while the Kafka integration was 2.2.0).
val sparkVersion = "2.2.0"

// The Couchbase connector is kept on the same release line as Spark.
// NOTE(review): 2.2.0 is chosen because the program authenticates with a
// username/password pair; confirm it matches the Couchbase Server version in use.
val couchbaseConnectorVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  // %% appends the Scala binary suffix (_2.11) instead of hard-coding it.
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "com.couchbase.client" %% "spark-connector" % couchbaseConnectorVersion,
  "org.glassfish.hk2" % "hk2-utils" % "2.2.0-b27",
  "org.glassfish.hk2" % "hk2-locator" % "2.2.0-b27",
  "javax.validation" % "validation-api" % "1.1.0.Final",
  "org.apache.kafka" %% "kafka" % "0.11.0.0",
  "com.googlecode.json-simple" % "json-simple" % "1.1"
).map(
  // Exclude the hk2 / javax.validation artifacts pulled in by the dependencies above.
  _.excludeAll(ExclusionRule("org.glassfish.hk2"), ExclusionRule("javax.validation"))
)
ERROR LOG
17/12/12 15:18:35 INFO BlockManager: 已初始化 BlockManager: BlockManagerId(driver, 192.168.33.220, 52402, None)
17/12/12 15:18:35 INFO SharedState: 仓库路径为 'file:/Users/桑帕特/桌面/GitClairvoyant/cpdl3-POC/KafkaSparkCouchReadWrite/spark-warehouse/'。
17/12/12 15:18:35 INFO CouchbaseCore: CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile='null', sslKeystorePassword=false, sslKeystore=null, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=8, computationPoolSize=8, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=12, queryServiceEndpoints=12, searchServiceEndpoints=12, ioPool=NioEventLoopGroup, kvIoPool=null, viewIoPool=null, searchIoPool=null, queryIoPool=null, coreScheduler=CoreScheduler, memcachedHashingStrategy=DefaultMemcachedHashingStrategy, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.4.2 (git: 2.4.2, core: 1.4.2), dcpEnabled=false, retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS, powers of 2; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=10, upper=100000}, keepAliveInterval=30000, autoreleaseAfter=2000, bufferPoolingEnabled=true, tcpNodelayEnabled=true, mutationTokensEnabled=false, socketConnectTimeout=1000, dcpConnectionBufferSize=20971520, dcpConnectionBufferAckThreshold=0.2, dcpConnectionName=dcp/core-io, callbacksOnIoPool=false, disconnectTimeout=25000, requestBufferWaitStrategy=com.couchbase.client.core.env.DefaultCoreEnvironment$2@7b7b3edb, queryTimeout=75000, viewTimeout=75000, kvTimeout=2500, connectTimeout=5000, dnsSrvEnabled=false}
17/12/12 15:18:37 WARN Endpoint: [null][KeyValueEndpoint]: 身份验证失败。
17/12/12 15:18:37 INFO Endpoint: [null][KeyValueEndpoint]: 收到 Channel 变为非活动状态的通知,正在尝试重新连接。
17/12/12 15:18:37 WARN ResponseStatusConverter: 未知的 ResponseStatus(协议 HTTP):401
17/12/12 15:18:37 WARN ResponseStatusConverter: 未知的 ResponseStatus(协议 HTTP):401
线程 "main" 中的异常 com.couchbase.client.java.error.InvalidPasswordException: 存储桶 "beer-sample" 的密码不匹配。
    at com.couchbase.client.java.CouchbaseAsyncCluster$OpenBucketErrorHandler.call(CouchbaseAsyncCluster.java:601)
    at com.couchbase.client.java.CouchbaseAsyncCluster$OpenBucketErrorHandler.call(CouchbaseAsyncCluster.java:584)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onError(OnSubscribeMap.java:88)
1 回答
示例代码:
POM.xml