Spark 性能调优

由于大部分的 Spark 计算都是在内存中完成的,集群中的任何资源(CPU,网络带宽,或者内存)都可能成 为 Spark 应用程序的瓶颈。最常见的情况是,数据能装进内存,而瓶颈是网络带宽;当然,有时候我们也需 要做一些优化调整来减少内存占用,例如将RDD以序列化格式保存(storing RDDs in serialized form)。 本文将主要涵盖两个主题:1.数据序列化(这对于优化网络性能极为重要);2.减少内存占用以及内存调优。 同时,我们也会提及其他几个比较小的主题。

数据序列化

序列化在任何一种分布式应用性能优化时都扮演几位重要的角色。如果序列化格式序列化过程缓慢,或者需要占用字节很多,都会大大拖慢整体的计算效率。通常,序列化都是Spark应用优化时首先需要关注的地方。Spark着眼于要达到便利性(允许你在计算过程中使用任何Java类型)和性能的一个平衡。Spark主要提供了两个序列化库: * Java serialization: 默认情况,Spark使用Java自带的ObjectOutputStream 框架来序列化对象,这样任何实现了 java.io.Serializable 接口的对象,都能被序列化。同时,你还可以通过扩展 java.io.Externalizable 来控制序列化性能。Java序列化很灵活但性能较差,同时序列化后占用的字节数也较多。 * Kryo serialization: Spark还可以使用Kryo 库(版本2)提供更高效的序列化格式。Kryo的序列化速度和字节占用都比Java序列化好很多(通常是10倍左右),但Kryo不支持所有实现了Serializable 接口的类型,它需要你在程序中 register 需要序列化的类型,以得到最佳性能。 要切换到使用 Kryo,你可以在 SparkConf 初始化的时候调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式。目前,Kryo不是默认的序列化格式,因为它需要你在使用前注册需要序列化的类型,不过我们还是建议在对网络敏感的应用场景下使用Kryo。

Spark对一些常用的Scala核心类型(包括在Twitter chill 库的AllScalaRegistrar中)自动使用Kryo序列化格式。

如果你的自定义类型需要使用Kryo序列化,可以用 registerKryoClasses 方法先注册:

val conf = new SparkConf().setMaster(…).setAppName(…) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)

Kryo的文档(Kryo documentation )中有详细描述了更多的高级选项,如:自定义序列化代码等。

如果你的对象很大,你可能需要增大 spark.kryoserializer.buffer 配置项(config)。其值至少需要大于最大对象的序列化长度。

最后,如果你不注册需要序列化的自定义类型,Kryo也能工作,不过每一个对象实例的序列化结果都会包含一份完整的类名,这有点浪费空间。

内存调优

调整内存使用有三个注意事项:对象使用的内存量(您可能希望整个数据集适合内存),访问这些对象的成本,以及垃圾收集的开销(如果您有更高的营业额对象的条款)。

默认情况下,Java对象的访问速度很快,但是与其“字段”中的“原始”数据相比,可以轻松地占用多达2-5倍的空间。这是由于几个原因:

每个不同的Java对象都有一个“对象头”,大约有16个字节,并包含诸如指向其类的指针等信息。对于数据非常少的对象(比如一个Int字段),这可能比数据大。 Java字符串在原始字符串数据上有大约40字节的开销(因为它们将字符串数据存储在一个字符数组中,并保留额外的数据,如长度),并将每个字符存储为两个字节,这是由于字符串内部使用了UTF-16编码。因此一个10个字符的字符串可以很容易地消耗60个字节 常见的集合类(如HashMap和LinkedList)使用链接的数据结构,每个条目都有一个“包装器”对象(例如Map.Entry)。这个对象不仅有一个头,而且还有指向列表中下一个对象的指针(通常是8个字节)。 基本类型的集合通常将它们存储为“装箱”对象,如java.lang.Integer。 本节将首先概述Spark的内存管理,然后讨论用户可以在他/她的应用程序中更有效地使用内存的具体策略。具体来说,我们将介绍如何确定对象的内存使用情况,以及如何改进它,或者通过更改数据结构,或者以序列化格式存储数据。接下来我们将介绍Spark的缓存大小和Java垃圾收集器。

内存管理概述

Spark中的内存使用大部分属于两类:执行和存储。执行内存是指在混洗,连接,排序和聚合中用于计算的内存,而存储内存指的是用于跨群集缓存和传播内部数据的内存。在Spark中,执行和存储共享一个统一的区域(M)。当不使用执行内存时,存储器可以获取所有可用内存,反之亦然。如有必要,执行可以驱逐存储器,但是只有在总存储器内存使用量低于特定阈值(R)时才执行。换句话说,R描述了M内的一个分区,缓存块不会被驱逐。由于执行的复杂性,存储可能不会执行。

这种设计确保了几个理想的性能首先,不使用缓存的应用程序可以使用整个空间执行,避免不必要的磁盘溢出。其次,使用高速缓存的应用程序可以保留最小的存储空间(R),使数据块不受驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而不需要用户如何在内部划分内存的专业知识。

虽然有两种相关配置,但典型用户不需要调整它们,因为默认值适用于大多数工作负载:

spark.memory.fraction将M的大小表示为(JVM堆空间 - 300MB)的一部分(默认值为0.6)。其余空间(40%)保留给用户数据结构,Spark中的内部元数据,并在稀疏和异常大的记录的情况下防止OOM错误。 spark.memory.storageFraction将R的大小表示为M的一部分(默认为0.5)。 R是M中的存储空间,缓存的块不会被执行驱逐。 应该设置spark.memory.fraction的值,以便在JVM的旧时代或“终身”时代中舒适地适应这种堆空间。有关详细信息,请参阅下面对高级GC调整的讨论。

确定内存消耗

调整数据集所需的内存消耗量的最佳方法是创建RDD,将其放入缓存,然后查看Web UI中的“存储”页面。 页面会告诉你RDD占用了多少内存。

要估计特定对象的内存消耗,请使用SizeEstimator的估计方法。这对于尝试使用不同数据布局来调整内存使用情况以及确定每个执行程序堆中广播变量占用的空间量非常有用。

调整数据结构

减少内存消耗的第一种方法是避免增加开销的Java功能,例如基于指针的数据结构和包装对象。 做这件事有很多种方法:

1、设计你的数据结构来优先选择对象数组和基本类型,而不是标准的Java或Scala集合类(例如HashMap)。 fastutil库为与Java标准库兼容的基本类型提供了方便的集合类。 2、尽可能避免使用大量小对象和指针的嵌套结构。 3、考虑使用数字ID或枚举对象而不是键的字符串。 4、如果RAM少于32 GB,请设置JVM标志-XX:+ UseCompressedOops使指针为4个字节而不是8个。 您可以在spark-env.sh中添加这些选项。

序列化的 RDD 存储

如果对象仍然太大,无法进行高效存储,但是使用RDD持久性API中的序列化存储级别(例如MEMORY_ONLY_SER)来减少内存使用的一种更简单的方法是以序列化的形式存储它们。 Spark然后将每个RDD分区存储为一个大字节数组。 以序列化形式存储数据的唯一缺点是访问速度较慢,这是由于必须快速反序列化每个对象。 如果你想以序列化的形式缓存数据,我们强烈推荐使用Kryo,因为它比Java序列化(当然还有原始的Java对象)要小得多。

垃圾回收调优

当您的程序存储RDD时,JVM垃圾回收会成为问题。 (在只读RDD的程序中,通常不会出现问题,然后在其上运行很多操作。)当Java需要驱逐旧对象以腾出空间给新对象时,它需要跟踪所有的Java对象并找到未使用的。这里要记住的要点是垃圾收集的成本与Java对象的数量成正比,所以使用较少对象的数据结构(例如Ints数组而不是LinkedList)大大降低了成本。更好的方法是以序列化的形式保存对象,如上所述:现在每个RDD分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果GC是一个问题,首先要尝试使用序列化缓存。

由于任务的工作内存(运行任务所需的空间量)和缓存在节点上的RDD之间的干扰,GC也可能成为问题。我们将讨论如何控制分配给RDD缓存的空间来缓解这个问题。

测量GC的影响

GC调优的第一步是收集垃圾收集发生的频率和GC花费的时间。这可以通过在Java选项中添加-verbose:gc -XX:+ PrintGCDetails -XX:+ PrintGCTimeStam来完成。 (有关将Java选项传递给Spark作业的信息,请参阅配置指南。)下次运行Spark作业时,每次发生垃圾回收时都会在工作人员的日志中看到消息。请注意,这些日志将位于群集的工作节点上(在其工作目录中的stdout文件中),而不是在驱动程序上。

高级GC调整

为了进一步调整垃圾收集,我们首先需要了解JVM中有关内存管理的一些基本信息:

爪哇堆空间分为两个地区的年轻人和老人。年轻一代是为了保存短寿命的物体,而老一代则是为了寿命更长的物体。

年轻一代进一步分为三个地区[伊甸园,幸存者1,幸存者2]。

垃圾收集过程的简单描述:当Eden已满时,在Eden上运行一个小型GC,并将从Eden和Survivor1中存活的对象复制到Survivor2。幸存者地区交换。如果一个对象足够旧或者Survivor2已满,则将其移至Old。最后,当Old接近满时,调用完整的GC。

在Spark中进行GC调优的目标是确保只有长寿命的RDD才被存储在旧一代中,并且Young生成的大小足以存储短期对象。这将有助于避免完整的GC收集任务执行期间创建的临时对象。一些可能有用的步骤是:

通过收集GC统计信息来检查是否有太多的垃圾回收。如果在任务完成之前多次调用完整的GC,则意味着没有足够的内存可用于执行任务。

如果有太多次要收集,但没有太多主要地理信息,那么为伊甸园分配更多的内存将会有所帮助。您可以将Eden的大小设置为高估每个任务需要多少内存。如果Eden的大小确定为E,则可以使用选项-Xmn = 4/3 * E来设置Young代的大小。 (增加4/3也是为了解释幸存者地区所使用的空间)。

在打印的GC统计信息中,如果OldGen接近满,则通过降低spark.memory.fraction来减少用于缓存的内存量;缓存更少的对象比减慢任务执行更好。或者,考虑减少年轻一代的规模。这意味着如果你按照上面的方式设置,则降低-Xmn。如果不是,请尝试更改JVM的NewRatio参数的值。许多JVM默认这个为2,这意味着老一代占2/3的堆。它应该足够大,使得这个分数超过spark.memory.fraction。

使用-XX:+ UseG1GC试用G1GC垃圾回收器。在某些垃圾收集是瓶颈的情况下,它可以提高性能。请注意,对于较大的执行程序堆大小,使用-XX:G1HeapRegionSize增加G1区大小可能很重要

例如,如果您的任务正在从HDFS中读取数据,则可以使用从HDFS读取的数据块的大小来估计该任务使用的内存量。请注意,解压缩块的大小通常是块大小的2到3倍。所以如果我们希望有3或4个任务的工作空间,HDFS块大小为128 MB,我们可以估计Eden的大小为4 * 3 * 128MB。

监视垃圾收集所花费的时间和频率如何随新设置发生变化。

我们的经验表明,GC调整的效果取决于您的应用程序和可用的内存量。在线描述的调谐选项还有很多,但在较高的层次上,管理全面GC发生的频率有助于减少开销。

GC调整标志

其它考虑事项

并行度

除非您将每个操作的并行度设置得足够高,否则群集不会被充分利用。 Spark会根据自己的大小(尽管可以通过可选参数控制SparkContext.textFile等)自动设置每个文件上运行的“map”任务的数量,而对于分布式的“reduce”操作,比如groupByKey和reduceByKey, 它使用最大的父RDD的分区数量。 您可以将并行级别作为第二个参数(请参阅spark.PairRDDFunctions文档),或者将config属性设置为spark.default.parallelism以更改默认值。 一般来说,我们建议您的群集中每个CPU核心有2-3个任务。

Reduce 任务的内存使用

有时,你会得到一个OutOfMemoryError,不是因为你的RDD不适合内存,而是因为你的一个任务的工作集,比如groupByKey中的一个reduce任务,太大了。 Spark的shuffle操作(sortByKey,groupByKey,reduceByKey,join等)在每个任务中构建一个哈希表来执行分组,这通常会很大。 这里最简单的解决方法是增加并行度,使每个任务的输入集合更小。 Spark能够有效地支持短至200毫秒的任务,因为它可以在一个任务中重复使用一个执行器JVM,并且任务启动成本较低,因此可以安全地将并行级别提高到超过集群内核的数量。

广播超大变量

使用SparkContext中可用的广播功能可以大大减少每个序列化任务的大小,以及通过集群启动作业的成本。 如果您的任务使用其中的驱动程序的任何大对象(例如静态查找表),请考虑将其转换为广播变量。 Spark打印每个任务的序列化大小,所以你可以看看,以确定你的任务是否太大; 一般来说大于20KB的任务可能是值得优化的。

数据本地化

数据局部性可能会对Spark作业的性能产生重大影响。 如果数据和在其上运行的代码在一起,那么计算就会很快。 但是,如果代码和数据是分开的,就必须转移到另一个。 通常情况下,由于代码大小比数据小得多,所以将数据从一个地方传输到另一个地方比传输数据更快。 Spark围绕这个数据局部性的一般原则构建调度。

数据局部性是数据与代码的接近程度。 根据数据的当前位置,有几个级别的地点。 从最近到最远的顺序:

PROCESS_LOCAL 数据与运行代码位于同一个JVM中。 这是最好的地方可能 NODE_LOCAL 数据在同一个节点上。 例子可能在同一个节点上的HDFS中,或者在同一个节点上的另一个执行器上。 这比PROCESS_LOCAL稍慢,因为数据必须在进程之间传输 NO_PREF 数据可以从任何地方以相同的速度访问,并且没有本地偏好 RACK_LOCAL 数据位于同一台服务器上。 数据位于同一机架上的不同服务器上,因此需要通过网络进行发送,通常通过一台交换机进行发送 ANY 数据都在网络上的其他地方,而不在同一个机架上

Spark更喜欢在最好的地点级别安排所有任务,但这并不总是可能的。 在任何空闲的执行器上没有未处理的数据的情况下,Spark会切换到较低的地点级别。 有两种选择:a)等待一个繁忙的CPU释放,以便在同一台服务器上的数据上启动一个任务;或者b)立即在较远的地方开始一个需要移动数据的新任务。

Spark通常所做的就是等待繁忙的CPU释放的希望。 一旦超时,它就开始将数据从远处移动到空闲的CPU。 每个级别之间回退的等待超时可以单独配置,也可以全部配置在一个参数中; 有关详细信息,请参阅配置页面上的spark.locality参数。 如果你的任务很长,看到地方不好,你应该增加这些设置,但是默认情况下通常效果不错。

小结

这是一个简短的指南,指出调整Spark应用程序时应该了解的主要问题 - 最重要的是数据序列化和内存调整。 对于大多数程序来说,切换到Kryo序列化和以序列化形式保存数据将解决最常见的性能问题。 请随时在Spark邮件列表上询问其他调整最佳实践。