Spark
一、SparkCore
RDD 创建
1 2 3 4 5
| val listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) listRdd.foreach(println)
val arrayRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4)) arrayRDD.foreach(println)
|
1
| val lines: RDD[String] = sc.textFile("in")
|
RDD 转换算子
Value 类型
map
1 2 3
| val listRdd: RDD[Int] = sc.makeRDD(1 to 10) val mulRdd: RDD[Int] = listRdd.map(_ * 2) mulRdd.collect().foreach(println)
|
mapPartitions
对每一个分区中的数据批处理。相当于只给每个分区的数据,只发送一次计算;而 map 的实现会给每个数据发送一次计算,增加了网络传输消耗;但是 mapPartitions 由于以整个分区为单位,可能会造成 OOM
1 2 3 4 5
| val listRdd: RDD[Int] = sc.makeRDD(1 to 10) val mapParRdd: RDD[Int] = listRdd.mapPartitions(datas => { datas.map(_ * 2) }) mapParRdd.collect().foreach(println)
|
mapPartitionsWithIndex
1 2 3 4 5 6 7
| val listRdd: RDD[Int] = sc.makeRDD(1 to 10,3) val tupleRdd: RDD[(Int, String)] = listRdd.mapPartitionsWithIndex { case (num, datas) => { datas.map((_, "partition_num: " + num)) } } tupleRdd.collect().foreach(println)
|
flatMap
扁平化,变成一个一个单独的元素
1 2 3
| val listRdd: RDD[List[Int]] = sc.makeRDD(Array(List(1, 2), List(3, 4))) val flatRdd: RDD[Int] = listRdd.flatMap(datas => datas) flatRdd.collect().foreach(println)
|
glom
将同一个分区的元素,放到一个数组里
1 2 3 4 5
| val listRdd: RDD[Int] = sc.makeRDD(1 to 16, 4) val glomRdd: RDD[Array[Int]] = listRdd.glom() glomRdd.collect().foreach(array => { println(array.mkString(",")) })
|
groupBy
同一个分区的放到一个迭代对象中。结果 tuple 中,第一个元素是 key,后面是 iterator
1 2 3 4 5 6
| val listRdd: RDD[Int] = sc.makeRDD(1 to 9) val groupRdd: RDD[(Int, Iterable[Int])] = listRdd.groupBy(i => i % 2) groupRdd.collect().foreach(println) -------------------- (0,CompactBuffer(2, 4, 6, 8)) (1,CompactBuffer(1, 3, 5, 7, 9))
|
filter
按条件筛选
1 2 3
| val listRdd: RDD[Int] = sc.makeRDD(1 to 9) val filterRdd: RDD[Int] = listRdd.filter(_ % 2 == 0) filterRdd.collect().foreach(println)
|
sample
抽样。
参数介绍:withReplacement,是否重复抽样(可重复,泊松抽样;不可重复,伯努利抽样)
fraction,打分?(可重复下,需≥0,代表大概可重复的次数;不可重复下,需[0,1],代表大概抽取比例)
1 2 3 4
| val listRdd: RDD[Int] = sc.makeRDD(1 to 10)
val sampleRdd: RDD[Int] = listRdd.sample(true, 4, 333) sampleRdd.collect().foreach(println)
|
distinct
去重
注意:distinct 计算后,原数据分区会被打乱,是因为中间进行了 shuffle 操作。同时也因为 shuffle 导致必须等待所有分区都计算完成后才能进行下一个操作;而没有 shuffle 操作的算子,执行完一个分区的操作后就可以继续进行下一个操作。
1 2 3 4
| val listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 1, 1, 3, 4, 6, 4, 3)) val disRdd: RDD[Int] = listRdd.distinct() val disRdd: RDD[Int] = listRdd.distinct(2) disRdd.collect().foreach(println)
|
coalease
缩减分区。实际为合并分区,即将其中某几个分区合并;若要扩大分区,需要添加 shuffle 参数
1 2 3 4
| val listRdd: RDD[Int] = sc.makeRDD(1 to 16, 4) println("before: ", listRdd.partitions.size) val coalRdd: RDD[Int] = listRdd.coalesce(3) println("after: ", coalRdd.partitions.size)
|
repartition
对 coalease 的封装,shuffle = true
sortBy
排序,可自己设置排序规则
1 2 3 4
| val listRdd: RDD[Int] = sc.makeRDD(List(3, 5, 1, 7, 2)) val sortRdd: RDD[Int] = listRdd.sortBy(x => x)
sortRdd.collect().foreach(println)
|
双 Value 类型
union
合并两个 Rdd
1
| val rdd3 = rdd1.union(rdd2)
|
subtract
去除相同元素,不同的会保留
1
| val rdd3 = rdd1.subtract(rdd2)
|
intersection
求交集后返回
1
| val rdd3 = rdd1.intersection(rdd2)
|
cartesian
笛卡尔积
1
| val rdd3 = rdd1.cartesian(rdd2)
|
zip
将两个 rdd 对应元素组合在一起(tuple?key-value?)。两个 rdd 分区数量和元素数量必须都相同;会把分区中的拆成一个一个的元素,组合的元素还在原来的分区里。
1 2 3 4 5
| val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 2) val rdd2: RDD[String] = sc.makeRDD(Array("a", "b", "c", "d"), 2) val zipRdd: RDD[(Int, String)] = rdd1.zip(rdd2) zipRdd.collect().foreach(println) zipRdd.saveAsTextFile("output")
|
Key-Value 类型
partitionBy
根据 key 进行重新分区(因此 rdd 需要是 kv 的形式),也可自定义分区类
1 2 3
| val arrayRdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc"), (4, "ddd")), 2) val parRdd: RDD[(Int, String)] = arrayRdd.partitionBy(new org.apache.spark.HashPartitioner(3)) parRdd.saveAsTextFile("output")
|
Rdd Action 行动算子
综合练习
二、SparkSQL
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。
Rdd → DataFrame → DataSet
- DataFrame:在 Rdd 的基础上,装饰了表结构,让每一个字段包含意义
- DataSet:在 DataFrame 基础上,装饰了读取操作,让数据的读取像操作对象一样简单
DataFrame
创建
1 2 3 4 5 6 7
| > val df = spark.read.json("/opt/module/spark-2.3.2-local/mydata/user.json") > df.show ==========
{"name":"123", "age":20} {"name":"456", "age":20} {"name":"789", "age":20}
|
SQL 风格语法
1 2
| > df.createTempView("user") > spark.sql("select * from user").show()
|
1 2
| > df.createGlobalTempView("user_g") > spark.newSession().sql("SELECT * FROM global_temp.user_g").show()
|
DSL 风格语法
以对象的方式来操作数据
1 2 3 4
| > df.select("name").show() > df.select($"name", $"age" + 1).show() > df.filter($"age" > 21).show() > df.groupBy("age").count().show()
|
Rdd 转为 DataFrame
1 2 3 4 5
| > case class People(name:String, age:Int) > val rdd1 = sc.makeRDD(List(("zhangsan", 20), ("lisi", 14))) > val peopleRdd = rdd1.map(t=>{People(t._1, t._2)}) > val df = peopleRdd.toDF > df.show
|
DataFrame 转为 Rdd
注意这里面转换之后,并不会还原成 People 结构,而只是一个 Row 对象。这是因为 DataFrame 本身不存数据的类型
DataSet
Dataset是具有强类型的数据集合,需要提供对应的类型信息。
解决 DataFrame 中取数只能通过下标来取的问题(啥意思??)
创建
1 2
| > case class People(name:String, age:Int) > val caseClassDS = Seq(People("Andy", 21)).toDS()
|
Rdd 转换为 DataSet
Rdd + 结构 → DataFrame;DataFram + 类型 → DataSet
Rdd + 结构 + 类型 → DataSet
1 2 3 4
| > case class Person(name: String, age: Long) > val mapRdd = rdd.map(t=>{Person(t._1, t._2)}) > val ds = mapRdd.toDS > ds.show
|
DataSet 转换为 Rdd
转换回来仍保留着类型
三、SparkStreaming