Permanent link to this article: https://www.askmac.cn/archives/hadoop-wordcount-1.html
Running WordCount in Hadoop's Various Languages (1)
I have been looking into Apache Crunch a little, and while I was at it I tried running wordcount on Hadoop in a variety of languages. This first part covers wordcount in MapReduce (Java), Hadoop Streaming, Hive, and Pig.
(Update): the code has been posted on GitHub: https://github.com/kawamon/wordcount.git
An aside on WordCount
WordCount is the example almost always used to introduce Hadoop MapReduce; it plays the same role as Hello World. The reason WordCount in particular serves as the sample is rarely spelled out, though, and you may well wonder why it was chosen.
When processing so-called big data, there are two problems:
- Moving a large volume of data out of storage so the CPU can read and process it takes a very long time.
- A single machine takes too long to do the work (the data is too big to fit in memory, or one machine's CPUs cannot get through it).
WordCount is roughly the smallest job that runs into both problems and shows MapReduce's answer to them: map tasks run on the nodes that already hold the data blocks, and the counting is split across machines and then merged by the reducers. With that in mind, let's run it on the Cloudera QuickStart VM installed earlier.
Preparation
First, copy the test data into HDFS. This time I used the Crunch sample data, two trivial files (file01 and file02), which makes the results easy to compare.
$ hadoop fs -cat input/file01
Hello World Bye World
$ hadoop fs -cat input/file02
Hello Hadoop Goodbye Hadoop
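If you want to recreate the input yourself, something like the following should work (a sketch; it assumes you are logged in as the cloudera user, so that input/ resolves to /user/cloudera/input):

$ echo "Hello World Bye World" > file01
$ echo "Hello Hadoop Goodbye Hadoop" > file02
$ hadoop fs -mkdir input
$ hadoop fs -put file01 file02 input/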
MapReduce (Java)
First up is MapReduce in Java, a WordCount written against the new API. I worked from an existing tutorial, but since it targeted the old API a few changes were needed (in particular, do not use StringTokenizer).
WordCount.java
package org.myorg;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf(
          "Usage: %s [generic options] <input dir> <output dir>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(WordCount.class);
    job.setJobName(this.getClass().getName());

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(WordMapper.class);
    job.setReducerClass(WordReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    if (job.waitForCompletion(true)) {
      return 0;
    }
    return 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new WordCount(), args);
    System.exit(exitCode);
  }
}
WordMapper.java
package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String s = value.toString();
    // Split the line on runs of non-word characters and emit (word, 1)
    // for each non-empty token.
    for (String w : s.split("\\W+")) {
      if (w.length() > 0) {
        word.set(w);
        context.write(word, one);
      }
    }
  }
}
WordReducer.java
package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int wordCount = 0;
    for (IntWritable value : values) {
      wordCount += value.get();
    }
    context.write(key, new IntWritable(wordCount));
  }
}
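One optional tweak, not used in the run below: because WordReducer just sums integers, an associative and commutative operation, the same class can also be registered as a combiner, so each map task pre-aggregates its own (word, 1) pairs before the shuffle. A sketch of the extra line in run():

    job.setMapperClass(WordMapper.class);
    job.setCombinerClass(WordReducer.class); // optional: local pre-aggregation on the map side
    job.setReducerClass(WordReducer.class);

The "Combine input records=0" counter in the log below confirms the original job runs without one.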
Compiling and running
$ javac -classpath `hadoop classpath` org/myorg/*.java
Note: org/myorg/WordCount.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
[cloudera@quickstart mr_java]$ jar cvf wc.jar org/myorg/*.class
added manifest
adding: org/myorg/WordCount.class(in = 2253) (out= 1132)(deflated 49%)
adding: org/myorg/WordMapper.class(in = 1915) (out= 810)(deflated 57%)
adding: org/myorg/WordReducer.class(in = 1602) (out= 670)(deflated 58%)
[cloudera@quickstart mr_java]$ hadoop jar wc.jar org.myorg.WordCount input output
14/12/14 05:08:37 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 05:08:37 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 05:08:38 INFO mapreduce.JobSubmitter: number of splits:2
14/12/14 05:08:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0014
14/12/14 05:08:38 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0014
14/12/14 05:08:38 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0014/
14/12/14 05:08:38 INFO mapreduce.Job: Running job: job_1418545807639_0014
14/12/14 05:08:51 INFO mapreduce.Job: Job job_1418545807639_0014 running in uber mode : false
14/12/14 05:08:51 INFO mapreduce.Job: map 0% reduce 0%
14/12/14 05:09:01 INFO mapreduce.Job: map 100% reduce 0%
14/12/14 05:09:11 INFO mapreduce.Job: map 100% reduce 100%
14/12/14 05:09:12 INFO mapreduce.Job: Job job_1418545807639_0014 completed successfully
14/12/14 05:09:12 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=87
        FILE: Number of bytes written=319063
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=296
        HDFS: Number of bytes written=41
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=13884
        Total time spent by all reduces in occupied slots (ms)=4067
        Total time spent by all map tasks (ms)=13884
        Total time spent by all reduce tasks (ms)=4067
        Total vcore-seconds taken by all map tasks=13884
        Total vcore-seconds taken by all reduce tasks=4067
        Total megabyte-seconds taken by all map tasks=14217216
        Total megabyte-seconds taken by all reduce tasks=4164608
    Map-Reduce Framework
        Map input records=2
        Map output records=8
        Map output bytes=82
        Map output materialized bytes=101
        Input split bytes=246
        Combine input records=0
        Combine output records=0
        Reduce input groups=5
        Reduce shuffle bytes=101
        Reduce input records=8
        Reduce output records=5
        Spilled Records=16
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=232
        CPU time spent (ms)=2460
        Physical memory (bytes) snapshot=700850176
        Virtual memory (bytes) snapshot=2683498496
        Total committed heap usage (bytes)=510656512
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=50
    File Output Format Counters
        Bytes Written=41
Result
$ hadoop fs -cat output/part-r-00000
Bye	1
Goodbye	1
Hadoop	2
Hello	2
World	2
Hadoop Streaming
Next is Hadoop Streaming, with the mapper and reducer written as Perl scripts.
mapper.pl
#!/usr/bin/env perl
# Emit "<word><TAB>1" for every word on every input line.
while (<>) {
    chomp();
    (@words) = split /\W+/;
    foreach $w (@words) {
        print "$w\t1\n";
    }
}
reduce.pl
#!/usr/bin/env perl
# Sum the values for each run of identical keys. Streaming sorts the
# reducer's input by key, so equal words arrive on consecutive lines.
$sum = 0;
$last = "";
while (<>) {
    chomp;
    ($key, $value) = split /\t/;
    $last = $key if $last eq "";
    if ($last ne $key) {
        print "$last\t$sum\n";
        $last = $key;
        $sum = 0;
    }
    $sum += $value;
}
print "$last\t$sum\n";    # flush the final group
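Because Streaming's reducer contract is just "key-sorted lines on stdin", the two scripts can be sanity-checked locally with a shell pipe before submitting. A quick sketch, assuming local copies of file01 and file02 in the current directory:

$ chmod +x mapper.pl reduce.pl
$ cat file01 file02 | ./mapper.pl | sort | ./reduce.pl
Bye	1
Goodbye	1
Hadoop	2
Hello	2
World	2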
Running
$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -mapper mapper.pl -reducer reduce.pl -file mapper.pl -file reduce.pl -input input -output streamoutput
14/12/14 05:53:58 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.pl, reduce.pl] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.2.0.jar] /tmp/streamjob8660928725375064201.jar tmpDir=null
14/12/14 05:53:59 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 05:54:00 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 05:54:01 INFO mapred.FileInputFormat: Total input paths to process : 2
14/12/14 05:54:01 INFO mapreduce.JobSubmitter: number of splits:3
14/12/14 05:54:01 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0025
14/12/14 05:54:01 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0025
14/12/14 05:54:01 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0025/
14/12/14 05:54:01 INFO mapreduce.Job: Running job: job_1418545807639_0025
14/12/14 05:54:13 INFO mapreduce.Job: Job job_1418545807639_0025 running in uber mode : false
14/12/14 05:54:13 INFO mapreduce.Job: map 0% reduce 0%
14/12/14 05:54:26 INFO mapreduce.Job: map 100% reduce 0%
14/12/14 05:54:36 INFO mapreduce.Job: map 100% reduce 100%
14/12/14 05:54:37 INFO mapreduce.Job: Job job_1418545807639_0025 completed successfully
14/12/14 05:54:37 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=83
        FILE: Number of bytes written=437439
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=383
        HDFS: Number of bytes written=41
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=33174
        Total time spent by all reduces in occupied slots (ms)=3734
        Total time spent by all map tasks (ms)=33174
        Total time spent by all reduce tasks (ms)=3734
        Total vcore-seconds taken by all map tasks=33174
        Total vcore-seconds taken by all reduce tasks=3734
        Total megabyte-seconds taken by all map tasks=33970176
        Total megabyte-seconds taken by all reduce tasks=3823616
    Map-Reduce Framework
        Map input records=2
        Map output records=8
        Map output bytes=66
        Map output materialized bytes=119
        Input split bytes=330
        Combine input records=0
        Combine output records=0
        Reduce input groups=5
        Reduce shuffle bytes=119
        Reduce input records=8
        Reduce output records=5
        Spilled Records=16
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=222
        CPU time spent (ms)=3050
        Physical memory (bytes) snapshot=967741440
        Virtual memory (bytes) snapshot=3570974720
        Total committed heap usage (bytes)=719847424
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=53
    File Output Format Counters
        Bytes Written=41
14/12/14 05:54:37 INFO streaming.StreamJob: Output directory: streamoutput
Result
$ hadoop fs -cat streamoutput/part-00000
Bye	1
Goodbye	1
Hadoop	2
Hello	2
World	2
Hive
Next is Hive; see Programming Hive for details. The text files are mapped into Hive as an external table.
HiveQL
DROP TABLE docs;

CREATE EXTERNAL TABLE docs (line STRING)
  LOCATION '/user/cloudera/input';

SELECT word, count(1) AS count FROM
  (SELECT explode(split(line, ' ')) AS word FROM docs) w
GROUP BY word
ORDER BY word;
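The SELECT above only prints the counts. Programming Hive's version of this example materializes them into a table instead; a sketch (word_counts is an arbitrary table name, not used in the run below):

-- Sketch: persist the counts instead of printing them.
CREATE TABLE word_counts AS
SELECT word, count(1) AS count FROM
  (SELECT explode(split(line, ' ')) AS word FROM docs) w
GROUP BY word
ORDER BY word;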
Running and result
hive> SELECT word, count(1) AS count FROM
    > (SELECT explode(split(line, ' ')) AS word FROM docs) w GROUP BY word
    > ORDER BY word;
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1418545807639_0019, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1418545807639_0019/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1418545807639_0019
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2014-12-14 05:23:53,107 Stage-1 map = 0%, reduce = 0%
2014-12-14 05:24:00,570 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.29 sec
2014-12-14 05:24:10,901 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.49 sec
MapReduce Total cumulative CPU time: 2 seconds 490 msec
Ended Job = job_1418545807639_0019
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1418545807639_0020, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1418545807639_0020/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1418545807639_0020
Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1
2014-12-14 05:24:26,302 Stage-2 map = 0%, reduce = 0%
2014-12-14 05:24:33,842 Stage-2 map = 100%, reduce = 0%, Cumulative CPU 1.04 sec
2014-12-14 05:24:43,114 Stage-2 map = 100%, reduce = 100%, Cumulative CPU 2.26 sec
MapReduce Total cumulative CPU time: 2 seconds 260 msec
Ended Job = job_1418545807639_0020
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1  Cumulative CPU: 2.49 sec  HDFS Read: 337 HDFS Write: 217 SUCCESS
Stage-Stage-2: Map: 1  Reduce: 1  Cumulative CPU: 2.26 sec  HDFS Read: 594 HDFS Write: 41 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 750 msec
OK
Bye	1
Goodbye	1
Hadoop	2
Hello	2
World	2
Time taken: 66.765 seconds, Fetched: 5 row(s)
hive>
Pig
Next is Pig.
Pig Latin script
docs = LOAD '/user/cloudera/input' AS (line:chararray);
words = FOREACH docs GENERATE FLATTEN(TOKENIZE(line)) AS word;
groupd = GROUP words BY word;
wordcount = FOREACH groupd GENERATE group, COUNT(words);
DUMP wordcount;
Running and result
grunt> docs = LOAD '/user/cloudera/input' AS (line:chararray);
grunt> words = FOREACH docs GENERATE FLATTEN(TOKENIZE(line)) AS word;
grunt> groupd = GROUP words BY word;
grunt> wordcount = FOREACH groupd GENERATE group, COUNT(words);
grunt> DUMP wordcount;
2014-12-14 05:27:00,067 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY
2014-12-14 05:27:00,112 [main] INFO org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}
2014-12-14 05:27:00,230 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
(omitted)
2014-12-14 05:27:42,341 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2014-12-14 05:27:42,341 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(Bye,1)
(Hello,2)
(World,2)
(Hadoop,2)
(Goodbye,1)
grunt>
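Note that DUMP prints the tuples in no particular order, unlike the sorted output of the other runs. To sort the counts and write them back to HDFS instead, something like the following should work (a sketch; the alias sorted and the output path pigoutput are my own names):

sorted = ORDER wordcount BY group;   -- sort by word
STORE sorted INTO 'pigoutput';       -- write to HDFS instead of dumping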
This has gotten quite long, so I will continue tomorrow.