I have an implementation of a softmax document classifier, written as a Spark application. I have a set of training documents, a set of labels for the training documents, and a set of test documents. My task is to predict the labels of the test documents using a softmax classifier trained on the training documents. I found this to be a very useful tutorial on softmax regression and the gradient descent procedure, and it is what I am working from. The training data is a file with one document per line. The training labels are a file with a comma-separated list of labels on each line, corresponding to the document on the same line number in the training data file. The test data has the same format as the training data. I am implementing gradient descent for softmax regression from scratch using RDD operations. I know this is already implemented in Spark's machine learning library, and I know my implementation is not the most efficient one, but that is mostly down to my current understanding of RDDs and Spark.
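For reference, the per-iteration quantities the RDD pipeline below is meant to compute (the softmax predictions, the cross-entropy cost, and the gradient step) look like this as a small, dense NumPy sketch; the names X, Y and theta here are just illustrative in-memory arrays, not the RDDs used in my actual code:

import numpy as np

# Dense, single-machine sketch of one gradient-descent step for softmax regression.
# X: (m, n) word-count matrix, Y: (m, k) one-hot label matrix, theta: (k, n) weights.
def softmax_step(X, Y, theta, alpha=0.1):
    m = X.shape[0]
    scores = X.dot(theta.T)                   # x^(did) . theta^(lab) for every doc/label pair
    h = np.exp(scores)
    h /= h.sum(axis=1, keepdims=True)         # predictions h(did, lab)
    J = -np.sum(Y * np.log(h)) / m            # cross-entropy cost
    grad = -(Y - h).T.dot(X) / m              # gradient with respect to theta
    return theta - alpha * grad, J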
My code is as follows:
#Imports and a tiny helper, added here so the snippet is self-contained;
#swap just flips a (k,v) pair, as the inline comments below indicate
import string
from operator import add
import numpy as np

def swap(pair):
    return (pair[1], pair[0])

X = sc.textFile(DATA_PATH)
Xtest = sc.textFile(TEST_DATA_PATH)
y = sc.textFile(LABEL_PATH)
ytest = sc.textFile(TEST_LABEL_PATH)
X.cache()
Xtest.cache()
y.cache()
ytest.cache()
#A little preprocessing.
#Each document might have several labels given as a comma separated list
#We're only interested in labels with the substring "CAT"
y = y.zipWithIndex()\
.map(lambda x: (x[1],x[0]))\
.flatMapValues(lambda x: x.split(","))\
.filter(lambda x: "CAT" in x[1])
y.cache()
#We want to filter out any documents from the training set that don't have one of our labels
# y .keys() => did NON-DISTINCT
# '' .distinct() => did DISTINCT
# '' .map(...) => (did,1) # ** This is our "filter rdd" **
filterRDD = y.keys()\
.distinct()\
.map(lambda x: (x,1))
# Now we'll use the filterRDD to remove superfluous docs from the training set
# X .zipWithIndex() => (doc,did)
# '' .map(...) => (did,doc)
# '' .roj('') => (did,(doc,1)) for did in other
# OR
# (did,(None,1)) for did not in this
# '' .filter(...) => (did,(doc,1)) i.e. gets rid of unimportant documents
# '' .mapValues(...) => (did,doc)
X = X.zipWithIndex()\
.map(lambda x: swap(x))\
.rightOuterJoin(filterRDD)\
.filter(lambda x: x[1][0] is not None)\
.mapValues(lambda x: x[0])
#Some preprocessing on the test documents
#We'll lower-case the words, strip them of punctuation (not including apostrophes), and remove that weird non-word character
Xtest = Xtest.zipWithIndex()\
.map(lambda x : (x[1],x[0]))\
.flatMapValues(lambda x : x.split())\
.mapValues(lambda x: x.lower().replace(""","").strip(string.punctuation.replace("'","")))\
.distinct()
#For multinomial logistic regression we have to create a duplicate document for documents with multiple labels
# y .keys() => did values (not necessarily unique)
# '' .map(...) => (did,1)
# '' .reduceByKey(add) => (did,count) the number of times each did appears in y
# '' .filter(...) => (did,count) for all did which appear multiple times in y
repeats = y.keys()\
.map(lambda x: (x,1))\
.reduceByKey(add)\
.filter(lambda x: x[1]>1)\
.mapValues(lambda x: x-1)
# Now that we know which documents have multiple labels, and how many labels each such document has,
#we can duplicate those documents and add them back into the fold
# X .join(repeats) => (did,(doc,count)) for each did which appears multiple times in y
# '' .flatMapValues(...) => creates count-1 many copies of each (did,doc) pair in the above rdd
XX = X.join(repeats)\
.flatMapValues(lambda x: [x[0]]*(x[1]))
# X .union(XX) => (did,doc) where did is NO LONGER UNIQUE
# '' .sort(...) => (did,doc) in sorted order
# '' .zipWithIndex() => ((did,doc),i)
# '' .map(...) => (i,doc) Note that i is now did'
# '' .flatMapValues(...) => (did',wid)
# '' .mapValues(...) => (did',wid') a little preprocessing
X = X.union(XX)\
.sortByKey()\
.zipWithIndex()\
.map(lambda x: (x[1],x[0][1]))\
.flatMapValues(lambda x : x.split())\
.mapValues(lambda x: x.lower().replace(""","").strip(string.punctuation.replace("'","")))\
.filter(lambda x: len(x[1])>1)
# We have to relabel the labels now, to correspond with the data
y = y.sortByKey()\
.zipWithIndex()\
.map(lambda x: (x[1],x[0][1]))
CONVERGED = False
m = X.keys().count()
alpha = .1
epsilon = .001
#Get the word counts for each document
# X .map(...) => ((did,wid),1)
# '' .rbk(add) => ((did,wid),count)
# '' .map(...) => (did,wid,count)
counts = X.map(lambda x: (x,1))\
.reduceByKey(add)\
.map(lambda x: (x[0][0],x[0][1],x[1]))
#The size of the vocabulary
B = counts.map(lambda x: x[1])\
.distinct()\
.count()
#add the bias word on...
# X .keys() => (did) NOT UNIQUE
# '' .distinct() => (did) UNIQUE
# '' .zwi() => (did,i)
# '' .map(...) => (i,did)
keys = X.keys()\
.distinct()\
.zipWithIndex()\
.map(lambda x: swap(x))
#create a bias word for each document
# sc .par => (B,...,B) (m many B words)
# '' .zwi => [(B,1),(B,2),...,(B,m)]
# '' .map(...) => [(1,B),(2,B),...,(m,B)]
biasWords = sc.parallelize(['B']*m)\
.zipWithIndex()\
.map(lambda x: swap(x))
#create a one count for each bias word
# sc .par => [1,...,1] (m many ones)
# '' .zwi => [(1,1),...,(1,m)]
# '' .map(...) => [(1,1),...,(m,1)]
ones = sc.parallelize(np.ones(m))\
.zipWithIndex()\
.map(lambda x: swap(x))
#Join the Bias words to the bias counts and to the documents, using the indices to join
# keys .join(...) => (i,(did,B))
# '' .join(...) => (i,((did,B),1))
# '' .values() => ((did,B),1)
# '' .map(...) => (did,B,1)
bias = keys.join(biasWords)\
.join(ones)\
.values()\
.map(lambda x: (x[0][0],x[0][1],x[1]))
#add the bias words into the counts data
counts = counts.union(bias)
##### ***** STANDARDIZE (Z-SCORE) THE COUNTS ***** #####
#First get the mean value for each word
# counts .map(...) => (wid,count)
# '' .rbk(...) => (wid,sum_did(count))
# '' .mapValues(...) => (wid, mu_wid)
mean = counts.map(lambda x: (x[1],x[2]))\
.reduceByKey(add)\
.mapValues(lambda x: x/B)
#Now get the standard deviation for each word
# counts .map(...) => (wid,count)
# '' .join(mean) => (wid,(count,mu_wid))
# '' .mapValues() => (wid, (count-mu_wid)^2)
# '' .rbk() => (wid, var_wid)
# '' .mapValues() => (wid, sd_wid)
sd = counts.map(lambda x: (x[1],x[2]))\
.join(mean)\
.mapValues(lambda x: (x[0]-x[1])*(x[0]-x[1]))\
.reduceByKey(add)\
.mapValues(lambda x: np.sqrt(x))
#Join together for easier manipulation
# mean .join(sd) => (wid,(mu_wid,sd_wid))
regs = mean.join(sd)
#Now standardize the counts as count = (count-mu)/sd
# counts .map(...) => (wid,(did,count))
# '' .join(regs) => (wid,((did,count),(mu_wid,sd_wid)))
# '' .map(...) =>
counts = counts.map(lambda x: (x[1],(x[0],x[2])))\
.join(regs)\
.map(lambda x: (x[1][0][0],x[0],(x[1][0][1]-x[1][1][0])/x[1][1][1]))
counts.cache()
#we need the initial population for theta = (lab,wid,w_wid)
labs = y.values().distinct()
words = X.values().distinct().union(sc.parallelize(['B']))
zeros = sc.parallelize([0.0])
#Building theta
# labs .cart(...) => (lab,wid) for each lab/wid pair
# '' .cart(...) => ((lab,wid),0) for each lab/wid pair
# '' .map(...) => (lab,wid,w_wid) where w_wid is initialized to zero
theta = labs.cartesian(words)\
.cartesian(zeros)\
.map(lambda x: (x[0][0],x[0][1],x[1]))
#gradient descent
i=0
J = 0
while not CONVERGED:
    print("i: ", i)
    #Get the dot product of every document with every row in theta
    # theta .map(...) => (wid,(lab,w_wid))
    # ''    .join(...) => (wid,((lab,w_wid),(did,count)))
    # ''    .map(...) => ((did,lab),w_wid*count)
    # ''    .rbk(...) => ((did,lab),x^(did)*DOT*theta^(lab))
    dots = theta.map(lambda x: (x[1],(x[0],x[2])))\
        .join(counts.map(lambda x: (x[1],(x[0],x[2]))))\
        .map(lambda x: ((x[1][1][0],x[1][0][0]),(x[1][0][1]*x[1][1][1])))\
        .reduceByKey(add)
    dots.cache()
    #Now find the denominator for the predictions
    # dots .map(...) => (did,x^(did)*DOT*theta^lab) for each row in theta
    # ''   .mapValues(...) => (did,exp(x^(did)*DOT*theta^lab))
    # ''   .rbk(add) => (did,Sum_lab(x^(did)*DOT*theta^lab))
    denom = dots.map(lambda x: (x[0][0],x[1]))\
        .mapValues(lambda x: np.exp(x))\
        .reduceByKey(add)
    #Now make our predictions
    # dots .map(...) => (did,(lab,x^(did)*DOT*theta^lab))
    # ''   .join(...) => (did,((lab,x^(did)*DOT*theta^lab),denom))
    # ''   .map(...) => ((did,lab),exp(x^(did)*DOT*theta^lab)/denom)
    h = dots.map(lambda x: (x[0][0],(x[0][1],x[1])))\
        .join(denom)\
        .map(lambda x: ((x[0],x[1][0][0]),(np.exp(x[1][0][1])/x[1][1])))
    if i%10 == 0:
        # Calculate the loss
        # we need ((did,lab),1) pairs for each did and each lab
        yc = y.keys()\
            .distinct()\
            .cartesian(y.values().distinct())\
            .map(lambda x: (x,1))
        yc.cache()
        # now we'll get ((did,lab),1) pairs for each (did,lab) in y
        ycc = y.map(lambda x: (x,1))
        ycc.cache()
        # Now we'll use the two to calculate the loss
        # yc .loj(ycc) => ((did,lab),(1,1)) when (did,lab) in ycc
        #                 OR
        #                 ((did,lab),(1,None)) when (did,lab) not in ycc
        # '' .mapValues(...) => ((did,lab),1{y^did=lab})
        # '' .join(h...) => ((did,lab),(1{y^did=lab},log(h(did,lab);theta)))
        # '' .mapValues(...) => ((did,lab),(-1/m * 1{y^did=lab}*log(h(did,lab);theta)))
        # '' .values() => -1/m*1{y^did=lab}*log(h(did,lab);theta) FOR EACH did,lab combo
        # '' .reduce(add) => -1/m*sum_did[sum_lab[1{y^did=lab}*log(h(did,lab);theta)]]
        J = yc.leftOuterJoin(ycc)\
            .mapValues(lambda x: 1 if x[1]==1 else 0)\
            .join(h.mapValues(lambda x: np.log(x)))\
            .mapValues(lambda x: x[0]*x[1]*-1/m)\
            .values()\
            .reduce(add)
        print("Cost:", J)
    i += 1
    #Now calculate the gradient
    # y .map(...) => ((did,lab),1)
    # h .loj(yy) => ((did,lab),(pred,1)) for (did,lab) in yy
    #               OR
    #               ((did,lab),(pred,None)) for (did,lab) not in yy
    # '' .mapValues(...) => ((did,lab),1{y^did=lab}-pred) **NOTE: We'll refer to
    #                       1{y^did=lab}-pred
    #                       as expr from now on**
    # '' .map(...) => (did,(lab,expr))
    # '' .join(counts) => (did,((lab,expr),(wid,count)))
    # '' .map(...) => ((lab,wid),expr*count)
    # '' .rbk(add) => ((lab,wid),sum_did(expr*count))
    # '' .mapValues(...) => ((lab,wid),-1/m * sum_did(expr*count))
    yy = y.map(lambda x: (x,1))
    grad = h.leftOuterJoin(yy)\
        .mapValues(lambda x: 1-x[0] if x[1]==1 else 0-x[0])\
        .map(lambda x: (x[0][0],(x[0][1],x[1])))\
        .join(counts.map(lambda x: (x[0],(x[1],x[2]))))\
        .map(lambda x: ((x[1][0][0],x[1][1][0]),x[1][0][1]*x[1][1][1]))\
        .reduceByKey(add)\
        .mapValues(lambda x: -1/m * x)
    #find the new theta value
    # theta .map(...) => ((lab,wid),oldW)
    # ''    .join(grad) => ((lab,wid),(oldW,gradW))
    # ''    .mapValues() => ((lab,wid),new_weight)
    thetaNew = theta.map(lambda x: ((x[0],x[1]),x[2]))\
        .join(grad)\
        .mapValues(lambda x: x[0]-alpha*x[1])\
        .map(lambda x: (x[0][0],x[0][1],x[1]))
    theta = thetaNew
    theta.cache()
    if i == 200: CONVERGED = True
#Now we'll make our predictions
#Recreate the counts map out of the Xtest instead of the X data
counts = Xtest.map(lambda x: (x,1))\
.reduceByKey(add)\
.map(lambda x: (x[0][0],x[0][1],x[1]))
#add the bias word on...
# Xtest .keys() => (did) NOT UNIQUE
# '' .distinct() => (did) UNIQUE
# '' .zwi() => (did,i)
# '' .map(...) => (i,did)
keys = Xtest.keys()\
.distinct()\
.zipWithIndex()\
.map(lambda x: swap(x))
#Join the Bias words to the bias counts and to the documents, using the indices to join
# keys .join(...) => (i,(did,B))
# '' .join(...) => (i,((did,B),1))
# '' .values() => ((did,B),1)
# '' .map(...) => (did,B,1)
bias = keys.join(biasWords)\
.join(ones)\
.values()\
.map(lambda x: (x[0][0],x[0][1],x[1]))
#add the bias words into the counts data
counts = counts.union(bias)
#Get the dot product of every document with every row in theta
# theta .map(...) => (wid,(lab,w_wid))
# '' .join(...) => (wid,((lab,w_wid),(did,count)))
# '' .map(...) => ((did,lab),w_wid*count)
# '' .rbk(...) => ((did,lab),x^(did)*DOT*theta^(lab))
dots = theta.map(lambda x: (x[1],(x[0],x[2])))\
.join(counts.map(lambda x: (x[1],(x[0],x[2]))))\
.map(lambda x: ((x[1][1][0],x[1][0][0]),(x[1][0][1]*x[1][1][1])))\
.reduceByKey(add)
#Now find the denominator for the predictions
# dots .map(...) => (did,x^(did)*DOT*theta^lab) for each row in theta
# '' .mapValues(...) => (did,exp(x^(did)*DOT*theta^lab))
# '' .rbk(add) => (did,Sum_lab(x^(did)*DOT*theta^lab))
denom = dots.map(lambda x: (x[0][0],x[1]))\
.mapValues(lambda x: np.exp(x))\
.reduceByKey(add)
#Now make our predictions
# dots .map(...) => (did,(lab,x^(did)*DOT*theta^lab))
# '' .join(...) => (did,((lab,x^(did)*DOT*theta^lab),denom))
# '' .map(...) => ((did,lab),exp(x^(did)*DOT*theta^lab)/denom)
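# NBFun6 is not shown in this snippet; presumably it reduces the (label, probability) pairs for each document down to a single prediction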
h = dots.map(lambda x: (x[0][0],(x[0][1],x[1])))\
.join(denom)\
.map(lambda x: (x[0],(x[1][0][0],np.exp(x[1][0][1])/x[1][1])))\
.reduceByKey(NBFun6)
print("preds:",h.collect())
The code above will not run for more than 10 iterations. It gets progressively slower, slows to a crawl on the 9th iteration, and runs out of memory on the 10th.
Traceback (most recent call last):
File "p1.py", line 411, in <module>
.join(counts.map(lambda x: (x[1],(x[0],x[2]))))\
File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/rdd.py", line 1661, in join
return python_join(self, other, numPartitions)
File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/join.py", line 53, in python_join
return _do_python_join(rdd, other, numPartitions, dispatch)
File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/join.py", line 41, in _do_python_join
return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))
File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/rdd.py", line 564, in union
self.getNumPartitions() == rdd.getNumPartitions()):
File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/rdd.py", line 385, in getNumPartitions
return self._jrdd.partitions().size()
File "/Users/MacBot/anaconda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/Users/MacBot/anaconda/lib/python3.6/site-packages/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2495.partitions.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Integer.valueOf(Integer.java:832)
at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65)
at scala.collection.IndexedSeqOptimized$class.zipWithIndex(IndexedSeqOptimized.scala:103)
at scala.collection.mutable.ArrayOps$ofRef.zipWithIndex(ArrayOps.scala:186)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
I don't understand why any iteration should run slower than the others. I have verified that the number of tuples in each RDD does not change from one iteration to the next, so that is not the problem. I re-cache my RDDs at every iteration so that I don't have to recompute them from scratch, but that still seems to be what is happening. Does anyone know what I am missing here?
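In case it is relevant: as far as I understand, cache() by itself does not truncate an RDD's lineage, so one thing that might matter is checkpointing theta periodically. Below is a rough sketch of what I mean; the checkpoint directory path is just illustrative, and none of this is in my code above:

sc.setCheckpointDir("/tmp/spark-checkpoints")   # illustrative path only

# ...inside the gradient-descent loop, after computing thetaNew...
theta = thetaNew
theta.cache()
if i % 10 == 0:
    theta.checkpoint()   # cut the lineage so later iterations don't re-plan every earlier one
    theta.count()        # an action is needed to actually materialize the checkpoint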