Spark Ml lib learning
RDD(Resilient Distribute Databases弹性分布式数据集):存储在不同节点计算机中的数据集。弹性指数据的存储方式和容错性强,可以是基于内存或者是磁盘,提供不同的持久化和运行方式,如果一个节点发生错误,RDD在不同节点重试。
Transformation 操作:map,filter ,groupBy ,join
Action 操作:count, reduce,collect
RDD所有操作都是Lazy 加载模式,是一种程序优化的特殊形式,运行编译的过程中不会立即到计算机中计算最终结果,而是记住所有操作和步骤,只有使用Action方法才会进行计算
word/media/image5.gifword/media/image6.gifword/media/image7.gif
word/media/image4.gif
RDD API讲解
1.aggregate方法对给定的数据集进行方法的设定
def aggregate[U:ClassTag](zeroValue:U)(seqop:(U,T)=>U,combOp:(U,U)=>U):U
U为数据类型,可以传入任意数据类型数据,seqop给定的计算方法要和U类型匹配,combOp是合并方法将zeroValue值和seqop值进行合并
代码示例
import org.apache.spark.{SparkContext,SparkConf}
object testRDDMethod{
def main(args:[String]){
val conf = new SparkConf() //创建上下文环境
.setMaster("local") //设置本地化处理
.setAppName("testRDDMethod") //设置应用名称
val sc = new SparkContext(conf) //创建上下文环境实例
val arr = sc.parallelize((1,2,3,4,5,6)) //输入数组数据集
val result = arr.aggregate(0)(math.max(_,_),_+_) //使用aggregate方法
println(result)
}
}
输出结果:6
parallelize方法将内存中数据读入到spark中,作为整体数据集,math.max比较数据集中数据的大小,_+_是对传递的两个值进行求和,Aggregate对第一个方法结果和空值计算。
2.parallelize是SparkContext中的方法
def parallelize(T:ClassTag)(seq:Seq[T],numSlices:Int=defaultParallelism):RDD[T]
第一个测试是数据,第二个表示将数据分布在多少个数据节点中存放,默认为1
import org.apache.spark.{SparkContext,SparkConf}
object testRDDMethod2{
def main(args:[String]){
val conf = new SparkConf() //设置上下文环境
.setMaster("local") //设置本地化处理
.setAppName("testRDDMethod2") //设置应用名称
val sc = new SparkContext(conf) //实例化上下文环境
val arr = sc.parallelized((1,2,3,4,5,6),2) //(1,2,3)+(4,5,6)
val result = arr.aggregate(0)(math.max(_,_),_+_)
pritnln(result)
}
}
Result:3+6=9
Aggregate用于字符串
import org.apache.spark.{SparkContext,SparkConf}
object testRDDMethod2{
def main(args:[String]){
val conf = new SparkConf() //设置上下文环境
.setMaster("local") //设置本地化处理
.setAppName("testRDDMethod2") //设置应用名称
val sc = new SparkContext(conf) //实例化上下文环境
//输入数组数据集,第二个参数表示数据值分布在多少个数据节点中存放,默认为1
val arr =sc.parallelize(("abc","b","c","de","fg")) //创建数据集
//调用aggregate方法
val result = arr.aggregate("")((value,vord)=>value+word,_+_)
pritnln(result)
}
}
3.cache()操作将数据保存到内存中
import org.apache.spark{SparkConf,SparkContext}
object CacheTest{
def main(args;[String]){
val conf = new SparkConf() //设置上下文环境
.setMaster("local") //本地化
.setAppName("cacheTest") //设置应用名称
val sc = new SparkContext(conf) //实例化上下文
val arr =sc.parallelize(("abc","d","def","gag")) //设置数据集
println(arr) //打印
println("---------------------------") //分割线
println(arr.cache()) //加载到内存中
//arr.foreach(println) 迭代形式打印
}
4.cartesian()笛卡尔乘积操作,要求数据集长度必须相同,返回一个新的数据集
实例
import org.apache.spark.{SparkConf,SparkContext}
object Cartesion{
def main(args:[String]){
val conf = new SparkConf() //创建上下文环境
.setMaster("local") //设置本地环境
.setAppName("cartesion") //设置应用名称
val sc = new SparkContext(conf) //创建上下文实例
val arr1 = sc.parallelize((1,2,3,4))
val arr2 = sc.parallelize((5,6,7,8))
val result = arr1.cartesion(arr2) //进行笛卡尔积计算
result.foreach(println)
}
}
5.Coalesce将已经存储的数据重新分片后再进行存储
def coalesce (numPartitions : Int ,shuffle : Boolean = false) (implicit ord : Ordering [T]=null) : RDD [T]
第一个参数分片数,第二个将数据分为更小的片时使用
import org.apache.spark.{SparkConf,SparkContext}
object Cartesion{
def main(args:[String]){
val conf = new SparkConf() //创建上下文环境
.setMaster("local") //设置本地环境
.setAppName("cartesion") //设置应用名称
val sc = new SparkContext(conf) //创建上下文实例
val arr1 = sc.parallelize((1,2,3,4,5,6))
val arr2 = arr1.coalesce(2,true)
val result = aggregate(0)(math.max(_,_),_+_)
println(result)
val result2= arr2.aggregate(0)(math.max(_,_),_+_)
println(result2)
}
}
K-Means算法
本文来源:https://www.2haoxitong.net/k/doc/d2cb222dc1c708a1294a44a2.html
文档为doc格式