本文共 5312 字,大约阅读时间需要 17 分钟。
1.map算子
private static void map() {
//创建SparkConf
SparkConf conf = new SparkConf()
.setAppName("map")
.setMaster("local");
//创建JavasparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//构造集合
List numbers = Arrays.asList(1,2,3,4,5);
//并行化集合,创建初始RDD
JavaRDD numberRDD = sc.parallelize(numbers);
//使用map算子,将集合中的每个元素都乘以2
JavaRDD multipleNumberRDD = numberRDD.map(new Function() {
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
//打印新的RDD
multipleNumberRDD.foreach(new VoidFunction() {
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
//关闭JavasparkContext
sc.close();
}
2.filter算子
private static void filter() {
//创建SparkConf
SparkConf conf = new SparkConf()
.setAppName("filter")
.setMaster("local");
//创建JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//模拟集合
List numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
//并行化集合,创建初始RDD
JavaRDD numberRDD = sc.parallelize(numbers);
//对集合使用filter算子,过滤出集合中的偶数
JavaRDD evenNumberRDD = numberRDD.filter(new Function() {
@Override
public Boolean call(Integer v1) throws Exception {
return v1%2==0;
}
});
evenNumberRDD.foreach(new VoidFunction() {
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
sc.close();
}
3.flatMap算子
Spark 中 map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;
而flatMap函数则是两个操作的集合——正是“先映射后扁平化”:
操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象
操作2:最后将所有对象合并为一个对象
private static void flatMap() {
SparkConf conf = new SparkConf()
.setAppName("flatMap")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List lineList = Arrays.asList("hello you","hello me","hello world");
JavaRDD lines = sc.parallelize(lineList);
//对RDD执行flatMap算子,将每一行文本,拆分为多个单词
JavaRDD words = lines.flatMap(new FlatMapFunction() {
//在这里,传入第一行,hello,you
//返回的是一个Iterable(hello,you)
@Override
public Iterable call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
words.foreach(new VoidFunction() {
@Override
public void call(String t) throws Exception {
System.out.println(t);
}
});
sc.close();
}
4.groupByKey算子
private static void groupByKey() {
SparkConf conf = new SparkConf()
.setAppName("groupByKey")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List> scoreList = Arrays.asList(
new Tuple2("class1", 80),
new Tuple2("class2", 90),
new Tuple2("class1", 97),
new Tuple2("class2", 89));
JavaPairRDD scores = sc.parallelizePairs(scoreList);
//针对scoresRDD,执行groupByKey算子,对每个班级的成绩进行分组
//相当于是,一个key join上的所有value,都放到一个Iterable里面去了
JavaPairRDD> groupedScores = scores.groupByKey();
groupedScores.foreach(new VoidFunction>>() {
@Override
public void call(Tuple2> t)
throws Exception {
System.out.println("class:" + t._1);
Iterator ite = t._2.iterator();
while(ite.hasNext()) {
System.out.println(ite.next());
}
}
});
}
5.reduceByKey算子
private static void reduceByKey() {
SparkConf conf = new SparkConf()
.setAppName("reduceByKey")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List> scoreList = Arrays.asList(
new Tuple2("class1", 80),
new Tuple2("class2", 90),
new Tuple2("class1", 97),
new Tuple2("class2", 89));
JavaPairRDD scores = sc.parallelizePairs(scoreList);
//reduceByKey算法返回的RDD,还是JavaPairRDD
JavaPairRDD totalScores = scores.reduceByKey(new Function2() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
totalScores.foreach(new VoidFunction>() {
@Override
public void call(Tuple2 t) throws Exception {
System.out.println(t._1 + ":" + t._2);
}
});
sc.close();
}
6.sortByKey算子
private static void sortByKey() {
SparkConf conf = new SparkConf()
.setAppName("sortByKey")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List> scoreList = Arrays.asList(
new Tuple2(78, "marry"),
new Tuple2(89, "tom"),
new Tuple2(72, "jack"),
new Tuple2(86, "leo"));
JavaPairRDD scores = sc.parallelizePairs(scoreList);
JavaPairRDD sortedScores = scores.sortByKey();
sortedScores.foreach(new VoidFunction>() {
@Override
public void call(Tuple2 t) throws Exception {
System.out.println(t._1 + ":" + t._2);
}
});
sc.close();
}
7.join算子
join算子用于关联两个RDD,join以后,会根据key进行join,并返回JavaPairRDD。JavaPairRDD的第一个泛型类型是之前两个JavaPairRDD的key类型,因为通过key进行join的。第二个泛型类型,是Tuple2的类型,Tuple2的两个泛型分别为原始RDD的value的类型
private static void join() {
SparkConf conf = new SparkConf()
.setAppName("join")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List> studentList = Arrays.asList(
new Tuple2(1, "tom"),
new Tuple2(2, "jack"),
new Tuple2(3, "marry"),
new Tuple2(4, "leo"));
List> scoreList = Arrays.asList(
new Tuple2(1, 78),
new Tuple2(2, 87),
new Tuple2(3, 89),
new Tuple2(4, 98));
//并行化两个RDD
JavaPairRDD students = sc.parallelizePairs(studentList);;
JavaPairRDD scores = sc.parallelizePairs(scoreList);
//使用join算子关联两个RDD
//join以后,会根据key进行join,并返回JavaPairRDD
//JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key类型,因为通过key进行join的
//第二个泛型类型,是Tuple2的类型,Tuple2的两个泛型分别为原始RDD的value的类型
JavaPairRDD> studentScores = students.join(scores);
//打印
studentScores.foreach(new VoidFunction>>() {
@Override
public void call(Tuple2> t)
throws Exception {
System.out.println("student id:" + t._1);
System.out.println("student name:" + t._2._1);
System.out.println("student score:" + t._2._2);
System.out.println("==========================");
}
});
sc.close();
}
转载地址:http://zoiox.baihongyu.com/