SparkRDD论文总结
本篇文章是对SparkRDD论文的总结,中间会穿插一些Spark的内部实现总结,对应Spark版本为2.0。
RDD
Motivation
传统的分布式计算框架(如MapReduce)在执行计算任务时,中间结果通常会存于磁盘中,这样带来的IO消耗是非常大的,尤其是对于各种机器学习算法,它们需要复用上次计算的结果进行迭代,如果每次结果都存到磁盘上再从磁盘读取,耗时会很大。因此Spark这篇论文提出了一种新的分布式数据抽象——RDD。
设计思想及特点
ResilientDistributedDataset(RDD)是ApacheSpark中数据的核心抽象,它是一种只读的、分区的数据记录集合。
RDD的特点:
Lazyevaluation,只在需要的时候才进行计算
RDD里面的数据是分区的,每一块数据都可能分布在集群内不同的节点上;支持并行计算
Resilient:借助RDDlineagegraph,Spark可以重新执行之前失败的计算任务而不用整体上重新计算,保证了容错性而且非常灵活,实现了fault-tolerance
那么如何操作、处理数据呢?Spark提供了一组函数式编程风格的API,可以很方便地对RDD进行操作、变换,就像操作集合一样。比如:
valrdd=sc.parallelize(1to)
valresult=rdd.map(_+10)
.filter(_15)
.map(x=(x,1))
.reduceByKey(_+_)
.collect
并且开发者可以根据需要自己编写相应的RDD以及RDD之间的操作,非常方便。可以这么理解,RDD就相当于抽象的数据表示,而operation就相当于一套DSL用于对RDD进行变换或者求值。
RDD的表示
Spark中的RDD主要包含五部分信息:
partitions():partition集合
dependencies():当前RDD的dependency集合
iterator(split,context):对每个partition进行计算或读取操作的函数
partitioner():分区方式,如HashPartitioner和RangePartitioner
preferredLocations(split):访问某个partition最快的节点
所有的RDD都继承抽象类RDD。几种常见的操作:
sc#textFile:生成HadoopRDD,代表可以从HDFS中读取数据的RDD
sc#parallelize:生成ParallelCollectionRDD,代表从Scala集合中生成的RDD
map,flatMap,filter:生成MapPartitionsRDD,其partition与parentRDD一致,同时会对parentRDD中iterator函数返回的数据进行对应的操作(lazy)
union:生成UnionRDD或PartitionerAwareUnionRDD
reduceByKey,groupByKey:生成ShuffledRDD,需要进行shuffle操作
cogroup,join:生成CoGroupedRDD
Operations
Spark里面对RDD的操作分为两种:transformation和action。
transformation是lazy的,仅仅会保存计算步骤并返回一个新的RDD,而不会立刻执行计算操作
action会依次执行计算操作并且得到结果
这些transformation和action在FP中应该是很常见的,如map,flatMap,filter,reduce,count,sum。
对单个数据操作的transformation函数都在RDD抽象类内,而对tuple操作的transformation都在PairRDDFunctions包装类中。RDD可以通过implicit函数在符合类型要求的时候自动转换为PairRDDFunctions类,从而可以进行reduceByKey之类的操作。对应的implicit函数:
implicitdefrddToPairRDDFunctions[K,V](rdd:RDD[(K,V)])
(implicitkt:ClassTag[K],vt:ClassTag[V],ord:Ordering[K]=null):PairRDDFunctions[K,V]={
newPairRDDFunctions(rdd)
}
Dependency
上面我们提到,RDD只会在需要的时候计算结果,调用那些transformation方法以后,对应的transformation信息只是被简单地存储起来,直到调用某个action才会真正地去执行计算。Spark中RDD之间是有联系的,RDD之间会形成依赖关系,也就是形成lineagegraph(依赖图)。Dependency大致分两种:narrowdependency和widedependency。
Narrowdependency(NarrowDependency):ParentRDD中的每个partition最多被childRDD中的一个partition使用,即一对一的关系。比如map,flatMap,filter等transformation都是narrowdependency
Widedependency(ShuffleDependency):ParentRDD中的每个partition会被childRDD中的多个partition使用,即一对多的关系。比如join生成的RDD一般是widedependency(不同的partitioner)
论文中的图例很直观地表示了RDD间的依赖关系:
这样划分dependency的原因:
Narrowdependency可以方便地以流水线的形式执行计算,即从头到尾一串chain下来。而widedependency必须要等所有的parentRDD的结果都准备好以后再执行计算
Narrowdependency失败以后,Spark只需要重新计算失败的parentRDD即可;而对于widedependency来说,一失败可能导致某些分区丢失,必须整体重新进行计算
Shuffle
Spark中的shuffle操作与MapReduce中类似,在计算widedependency对应的RDD的时候(即ShuffleMapStage)会触发。
首先来回顾一下为什么要进行shuffle操作。以reduceByKey操作为例,Spark要按照key把这些具有相同key的tuple聚集到一块然后进行计算操作。然而这些tuple可能在不同的partition中,甚至在不同的集群节点中,要想计算必须先把它们聚集起来。因此,Spark用一组maptask来将每个分区写入到临时文件中,然后下一个stage端(reducetask)会根据编号获取临时文件,然后将partition中的tuple按照key聚集起来并且进行相应的操作。这里面还包括着排序操作(可能在mapside也可能在reduceside进行)。
Shuffle是Spark的主要性能瓶颈之一(涉及磁盘IO,数据序列化和网络IO),其优化一直是个难题。
Shufflewrite(maptask):SortShuffleWriter#write
Shuffleread(reducetask):ShuffleRDD#北京白癜风治疗医院哪家好有哪些白癜风医院
转载请注明:http://www.shijichaoguyj.com/wxbzhu/309.html