Skip to content

Commit 39c7c6d

Browse files
author
James Lee
committed
count by key
1 parent 7d4da97 commit 39c7c6d

1 file changed

Lines changed: 51 additions & 0 deletions

File tree

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.sparkTutorial.pairRdd.sort.sortbykey;
2+
3+
4+
import com.sparkTutorial.pairRdd.aggregation.reducebykey.housePrice.AvgCount;
5+
import org.apache.log4j.Level;
6+
import org.apache.log4j.Logger;
7+
import org.apache.spark.SparkConf;
8+
import org.apache.spark.api.java.JavaPairRDD;
9+
import org.apache.spark.api.java.JavaRDD;
10+
import org.apache.spark.api.java.JavaSparkContext;
11+
import org.apache.spark.api.java.function.Function2;
12+
import org.apache.spark.api.java.function.PairFunction;
13+
import scala.Tuple2;
14+
15+
import java.util.Map;
16+
17+
public class AverageHousePriceSolution {
18+
19+
public static void main(String[] args) throws Exception {
20+
21+
Logger.getLogger("org").setLevel(Level.ERROR);
22+
SparkConf conf = new SparkConf().setAppName("wordCounts").setMaster("local[3]");
23+
JavaSparkContext sc = new JavaSparkContext(conf);
24+
25+
JavaRDD<String> lines = sc.textFile("in/RealEstate.csv");
26+
27+
JavaRDD<String> cleanedLines = lines.filter(line -> !line.contains("Bedrooms"));
28+
29+
JavaPairRDD<String, AvgCount> housePricePairRdd = cleanedLines.mapToPair(
30+
(PairFunction<String, String, AvgCount>) line ->
31+
new Tuple2<>(line.split(",")[3],
32+
new AvgCount(1, Double.parseDouble(line.split(",")[2]))));
33+
34+
JavaPairRDD<String, AvgCount> housePriceTotal = housePricePairRdd.reduceByKey(
35+
(Function2<AvgCount, AvgCount, AvgCount>) (x, y) ->
36+
new AvgCount(x.getCount() + y.getCount(), x.getTotal() + y.getTotal()));
37+
38+
39+
JavaPairRDD<Integer, Double> housePriceAvg = housePriceTotal.mapToPair(
40+
(PairFunction<Tuple2<String, AvgCount>, Integer, Double>) total ->
41+
new Tuple2<>(Integer.valueOf(total._1()), total._2().getTotal()/total._2().getCount()));
42+
43+
44+
JavaPairRDD<Integer, Double> sortedHousePriceAvg = housePriceAvg.sortByKey();
45+
46+
for (Tuple2<Integer, Double> price : sortedHousePriceAvg.collect()) {
47+
System.out.println(price._1() + " : " + price._2());
48+
}
49+
}
50+
51+
}

0 commit comments

Comments
 (0)