博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark java_spark基本操作 java 版
阅读量:5976 次
发布时间:2019-06-20

本文共 5312 字,大约阅读时间需要 17 分钟。

1.map算子

private static void map() {

//创建SparkConf

SparkConf conf = new SparkConf()

.setAppName("map")

.setMaster("local");

//创建JavasparkContext

JavaSparkContext sc = new JavaSparkContext(conf);

//构造集合

List numbers = Arrays.asList(1,2,3,4,5);

//并行化集合,创建初始RDD

JavaRDD numberRDD = sc.parallelize(numbers);

//使用map算子,将集合中的每个元素都乘以2

JavaRDD multipleNumberRDD = numberRDD.map(new Function() {

@Override

public Integer call(Integer v1) throws Exception {

return v1 * 2;

}

});

//打印新的RDD

multipleNumberRDD.foreach(new VoidFunction() {

@Override

public void call(Integer t) throws Exception {

System.out.println(t);

}

});

//关闭JavasparkContext

sc.close();

}

2.filter算子

private static void filter() {

//创建SparkConf

SparkConf conf = new SparkConf()

.setAppName("filter")

.setMaster("local");

//创建JavaSparkContext

JavaSparkContext sc = new JavaSparkContext(conf);

//模拟集合

List numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

//并行化集合,创建初始RDD

JavaRDD numberRDD = sc.parallelize(numbers);

//对集合使用filter算子,过滤出集合中的偶数

JavaRDD evenNumberRDD = numberRDD.filter(new Function() {

@Override

public Boolean call(Integer v1) throws Exception {

return v1%2==0;

}

});

evenNumberRDD.foreach(new VoidFunction() {

@Override

public void call(Integer t) throws Exception {

System.out.println(t);

}

});

sc.close();

}

3.flatMap算子

Spark 中 map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;

而flatMap函数则是两个操作的集合——正是“先映射后扁平化”:

操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象

操作2:最后将所有对象合并为一个对象

private static void flatMap() {

SparkConf conf = new SparkConf()

.setAppName("flatMap")

.setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

List lineList = Arrays.asList("hello you","hello me","hello world");

JavaRDD lines = sc.parallelize(lineList);

//对RDD执行flatMap算子,将每一行文本,拆分为多个单词

JavaRDD words = lines.flatMap(new FlatMapFunction() {

//在这里,传入第一行,hello,you

//返回的是一个Iterable(hello,you)

@Override

public Iterable call(String t) throws Exception {

return Arrays.asList(t.split(" "));

}

});

words.foreach(new VoidFunction() {

@Override

public void call(String t) throws Exception {

System.out.println(t);

}

});

sc.close();

}

4.groupByKey算子

private static void groupByKey() {

SparkConf conf = new SparkConf()

.setAppName("groupByKey")

.setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

List> scoreList = Arrays.asList(

new Tuple2("class1", 80),

new Tuple2("class2", 90),

new Tuple2("class1", 97),

new Tuple2("class2", 89));

JavaPairRDD scores = sc.parallelizePairs(scoreList);

//针对scoresRDD,执行groupByKey算子,对每个班级的成绩进行分组

//相当于是,一个key join上的所有value,都放到一个Iterable里面去了

JavaPairRDD> groupedScores = scores.groupByKey();

groupedScores.foreach(new VoidFunction>>() {

@Override

public void call(Tuple2> t)

throws Exception {

System.out.println("class:" + t._1);

Iterator ite = t._2.iterator();

while(ite.hasNext()) {

System.out.println(ite.next());

}

}

});

}

5.reduceByKey算子

private static void reduceByKey() {

SparkConf conf = new SparkConf()

.setAppName("reduceByKey")

.setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

List> scoreList = Arrays.asList(

new Tuple2("class1", 80),

new Tuple2("class2", 90),

new Tuple2("class1", 97),

new Tuple2("class2", 89));

JavaPairRDD scores = sc.parallelizePairs(scoreList);

//reduceByKey算法返回的RDD,还是JavaPairRDD

JavaPairRDD totalScores = scores.reduceByKey(new Function2() {

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1 + v2;

}

});

totalScores.foreach(new VoidFunction>() {

@Override

public void call(Tuple2 t) throws Exception {

System.out.println(t._1 + ":" + t._2);

}

});

sc.close();

}

6.sortByKey算子

private static void sortByKey() {

SparkConf conf = new SparkConf()

.setAppName("sortByKey")

.setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

List> scoreList = Arrays.asList(

new Tuple2(78, "marry"),

new Tuple2(89, "tom"),

new Tuple2(72, "jack"),

new Tuple2(86, "leo"));

JavaPairRDD scores = sc.parallelizePairs(scoreList);

JavaPairRDD sortedScores = scores.sortByKey();

sortedScores.foreach(new VoidFunction>() {

@Override

public void call(Tuple2 t) throws Exception {

System.out.println(t._1 + ":" + t._2);

}

});

sc.close();

}

7.join算子

join算子用于关联两个RDD,join以后,会根据key进行join,并返回JavaPairRDD。JavaPairRDD的第一个泛型类型是之前两个JavaPairRDD的key类型,因为通过key进行join的。第二个泛型类型,是Tuple2的类型,Tuple2的两个泛型分别为原始RDD的value的类型

private static void join() {

SparkConf conf = new SparkConf()

.setAppName("join")

.setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

List> studentList = Arrays.asList(

new Tuple2(1, "tom"),

new Tuple2(2, "jack"),

new Tuple2(3, "marry"),

new Tuple2(4, "leo"));

List> scoreList = Arrays.asList(

new Tuple2(1, 78),

new Tuple2(2, 87),

new Tuple2(3, 89),

new Tuple2(4, 98));

//并行化两个RDD

JavaPairRDD students = sc.parallelizePairs(studentList);;

JavaPairRDD scores = sc.parallelizePairs(scoreList);

//使用join算子关联两个RDD

//join以后,会根据key进行join,并返回JavaPairRDD

//JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key类型,因为通过key进行join的

//第二个泛型类型,是Tuple2的类型,Tuple2的两个泛型分别为原始RDD的value的类型

JavaPairRDD> studentScores = students.join(scores);

//打印

studentScores.foreach(new VoidFunction>>() {

@Override

public void call(Tuple2> t)

throws Exception {

System.out.println("student id:" + t._1);

System.out.println("student name:" + t._2._1);

System.out.println("student score:" + t._2._2);

System.out.println("==========================");

}

});

sc.close();

}

转载地址:http://zoiox.baihongyu.com/

你可能感兴趣的文章
Linux 内核已支持苹果
查看>>
shell脚本逻辑判断,文件目录属性判断,if,case用法
查看>>
【二叉树系列】二叉树课程大作业
查看>>
App重新启动
查看>>
矩阵乘法
查看>>
得到目标元素距离视口的距离以及元素自身的宽度与高度(用于浮层位置的动态改变)...
查看>>
安装和配置Tomcat
查看>>
实验三
查看>>
openssh for windows
查看>>
PostgreSQL cheatSheet
查看>>
ASP.NET Core 2 学习笔记(三)中间件
查看>>
转:Mosquitto用户认证配置
查看>>
SpringBoot上传文件到本服务器 目录与jar包同级
查看>>
python开发_difflib字符串比较
查看>>
被解放的姜戈01 初试天涯
查看>>
三极管工作区在Spectre中的表示
查看>>
HT for Web的HTML5树组件延迟加载技术实现
查看>>
ASP.NET MVC 3 Razor Nested foreach with if statements
查看>>
【Mysql】命令行
查看>>
Asterisk 安装与配置
查看>>