1
+ package tn .insat .tp21 ;
2
+ import org .apache .spark .SparkConf ;
3
+ import org .apache .spark .api .java .JavaDoubleRDD ;
4
+ import org .apache .spark .api .java .JavaPairRDD ;
5
+ import org .apache .spark .api .java .JavaRDD ;
6
+ import org .apache .spark .api .java .JavaSparkContext ;
7
+
8
+ import org .apache .spark .api .java .function .Function ;
9
+ import org .apache .spark .util .StatCounter ;
10
+ import org .slf4j .Logger ;
11
+ import org .slf4j .LoggerFactory ;
12
+ import scala .Option ;
13
+ import scala .Tuple2 ;
14
+
15
+
16
+ import java .io .File ;
17
+ import java .util .Arrays ;
18
+ import java .util .Locale ;
19
+
20
+ import static jersey .repackaged .com .google .common .base .Preconditions .checkArgument ;
21
+ public class WorldPopulations {
22
+ private static final Logger LOGGER = LoggerFactory .getLogger (WorldPopulations .class );
23
+
24
+ public static void main (String [] args ) {
25
+ checkArgument (args .length > 1 , "Please provide the path of input file and output dir as parameters." );
26
+
27
+ //delete output directory if it exists
28
+ deleteDirectory (new File (args [1 ]));
29
+
30
+ new WorldPopulations ().run (args [0 ], args [1 ],args [2 ],args [3 ]);
31
+ }
32
+
33
+ public void run (String inputFilePath , String outputDir ,String joinFilePath ,String masterAdress ) {
34
+ String master = masterAdress ;
35
+ SparkConf conf = new SparkConf ()
36
+ .setAppName (WorldPopulations .class .getName ())
37
+ .setMaster (master );
38
+ JavaSparkContext sc = new JavaSparkContext (conf );
39
+
40
+ JavaRDD <String > textFile = sc .textFile (inputFilePath );
41
+
42
+ //Ex:01----------------------------------------
43
+ System .out .println (textFile .getNumPartitions ());
44
+ textFile .coalesce (4 );
45
+ //---------------------------------------------
46
+
47
+ JavaPairRDD <String , Float > populations = textFile
48
+ .flatMap (s -> Arrays .asList (s .split ("\n " )).iterator ())
49
+ .mapToPair (line -> {
50
+ String [] fields = line .split ("," );
51
+ Float value = (fields [4 ].equals ("" )||fields [4 ].equals ("Population" )?-1 :Float .parseFloat (fields [4 ]));
52
+ return new Tuple2 <>(fields [0 ].toLowerCase (),value );
53
+ });
54
+
55
+ //Ex:02-----------------------------------------
56
+ populations .reduceByKey ((a , b ) -> {
57
+
58
+ if (!a .equals (-1 ))
59
+ if (!b .equals (-1 ))
60
+ return ((a >b )?a :b );
61
+ else
62
+ return a ;
63
+ else
64
+ if (!b .equals ((-1 )))
65
+ return b ;
66
+ return ((float ) -1 );
67
+ });
68
+
69
+ JavaPairRDD <String , Float > cleanPopulations = populations .filter (t -> t ._2 > 0 );
70
+ //---------------------------------------------
71
+
72
+
73
+ //Ex:03-----------------------------------------
74
+ StatCounter statistics = cleanPopulations .mapToDouble (t -> t ._2 )
75
+ .stats ();
76
+ System .out .println (statistics );
77
+ //---------------------------------------------
78
+
79
+ //Ex:04-----------------------------------------
80
+ JavaDoubleRDD example = cleanPopulations .mapToDouble (y -> y ._2 /statistics .count ());
81
+ Tuple2 <double [], long []> newResults = example .histogram ((int ) Math .log10 (statistics .max ()));
82
+
83
+ System .out .println (newResults );
84
+ //---------------------------------------------
85
+
86
+
87
+ //Ex:05-----------------------------------------
88
+ JavaRDD <String > joinFile = sc .textFile (joinFilePath );
89
+ textFile .coalesce (4 );
90
+ JavaPairRDD <String , String > regions = joinFile
91
+ .flatMap (s -> Arrays .asList (s .split ("\n " )).iterator ())
92
+ .mapToPair (line -> {
93
+ String [] fields = line .split ("," );
94
+ String value =fields [2 ];
95
+ return new Tuple2 <>(fields [0 ],value );
96
+ });
97
+
98
+ JavaPairRDD <String , String > cleanedRegions = regions .filter (t ->(!t ._1 .equals ("" )&&!t ._2 .equals ("" )));
99
+
100
+ JavaPairRDD <String , Tuple2 <String , Float >> regionCities = cleanedRegions .join (cleanPopulations );
101
+ //---------------------------------------------
102
+
103
+ //cleanPopulations.saveAsTextFile(outputDir);
104
+ regionCities .saveAsTextFile (outputDir );
105
+ }
106
+ public static boolean deleteDirectory (File directoryToBeDeleted ) {
107
+ File [] allContents = directoryToBeDeleted .listFiles ();
108
+ if (allContents != null ) {
109
+ for (File file : allContents ) {
110
+ deleteDirectory (file );
111
+ }
112
+ }
113
+ return directoryToBeDeleted .delete ();
114
+ }
115
+
116
+
117
+
118
+ }
0 commit comments