Skip to content

Commit 7d4da97

Browse files
author
James Lee
committed
add combinebykey solution
1 parent 4487e0d commit 7d4da97

3 files changed

Lines changed: 64 additions & 9 deletions

File tree

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.sparkTutorial.pairRdd.aggregation.combinebykey;
2+
3+
import com.sparkTutorial.pairRdd.aggregation.reducebykey.housePrice.AvgCount;
4+
import org.apache.log4j.Level;
5+
import org.apache.log4j.Logger;
6+
import org.apache.spark.SparkConf;
7+
import org.apache.spark.api.java.JavaPairRDD;
8+
import org.apache.spark.api.java.JavaRDD;
9+
import org.apache.spark.api.java.JavaSparkContext;
10+
import org.apache.spark.api.java.function.Function;
11+
import org.apache.spark.api.java.function.Function2;
12+
import org.apache.spark.api.java.function.PairFunction;
13+
import scala.Tuple2;
14+
15+
import java.util.Map;
16+
17+
public class AverageHousePriceSolution {
18+
19+
public static void main(String[] args) throws Exception {
20+
21+
Logger.getLogger("org").setLevel(Level.ERROR);
22+
SparkConf conf = new SparkConf().setAppName("wordCounts").setMaster("local[3]");
23+
JavaSparkContext sc = new JavaSparkContext(conf);
24+
25+
JavaRDD<String> lines = sc.textFile("in/RealEstate.csv");
26+
27+
JavaRDD<String> cleanedLines = lines.filter(line -> !line.contains("Bedrooms"));
28+
29+
JavaPairRDD<String, Double> housePricePairRdd = cleanedLines.mapToPair(
30+
(PairFunction<String, String, Double>) line ->
31+
new Tuple2<>(line.split(",")[3],
32+
Double.parseDouble(line.split(",")[2])));
33+
34+
JavaPairRDD<String, AvgCount> housePriceTotal= housePricePairRdd.combineByKey(createCombiner, mergeValue, mergeCombiners);
35+
36+
JavaPairRDD<String, Double> housePriceAvg = housePriceTotal.mapToPair(
37+
(PairFunction<Tuple2<String, AvgCount>, String, Double>) total ->
38+
new Tuple2<>(total._1(), total._2().getTotal()/total._2().getCount()));
39+
40+
for (Map.Entry<String, Double> housePriceAvgPair : housePriceAvg.collectAsMap().entrySet()) {
41+
System.out.println(housePriceAvgPair.getKey() + " : " + housePriceAvgPair.getValue());
42+
43+
}
44+
}
45+
46+
static Function<Double, AvgCount> createCombiner = (Function<Double, AvgCount>) x -> new AvgCount(1, x);
47+
48+
static Function2<AvgCount, Double, AvgCount> mergeValue
49+
= (Function2<AvgCount, Double, AvgCount>) (avgCount, x) -> new AvgCount(avgCount.getCount() + 1,
50+
avgCount.getTotal() + x);
51+
static Function2<AvgCount, AvgCount, AvgCount> mergeCombiners
52+
= (Function2<AvgCount, AvgCount, AvgCount>) (avgCountA, avgCountB) -> new AvgCount(avgCountA.getCount() + avgCountB.getCount(),
53+
avgCountA.getTotal() + avgCountB.getTotal());
54+
55+
}

src/main/java/com/sparkTutorial/pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,18 @@ public static void main(String[] args) throws Exception {
2525

2626
JavaRDD<String> cleanedLines = lines.filter(line -> !line.contains("Bedrooms"));
2727

28-
JavaPairRDD<String, CountAndTotal> housePricePairRdd = cleanedLines.mapToPair(
29-
(PairFunction<String, String, CountAndTotal>) line ->
28+
JavaPairRDD<String, AvgCount> housePricePairRdd = cleanedLines.mapToPair(
29+
(PairFunction<String, String, AvgCount>) line ->
3030
new Tuple2<>(line.split(",")[3],
31-
new CountAndTotal(1, Double.parseDouble(line.split(",")[2]))));
31+
new AvgCount(1, Double.parseDouble(line.split(",")[2]))));
3232

33-
JavaPairRDD<String, CountAndTotal> housePriceTotal = housePricePairRdd.reduceByKey(
34-
(Function2<CountAndTotal, CountAndTotal, CountAndTotal>) (x, y) ->
35-
new CountAndTotal(x.getCount() + y.getCount(), x.getTotal() + y.getTotal()));
33+
JavaPairRDD<String, AvgCount> housePriceTotal = housePricePairRdd.reduceByKey(
34+
(Function2<AvgCount, AvgCount, AvgCount>) (x, y) ->
35+
new AvgCount(x.getCount() + y.getCount(), x.getTotal() + y.getTotal()));
3636

3737

3838
JavaPairRDD<String, Double> housePriceAvg = housePriceTotal.mapToPair(
39-
(PairFunction<Tuple2<String, CountAndTotal>, String, Double>) total ->
39+
(PairFunction<Tuple2<String, AvgCount>, String, Double>) total ->
4040
new Tuple2<>(total._1(), total._2().getTotal()/total._2().getCount()));
4141

4242
for (Map.Entry<String, Double> housePriceAvgPair : housePriceAvg.collectAsMap().entrySet()) {

src/main/java/com/sparkTutorial/pairRdd/aggregation/reducebykey/housePrice/CountAndTotal.java renamed to src/main/java/com/sparkTutorial/pairRdd/aggregation/reducebykey/housePrice/AvgCount.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
import java.io.Serializable;
44

5-
class CountAndTotal implements Serializable {
5+
public class AvgCount implements Serializable {
66
private int count;
77
private double total;
88

9-
CountAndTotal(int count, double total) {
9+
public AvgCount(int count, double total) {
1010
this.count = count;
1111
this.total = total;
1212
}

0 commit comments

Comments
 (0)