spark

Featured image

spark是一个快速而通用的集群计算平台。快速的主要原因是在内存进行计算

运行模式

1.本地模式 Windows本地运行需要配置:System.setProperty(“hadoop.home.dir”, “C:\Program Files\winutils\”)

2.standalone模式

3.集群模式
spark与集群资源管理器是松耦合的关系,只需要可以申请到excutor运行所需的进程即可。所以支持多种集群管理组件对spark进行管理。
sparkContext 连接集群资源管理(yarn\k8s\mesos),申请excutors资源,将相关代码发送到excutors;

3.1 yarn管理
使用yarn作为spark集群管理很简单,只需要在使用spark_submit提交任务时,配置HADOOP_CONF_DIR或者YARN_CONF_DIR参数,这时候spark会自动作为yarn的客户端,申请资源(具体来说就是excutors),然后将所需代码传到excutor上,由driver调度执行。
yarn-cluster与yarn-clien模式的区别:

3.2 mesos

3.3 k8s

组件:spark core

spark core实现了spark的基本功能:与存储交互、任务调度、内存管理、错误恢复等;
application运行机制

通常来讲是每一个spark应用程序是由driver program执行用户定义的main函数开始的,main函数中首先要做的是创建sparkContext,主要作用是告诉spark应用程序运行的集群环境以及一些其他配置等。一个JVM中只能有一个active状态的sparkContext,用完之后记得stop。

RDD

弹性分布式数据集(resilient distributed dataset),具有只读-可并行计算-容错性等特点;可以由hdfs文件(包含hbase等各类以hdfs为基础的数据源)或者scala集合来创建RDD。RDD支持转换(transformations)和动作(actions)两类算子,前者主要是基于已有的RDD创建新的RDD,后者是将在rdd上计算的将结果传给driver program。Job就是根据action算子划分的

transformations类操作具有惰性(lazy),仅当action操作需要结果时才执行,这个特性也是spark更加高效的原因之一。默认情况下,每一次action都会触发与之相关的transformations,如果一个rdd会多次使用,建议进行持久化或者缓存。

Shuffle操作:指的是会引起数据的重分布的操作(re-distributing data);比如:repartition、coalesce、ByKey的操作以及join类别的操作,如:cogroup、join等。stage就是根据shuffle算子划分的
shuffle操作需要大量的IO、网络IO以及序列化等操作,比较耗资源。

以reduceByKey为例说明:
reduceByKey所有key相同的元素对应的值通过reduce函数进行计算,得到一个新的值。其中一个问题在于同一个key可能分布在不同的partition中,这样各个partition之间需要all to all的一些操作。需要读取所有partition的所有key-value,各个partition之间需要有数据交换,才能计算每一个key对应的最终的value。

内存不够用,由于内存中需要保存海量文件操作句柄和临时信息,如果数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题。

hash based shuffle(现在以及弃用)

Consolidated HashShuffle

数据交换之后,每个partition的数据就是确定的,partition之间是有序的,但是每个partition里面的元素是无序的。如果期待partition元素内部也是有序的,那么可以通过以下算子得到:

mapPartitions to sort each partition using, for example, .sorted
repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
sortBy to make a globally ordered RDD

shuffle调优方式

transformation类型操作

操作 含义   操作 含义
map -   groupByKey -
filter -   reduceByKey -
flatMap -   aggregateByKey 按照key聚合,返回的value类型U和聚合之前的value类型V可以不同,需要提供V merge到U的方法以及U与U merge的方法;可以认为reduceBykey是其特例
mapPartitions -   sortByKey -
mapPartitionsWithIndex -   join -
sample -   cogroup -
union -   cartesian -
intersection -   pipe -
distinct -   coalesce -
repartition -   repartitionAndSortWithinPartitions -

Action操作

操作 含义   操作 含义
reduce -   foreach -
collect -   countByKey -
count -   takeOrdered -
first -   takeSample -
take -   saveAsSequenceFile -
saveAsTextFile -   saveAsObjectFile -

RDD 持久化

当一个RDD会重复使用时,可以选择持久化来缩短任务执行时间。(但如果数据量不是太大,重新生成RDD的时间消耗小于从磁盘或内存加载的时间时,就不宜持久化)

spark自动管理持久化之后的数据,会结合least-recently-used (LRU)算法删除数据。

持久化有如下几种方式:

持久化方式 含义
MEMORY_ONLY 将JVM中的Object缓存到内存中
MEMORY_ONLY_SER 将partition序列化为数组,然后缓存到内存中
DISK_ONLY 将JVM中的Objec将JVM中的Object缓存t缓存到磁盘
MEMORY_AND_DISK 将JVM中的Object缓存到内存中,内存放不下的缓存到磁盘
MEMORY_AND_DISK_SER -
MEMORY_ONLY_2
MEMORY_AND_DISK_2
将JVM中的Object缓存到内存中,但是会缓存两份到不同的node上,防止谋一份数据失效。
OFF_HEAP 与MEMORY_ONLY_SER类似,但是数据会缓存到堆外内存

Broadcast Variables

针对只读变量的数据共享机制。以比较高效的方式将只读变量共享到所有机器上,当然使用普通变量也能达到类似的效果,但Broadcast类型的变量更加高效,主要具有以下优点。

Accumulators

可以理解为一个只能进行加法操作的变量,spark有一些内置类型,如:longAccumulator、doubleAccumulator,用户也可自定义。Task在执行过程中只能调用其add方法,但是不能获取到他的值。只有Driver Progeam可以获取到它的值。常用的场景有计数、求和等。 示例:

val accum = sc.longAccumulator(“My Accumulator”)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value

Dataset

分布式数据集,可以由JVM对象或者外部文件构建;
DataSet元素的序列化不使用java 序列化或者Kryo序列化的方式,而是使用一个可序列化的Encoder完成。
dataSet 与 RDD的区别

  1. 对于spark来说,并不知道RDD元素的内部结构,仅仅知道元素本身的类型,只有用户才了解元素的内部结构,才可以进行处理、分析;但是spark知道DataSet元素的内部结构,包括字段名、数据类型等。这为spark对数据操作进行优化奠定了基础。
    ds 与 RDD的区别

  2. DataSet序列化效率比RDD高很多,原因有二:其一是了解数据内部结构,不需要每一个元素都序列话字段名与数据类型,仅仅通过scheme就可以提供此信息;其二是引入了堆外内存,这在内存管理上可以更灵活自由,可以有效避免引起频繁的GC。

  3. 由于spark了解数据内部结构,并且引入了endcoder机制,所以在内存中以二进制的形式存储,区别于RDD以jvm对象的形式存储,所以在shuffle过程中不需要序列化与反序列化,另外也有提到在排序过程中不需要反序列化,个人觉得是整体元素不需要反序列化,但是对于排序所依据的字段可能仍然需要反序列化,但序列化的消耗肯定比整体反序列化要小很多。

  4. dataSet可与SparkSql结合使用。RDD具有只读性,一般都是基于已有RDD创建新的RDD,SparkSql引擎结合DataSet,可以在一定程度上牺牲内部不变性(但对用户来说是无感知的),来减少对象的创建,避免频繁GC。另外Sql引擎本身也有很多的优化点。

  5. dataSet支持更多的数据存取格式,如:csv、json、parquet、jdbc等。

总之,对于结构化数据,dataset比rdd在cpu、内存、任务耗时上都有很大的性能提升。但对于非结构化数据,往往还是使用RDD处理。

DataFrame

创建方式:基于RDD,Hive Table,Parquet Files,JSON Files等

窗函数

组件:spark mlib/ml

组件:spark stream

组件:spark graphX

一些优化策略

1.数据倾斜
表现:个别task执行任务时长远高于其他的task
解决方案:repartition

2.大数据join小数据
如果小数据可以被广播,则尽量不用join;
如果不能广播,可以按照key进行相同的repartition操作;

一些问题

万金油的方法:很多疑难问题可能是数据量太大导致的,所以先降低数据量,验证程序逻辑是否有问题,如果有就解决,没有那就可以判定是数据量大导致的;
如果是数据量大导致的,那么就在各个维度上分区处理,比如按时间分区,按用户地域分区等;或者是分阶段处理,将数据落地到磁盘,再开启下一个job。

  1. 解析parquet问题:
    异常:org.apache.hadoop.fs.ChecksumException: Checksum error
    问题原因:文件夹内包含一些隐藏的crc文件,这个文件是用于校验使用;如果存在则进行校验
    解决办法:删除隐藏的crc文件

  2. local模式资源限制
    线程数目:local[2]
    内存使用:.config(“spark.driver.memory”, “1g”)
    spark.storage.memory

  3. Total size of serialized results of 61362 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    问题原因:1. worker 送回driver的数据太大引起的,spark.driver.maxResultSize控制worker送回driver的数据大小。优化代码,减少worker到driver的数据
    1. 分区太多,driver需要维护太多分区的状态等信息导致driver内存溢出,增加spark.driver.maxResultSize的大小,或者减少分区数
  4. Job 0 cancelled because SparkContext was shut down,
    一般情况是: java.lang.OutOfMemoryError: Java heap space导致的。
    如果在driver的日志里,找不到OOM,应该是excutor的OOM导致的。去Log Type: stderr这个地方的日志找
    解决办法就是增加driver或者excutor的内存

  5. shuffle.FetchFailedException: Direct buffer memory
    增加:spark.yarn.executor.memoryOverhead
    或减小:spark.executor.cores

  6. 问题:处理某数据,num-excutors设置为2,执行失败,异常原因内存不足;设置为10 则可以执行成功。
    原因推测:单个excutor处理的partition越多,带来的垃圾回收成本比较高,或者是处理的partition越多,即便处理完一个partition后,进行GC,内存也会逐渐增加,恢复不到处理之前的水平。

  7. 问题:引用类的static变量,在driver程序中修改之后的值,在excutor中不会生效
    解决方式:使用广播变量

  8. 问题表象:driver与excutor通信异常,显示java.io.IOException: java.io.EOFException; Host Details : local host is…
    Invocation returned exception on …
    Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
    最初推测是某个分区的文件异常导致的,但实际上更重要的是spark.rpc.askTimeout 这个参数,driver与excutor断开连接。
    原因:driver内存不足,频繁FULL GC,导致driver与excutor失去连接
    解决方式:优化代码(减少driver中创建对象的大小或者减小partition的数量) 或者 增加driver内存

  9. 问题:org.apache.spark.SparkException: Error communicating with MapOutputTracker(spark2.1)
    原因推测:小数据量没有问题,大数据量会出现这个问题,推测可能是集群资源和计算力不足导致的,各种参考,调节了如下参数得以解决,具体是哪个参数发挥作用,暂时不清楚; 解决方式:
    –num-executors 500
    –conf spark.dynamicAllocation.maxExecutors=500
    –conf spark.dynamicAllocation.enabled=false
    –conf spark.shuffle.consolidateFiles=true

  10. spark 读取json数据,某些行有错误;配置忽略.config(“spark.sql.files.ignoreCorruptFiles”, “true”)