spark-MLlib

发布时间:2016-10-10 15:00:49   来源:文档文库   
字号:

Spark Ml lib learning

RDD(Resilient Distribute Databases弹性分布式数据集):存储在不同节点计算机中的数据集。弹性指数据的存储方式和容错性强,可以是基于内存或者是磁盘,提供不同的持久化和运行方式,如果一个节点发生错误,RDD在不同节点重试。

Transformation 操作:map,filter ,groupBy ,join

Action 操作:count, reduce,collect

RDD所有操作都是Lazy 加载模式,是一种程序优化的特殊形式,运行编译的过程中不会立即到计算机中计算最终结果,而是记住所有操作和步骤,只有使用Action方法才会进行计算

word/media/image1.gifword/media/image2.gifword/media/image3.gifword/media/image4.gif

word/media/image5.gifword/media/image6.gifword/media/image7.gif

word/media/image4.gif

RDD API讲解

1.aggregate方法对给定的数据集进行方法的设定

def aggregate[U:ClassTag](zeroValue:U)(seqop:(U,T)=>U,combOp:(U,U)=>U):U

U为数据类型,可以传入任意数据类型数据,seqop给定的计算方法要和U类型匹配,combOp是合并方法将zeroValue值和seqop值进行合并

代码示例

import org.apache.spark.{SparkContext,SparkConf}

object testRDDMethod{

def main(args:[String]){

val conf = new SparkConf() //创建上下文环境

.setMaster("local") //设置本地化处理

.setAppName("testRDDMethod") //设置应用名称

val sc = new SparkContext(conf) //创建上下文环境实例

val arr = sc.parallelize((1,2,3,4,5,6)) //输入数组数据集

val result = arr.aggregate(0)(math.max(_,_),_+_) //使用aggregate方法

println(result)

}

}

输出结果:6

parallelize方法将内存中数据读入到spark中,作为整体数据集,math.max比较数据集中数据的大小,_+_是对传递的两个值进行求和,Aggregate对第一个方法结果和空值计算。

2.parallelizeSparkContext中的方法

def parallelize(T:ClassTag)(seq:Seq[T],numSlices:Int=defaultParallelism):RDD[T]

第一个测试是数据,第二个表示将数据分布在多少个数据节点中存放,默认为1

import org.apache.spark.{SparkContext,SparkConf}

object testRDDMethod2{

def main(args:[String]){

val conf = new SparkConf() //设置上下文环境

.setMaster("local") //设置本地化处理

.setAppName("testRDDMethod2") //设置应用名称

val sc = new SparkContext(conf) //实例化上下文环境

val arr = sc.parallelized((1,2,3,4,5,6),2) //(1,2,3)+(4,5,6)

val result = arr.aggregate(0)(math.max(_,_),_+_)

pritnln(result)

}

}

Result3+6=9

Aggregate用于字符串

import org.apache.spark.{SparkContext,SparkConf}

object testRDDMethod2{

def main(args:[String]){

val conf = new SparkConf() //设置上下文环境

.setMaster("local") //设置本地化处理

.setAppName("testRDDMethod2") //设置应用名称

val sc = new SparkContext(conf) //实例化上下文环境

//输入数组数据集,第二个参数表示数据值分布在多少个数据节点中存放,默认为1

val arr =sc.parallelize(("abc","b","c","de","fg")) //创建数据集

//调用aggregate方法

val result = arr.aggregate("")((value,vord)=>value+word,_+_)

pritnln(result)

}

}

3.cache()操作将数据保存到内存中

import org.apache.spark{SparkConf,SparkContext}

object CacheTest{

def main(args[String]){

val conf = new SparkConf() //设置上下文环境

.setMaster("local") //本地化

.setAppName("cacheTest") //设置应用名称

val sc = new SparkContext(conf) //实例化上下文

val arr =sc.parallelize(("abc","d","def","gag")) //设置数据集

println(arr) //打印

println("---------------------------") //分割线

println(arr.cache()) //加载到内存中

//arr.foreach(println) 迭代形式打印

}

4.cartesian()笛卡尔乘积操作,要求数据集长度必须相同,返回一个新的数据集

实例

import org.apache.spark.{SparkConf,SparkContext}

object Cartesion{

def main(args:[String]){

val conf = new SparkConf() //建上下文环境

.setMaster("local") //设置本地环境

.setAppName("cartesion") //设置应用名称

val sc = new SparkContext(conf) //创建上下文实例

val arr1 = sc.parallelize((1,2,3,4))

val arr2 = sc.parallelize((5,6,7,8))

val result = arr1.cartesion(arr2) //进行笛卡尔积计算

result.foreach(println)

}

}

5.Coalesce将已经存储的数据重新分片后再进行存储

def coalesce (numPartitions : Int ,shuffle : Boolean = false) (implicit ord : Ordering [T]=null) : RDD [T]

第一个参数分片数,第二个将数据分为更小的片时使用

import org.apache.spark.{SparkConf,SparkContext}

object Cartesion{

def main(args:[String]){

val conf = new SparkConf() //创建上下文环境

.setMaster("local") //设置本地环境

.setAppName("cartesion") //设置应用名称

val sc = new SparkContext(conf) //创建上下文实例

val arr1 = sc.parallelize((1,2,3,4,5,6))

val arr2 = arr1.coalesce(2,true)

val result = aggregate(0)(math.max(_,_),_+_)

println(result)

val result2= arr2.aggregate(0)(math.max(_,_),_+_)

println(result2)

}

}

K-Means算法

本文来源:https://www.2haoxitong.net/k/doc/d2cb222dc1c708a1294a44a2.html

《spark-MLlib.doc》
将本文的Word文档下载到电脑,方便收藏和打印
推荐度:
点击下载文档

文档为doc格式