I have an implementation of a softmax document classifier, written as a Spark application. I have a set of training documents, a set of labels for the training documents, and a set of test documents. My task is to predict the labels of the test documents using a softmax classifier trained on the training documents. I found this very helpful tutorial on softmax regression and the gradient descent procedure, and that's what I'm working from. The training data is a single file with one document per line. The training labels are a file with a comma-separated list of labels on each line, corresponding to the document on the same line number in the training data file. The test data has the same format as the training data. I'm implementing gradient descent for softmax regression from scratch using RDD operations. I know this is already implemented in Spark's machine learning library, and my implementation is not the most efficient one, but that's mostly down to my current understanding of RDDs and Spark.
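For reference, the update my RDD pipeline is trying to compute is ordinary softmax regression trained with batch gradient descent. A minimal dense NumPy sketch of those formulas (the arrays X, Y and theta here are hypothetical stand-ins for the (did,wid,count) and (did,lab) RDDs used below):

import numpy as np

# Dense sketch of the math only -- the code below computes the same thing with RDDs.
#   X     : (m, n) matrix of word counts, one row per document
#   Y     : (m, k) one-hot matrix of document labels
#   theta : (k, n) weight matrix, one row per label
def gradient_step(X, Y, theta, alpha):
    m = X.shape[0]
    scores = X.dot(theta.T)                       # x^(did) . theta^(lab) for every doc/label pair
    probs  = np.exp(scores)
    probs /= probs.sum(axis=1, keepdims=True)     # h(did,lab): softmax over the labels
    loss   = -(Y * np.log(probs)).sum() / m       # the cost J, printed every 10 iterations below
    grad   = -(Y - probs).T.dot(X) / m            # -1/m * sum_did (1{y^did=lab} - h) * count
    return theta - alpha * grad, loss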

My code is as follows:

import string
from operator import add

import numpy as np

# small helper used throughout: turn an (a,b) pair into (b,a)
def swap(pair):
    return (pair[1], pair[0])

X     = sc.textFile(DATA_PATH)
Xtest = sc.textFile(TEST_DATA_PATH)
y     = sc.textFile(LABEL_PATH)
ytest = sc.textFile(TEST_LABEL_PATH)

X.cache()
Xtest.cache() 
y.cache()
ytest.cache()

#A little preprocessing.
#Each document might have several labels given as a comma separated list
#We're only interested in labels with the substring "CAT"
y = y.zipWithIndex()\
     .map(lambda x: (x[1],x[0]))\
     .flatMapValues(lambda x: x.split(","))\
     .filter(lambda x: not x[1].find("CAT")==-1)
y.cache()

#We want to filter out any documents from the training set that don't have one of our labels
#   y  .keys()     => did NON-DISTINCT
#  ''  .distinct() => did DISTINCT
#  ''  .map(...)   => (did,1)    #  ** This is our "filter rdd" **

filterRDD = y.keys()\
             .distinct()\
             .map(lambda x: (x,1))

# Now we'll use the filterRDD to remove superfluous docs from the training set
#  X   .zipWithIndex() => (doc,did)
#  ''  .map(...)       => (did,doc)
#  ''  .roj('')        => (did,(doc,1)) for did in other
#                                   OR
#                         (did,(None,1)) for did not in this
#  ''  .filter(...)    => (did,(doc,1)) i.e. gets rid of unimportant documents
#  ''  .mapValues(...) => (did,doc)
X = X.zipWithIndex()\
         .map(lambda x: swap(x))\
         .rightOuterJoin(filterRDD)\
         .filter(lambda x: not x[1][0]==None)\
         .mapValues(lambda x: x[0])


#Some preprocessing on the test documents
#We'll lower-case the words, strip them of punctuation (not including apostrophes), and remove that weird non-word
Xtest = Xtest.zipWithIndex()\
                     .map(lambda x : (x[1],x[0]))\
                     .flatMapValues(lambda x : x.split())\
                     .mapValues(lambda x: x.lower().replace("&quot;","").strip(string.punctuation.replace("'","")))\
                     .distinct()

#For multinomial logistic regression we have to create a duplicate document for documents with multiple labels
#  y   .keys()              => did values (not necessarily unique)
#  ''  .map(...)            => (did,1)
#  ''  .reduceByKey(add)    => (did,count) the number of times each did appears in y
#  ''  .filter(...)         => (did,count) for all did which appear multiple times in y


repeats = y.keys()\
                   .map(lambda x: (x,1))\
                   .reduceByKey(add)\
                   .filter(lambda x: x[1]>1)\
                   .mapValues(lambda x: x-1)
# Now that we know which documents have multiple labels, and how many labels each such document has,
#we can duplicate those documents and add them back into the fold
#  X   .join(repeats)       => (did,(doc,count)) for each did which appears multiple times in y
#  ''  .flatMapValues(...)  => creates count-1 many copies of each (did,doc) pair in the above rdd
XX = X.join(repeats)\
      .flatMapValues(lambda x: [x[0]]*(x[1]))

#  X   .union(XX)           => (did,doc) where did is NO LONGER UNIQUE
#  ''  .sort(...)           => (did,doc) in sorted order
#  ''  .zipWithIndex()      => ((did,doc),i)
#  ''  .map(...)            => (i,doc)  Note that i is now did'
#  ''  .flatMapValues(...)  => (did',wid)
#  ''  .mapValues(...)      => (did',wid') a little preprocessing
X  = X.union(XX)\
      .sortByKey()\
      .zipWithIndex()\
      .map(lambda x: (x[1],x[0][1]))\
      .flatMapValues(lambda x : x.split())\
      .mapValues(lambda x: x.lower().replace("&quot;","").strip(string.punctuation.replace("'","")))\
      .filter(lambda x: len(x[1])>1)

# We have to relabel the labels now, to correspond with the data
y = y.sortByKey()\
        .zipWithIndex()\
        .map(lambda x: (x[1],x[0][1]))

CONVERGED = False
m = X.keys().count()
alpha = .1
epsilon = .001

#Get the word counts for each document
#   X  .map(...) => ((did,wid),1)
#  ''  .rbk(add) => ((did,wid),count)
#  ''  .map(...) => (did,wid,count)
counts = X.map(lambda x: (x,1))\
            .reduceByKey(add)\
            .map(lambda x: (x[0][0],x[0][1],x[1]))

#The size of the vocabulary
B = counts.map(lambda x: x[1])\
            .distinct()\
            .count()

#add the bias word on...
#  X   .keys()     => (wid) NOT UNIQUE
#  ''  .distinct() => (wid) UNIQUE
#  ''  .zwi()      => (wid,i)
#  ''  .map(...)   =>  (i,wid)
keys = X.keys()\
        .distinct()\
        .zipWithIndex()\
        .map(lambda x: swap(x))


#create a bias word for each document
#  sc  .par      => (B,...,B) (m many B words)
#  ''  .zwi      => [(B,1),(B,2),...,(B,m)]
#  ''  .map(...) => [(1,B),(2,B),...,(m,B)]
biasWords = sc.parallelize(['B']*m)\
                .zipWithIndex()\
                .map(lambda x: swap(x))

#create a one count for each bias word
#  sc  .par      => [1,...,1] (m many ones)
#  ''  .zwi      => [(1,1),...,(1,m)]
#  ''  .map(...) => [(1,1),...,(m,1)]
ones = sc.parallelize(np.ones(m))\
            .zipWithIndex()\
            .map(lambda x: swap(x))
#Join the Bias words to the bias counts and to the documents, using the indices to join
#  keys  .join(...) => (i,(did,B))
#  ''    .join(...) => (i,((did,B),1))
#  ''    .values()  => ((did,B),1)
#  ''    .map(...)  => (did,B,1)
bias = keys.join(biasWords)\
                .join(ones)\
                .values()\
                .map(lambda x: (x[0][0],x[0][1],x[1]))

#add the bias words into the counts data
counts  = counts.union(bias)

##### ***** REGULARIZE THE COUNTS ****####
#First get the mean value for each word
#  counts  .map(...)       => (wid,count)
#  ''      .rbk(...)       => (wid,sum_wid(count))
#  ''      .mapValues(...) => (wid, mu_wid)
mean = counts.map(lambda x: (x[1],x[2]))\
             .reduceByKey(add)\
             .mapValues(lambda x: x/B)

#Now get the standard deviation for each word
#  counts  .map(...)    => (wid,count)
#  ''      .join(mean)  => (wid,(count,mu_wid))
#  ''      .mapValues() => (wid, (count-mu_wid)^2)
#  ''      .rbk()       => (wid, var_wid)
#  ''      .mapValues() => (wid, sd_wid)
sd   = counts.map(lambda x: (x[1],x[2]))\
             .join(mean)\
             .mapValues(lambda x: (x[0]-x[1])*(x[0]-x[1]))\
             .reduceByKey(add)\
             .mapValues(lambda x: np.sqrt(x))

#Join together for easier manipulation
#  mean  .join(sd) => (wid,(mu_wid,sd_wid))
regs = mean.join(sd)

#Now regularize the counts as count = (count-mu)/sd
#  counts  .map(...)   => (wid,(did,count))
#  ''      .join(regs) => (wid,((did,count),(mu_wid,sd_wid)))
#  ''      .map(...)   => (did,wid,(count-mu_wid)/sd_wid)
counts = counts.map(lambda x: (x[1],(x[0],x[2])))\
                .join(regs)\
                .map(lambda x: (x[1][0][0],x[0],(x[1][0][1]-x[1][1][0])/x[1][1][1]))
counts.cache()

#we need the initial population for theta = (lab,wid,w_wid)
labs  = y.values().distinct()
words = X.values().distinct().union(sc.parallelize(['B']))
zeros = sc.parallelize([0.0])

#Building theta
#  labs  .cart(...) => (lab,wid) for each lab/wid pair
#  ''    .cart(...) => ((lab,wid),0) for each lab/wid pair
#  ''    .map(...)  => (lab,wid,w_wid) where w_wid is initialized to zero
theta = labs.cartesian(words)\
            .cartesian(zeros)\
            .map(lambda x: (x[0][0],x[0][1],x[1]))

#gradient descent
i=0
J = 0
while not CONVERGED :
    print("i: ",i)
    #Get the dot product of every document with every row in theta
    #  theta  .map(...) => (wid,(lab,w_wid))
    #  ''     .join(...) => (wid,((lab,w_wid),(did,count)))
    #  ''     .map(...)  => ((did,lab),w_wid*count)
    #  ''     .rbk(...)  => ((did,lab),x^(did)*DOT*theta^(lab))

    dots = theta.map(lambda x: (x[1],(x[0],x[2])))\
                .join(counts.map(lambda x: (x[1],(x[0],x[2]))))\
                .map(lambda x: ((x[1][1][0],x[1][0][0]),(x[1][0][1]*x[1][1][1])))\
                .reduceByKey(add)
    dots.cache()

    #Now find the denominator for the predictions
    #  dots  .map(...) => (did,x^(did)*DOT*theta^lab) for each row in theta
    #  ''    .mapValues(...) => (did,exp(x^(did)*DOT*theta^lab))
    #  ''    .rbk(add)       => (did,Sum_lab(exp(x^(did)*DOT*theta^lab)))
    denom = dots.map(lambda x: (x[0][0],x[1]))\
                .mapValues(lambda x: np.exp(x))\
                .reduceByKey(add)

    #Now make our predictions
    #  dots  .map(...)  => (did,(lab,x^(did)*DOT*theta^lab))
    #  ''    .join(...) => (did,((lab,x^(did)*DOT*theta^lab),denom))
    #  ''    .map(...)  => ((did,lab),exp(x^(did)*DOT*theta^lab)/denom)
    h = dots.map(lambda x: (x[0][0],(x[0][1],x[1])))\
            .join(denom)\
            .map(lambda x: ((x[0],x[1][0][0]),(np.exp(x[1][0][1])/x[1][1])))

    if i%10 == 0 :
        # Calculate the loss
        # we need ((did,lab),1) pairs for each did and each lab
        yc  = y.keys()\
                .distinct()\
                .cartesian(y.values().distinct())\
                .map(lambda x: (x,1))
        yc.cache()
        # now we'll get ((did,lab),1) pairs for each (did,lab) in y
        ycc = y.map(lambda x: (x,1))
        ycc.cache()
        # Now we'll use the two to calculate the loss
        #  yc  .loj(ycc) =>  ((did,lab),(1,1)) when (did,lab) in ycc
        #                                     OR
        #                    ((did,lab),(1,None)) when (did,lab) not in ycc
        #  ''  .mapValues(...) => ((did,lab),1{y^did=lab})
        #  ''  .join(h...)     => ((did,lab),(1{y^did=lab},log(h(did,lab);theta)))
        #  ''  .mapValues(...) => ((did,lab),(-1/m * 1{y^did=lab}*log(h(did,lab);theta)))
        #  ''  .values()       => -1/m*1{y^did=lab}*log(h(did,lab);theta) FOR EACH did,lab combo
        #  ''  .reduce(add)    => -1/m*sum_did[sum_lab[1{y^did=lab}*log(h(did,lab);theta)]]
        J =  yc.leftOuterJoin(ycc)\
                .mapValues(lambda x: 1 if x[1]==1 else 0)\
                .join(h.mapValues(lambda x: np.log(x)))\
                .mapValues(lambda x: x[0]*x[1]*-1/m)\
                .values()\
                .reduce(add)
        print("Cost:",J)
    i+=1
    #Now calculate the gradient
    #  y   .map(...)       => ((did,lab),1)
    #  h   .loj(yy)        => ((did,lab),(pred,1)) for (did,lab) in yy
    #                                    OR
    #                         ((did,lab),(pred,None)) for (did,lab) not in yy
    #  ''  .mapValues(...) => ((did,lab),1{y^did=lab}-pred) **NOTE: We'll refer to
    #                                                          1{y^did=lab}-pred
    #                                                         as expr from now on**
    #  ''  .map(...)       => (did,(lab,expr))
    #  ''  .join(counts)   => (did,((lab,expr),(wid,count)))
    #  ''  .map(...)       => ((lab,wid),expr*count)
    #  ''  .rbk(add)       => ((lab,wid),sum_did(expr*count))
    #  ''  .mapValues(...) => ((lab,wid),-1/m * sum_did(expr*count))
    yy = y.map(lambda x: (x,1))

    grad = h.leftOuterJoin(yy)\
            .mapValues(lambda x: 1-x[0] if x[1]==1 else 0-x[0])\
            .map(lambda x: (x[0][0],(x[0][1],x[1])))\
            .join(counts.map(lambda x: (x[0],(x[1],x[2]))))\
            .map(lambda x: ((x[1][0][0],x[1][1][0]),x[1][0][1]*x[1][1][1]))\
            .reduceByKey(add)\
            .mapValues(lambda x: -1/m * x )

    #find the new theta value
    # theta  .map(...)     => ((lab,wid),oldW)
    #  ''    .join(grad)   => ((lab,wid),(oldW,gradW))
    #  ''     .mapValues() => ((lab,wid),new_weight)
    thetaNew = theta.map(lambda x: ((x[0],x[1]),x[2]) )\
                    .join(grad)\
                    .mapValues(lambda x: x[0]-alpha*x[1])\
                    .map(lambda x: (x[0][0],x[0][1],x[1]))

    theta = thetaNew
    theta.cache()
    if i == 200 : CONVERGED = True

#Now we'll make our predictions
#Recreate the counts map out of the Xtest instead of the X data
counts = Xtest.map(lambda x: (x,1))\
              .reduceByKey(add)\
              .map(lambda x: (x[0][0],x[0][1],x[1]))

#add the bias word on...
#  X   .keys()     => (wid) NOT UNIQUE
#  ''  .distinct() => (wid) UNIQUE
#  ''  .zwi()      => (wid,i)
#  ''  .map(...)   =>  (i,wid)
keys = Xtest.keys()\
            .distinct()\
            .zipWithIndex()\
            .map(lambda x: swap(x))

#Join the Bias words to the bias counts and to the documents, using the indices to join
#  keys  .join(...) => (i,(did,B))
#  ''    .join(...) => (i,((did,B),1))
#  ''    .values()  => ((did,B),1)
#  ''    .map(...)  => (did,B,1)
bias = keys.join(biasWords)\
                .join(ones)\
                .values()\
                .map(lambda x: (x[0][0],x[0][1],x[1]))

#add the bias words into the counts data
counts  = counts.union(bias)

#Get the dot product of every document with every row in theta
#  theta  .map(...) => (wid,(lab,w_wid))
#  ''     .join(...) => (wid,((lab,w_wid),(did,count)))
#  ''     .map(...)  => ((did,lab),w_wid*count)
#  ''     .rbk(...)  => ((did,lab),x^(did)*DOT*theta^(lab))
dots = theta.map(lambda x: (x[1],(x[0],x[2])))\
            .join(counts.map(lambda x: (x[1],(x[0],x[2]))))\
            .map(lambda x: ((x[1][1][0],x[1][0][0]),(x[1][0][1]*x[1][1][1])))\
            .reduceByKey(add)

#Now find the denominator for the predictions
#  dots  .map(...) => (did,x^(did)*DOT*theta^lab) for each row in theta
#  ''    .mapValues(...) => (did,exp(x^(did)*DOT*theta^lab))
#  ''    .rbk(add)       => (did,Sum_lab(exp(x^(did)*DOT*theta^lab)))
denom = dots.map(lambda x: (x[0][0],x[1]))\
            .mapValues(lambda x: np.exp(x))\
            .reduceByKey(add)

#Now make our predictions
#  dots  .map(...)  => (did,(lab,x^(did)*DOT*theta^lab))
#  ''    .join(...) => (did,((lab,x^(did)*DOT*theta^lab),denom))
#  ''    .map(...)  => ((did,lab),exp(x^(did)*DOT*theta^lab)/denom)
h = dots.map(lambda x: (x[0][0],(x[0][1],x[1])))\
        .join(denom)\
        .map(lambda x: (x[0],(x[1][0][0],np.exp(x[1][0][1])/x[1][1])))\
        .reduceByKey(NBFun6)
print("preds:",h.collect())

The code above won't run for more than 10 iterations. It gets progressively slower, slows to a crawl on the 9th iteration, and runs out of memory on the 10th:

Traceback (most recent call last):
  File "p1.py", line 411, in <module>
    .join(counts.map(lambda x: (x[1],(x[0],x[2]))))\
  File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/rdd.py", line 1661, in join
    return python_join(self, other, numPartitions)
  File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/join.py", line 53, in python_join
    return _do_python_join(rdd, other, numPartitions, dispatch)
  File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/join.py", line 41, in _do_python_join
    return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))
  File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/rdd.py", line 564, in union
    self.getNumPartitions() == rdd.getNumPartitions()):
  File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/rdd.py", line 385, in getNumPartitions
    return self._jrdd.partitions().size()
  File "/Users/MacBot/anaconda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/MacBot/anaconda/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/MacBot/anaconda/lib/python3.6/site-packages/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2495.partitions.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Integer.valueOf(Integer.java:832)
    at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65)
    at scala.collection.IndexedSeqOptimized$class.zipWithIndex(IndexedSeqOptimized.scala:103)
    at scala.collection.mutable.ArrayOps$ofRef.zipWithIndex(ArrayOps.scala:186)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
    at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

I don't understand why any one iteration should run slower than the others. I've verified that the number of tuples in each RDD doesn't change from one iteration to the next, so that isn't the problem. I re-cache my RDDs on every iteration so that I don't have to recompute them from scratch, but that still seems to be what's happening. Does anyone know what I'm missing here?
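For reference, a minimal sketch of how such a per-iteration check can be done (theta and h are the RDDs from the loop above; inspect is just a small helper, and toDebugString() is only there to eyeball each RDD's lineage):

# Sketch of a per-iteration sanity check, meant to be called at the bottom of the while loop.
# count() confirms the number of tuples stays constant from one iteration to the next;
# toDebugString() prints the lineage Spark has recorded for that RDD at that point.
def inspect(rdd, name, i):
    print("iteration", i, name, "tuples:", rdd.count())
    print(rdd.toDebugString())

# e.g. inside the loop, after theta is updated:
#   inspect(theta, "theta", i)
#   inspect(h, "h", i)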