Permanent link to this article: https://www.askmac.cn/archives/hadoop-wordcount-3.html
WordCount in Hadoop's Various Languages (3): Apache Crunch
This is the final installment of the WordCount series. Today we implement WordCount with Crunch and run it on both MapReduce and Spark.
Crunch (MapReduce)
Apache Crunch is open-source software under the Apache project, based on Google's FlumeJava. It is a library that lets you describe MapReduce (and now also Spark) pipelines concisely; in short, a Java library that makes writing MapReduce programs simple.
Crunch was developed and is maintained by Josh Wills, Cloudera's chief data scientist. Data scientists over there really do build the tools they need themselves (Cloudera Oryx, Impyla, and others). Impressive.
Crunch reference:
http://crunch.apache.org/getting-started.html
The WordCount code follows Crunch's getting-started page; you can clone the demo project and run it:
git clone http://github.com/jwills/crunch-demo
src/main/java/com/example/WordCount.java
package com.example;

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A word count example for Apache Crunch, based on Crunch's example projects.
 */
public class WordCount extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordCount(), args);
  }

  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: hadoop jar crunch-demo-1.0-SNAPSHOT-job.jar"
          + " [generic options] input output");
      System.err.println();
      GenericOptionsParser.printGenericCommandUsage(System.err);
      return 1;
    }
    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
    // Pipeline pipeline = new SparkPipeline("local","sort");

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
  }
}
src/main/java/com/example/StopWordFilter.java
package com.example;

import java.util.Set;

import org.apache.crunch.FilterFn;

import com.google.common.collect.ImmutableSet;

/**
 * A filter that removes known stop words.
 */
public class StopWordFilter extends FilterFn<String> {

  // English stop words, borrowed from Lucene.
  private static final Set<String> STOP_WORDS = ImmutableSet.copyOf(new String[] {
      "a", "and", "are", "as", "at", "be", "but", "by",
      "for", "if", "in", "into", "is", "it",
      "no", "not", "of", "on", "or", "s", "such",
      "t", "that", "the", "their", "then", "there", "these",
      "they", "this", "to", "was", "will", "with"
  });

  @Override
  public boolean accept(String word) {
    return !STOP_WORDS.contains(word);
  }
}
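Because StopWordFilter is an ordinary FilterFn, it can be sanity-checked directly, with no pipeline at all. The following is just a minimal sketch assuming JUnit 4 on the test classpath; it is not the demo project's actual StopWordFilterTest.

package com.example;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class StopWordFilterSketchTest {

  @Test
  public void filtersStopWordsOnly() {
    StopWordFilter filter = new StopWordFilter();
    assertFalse(filter.accept("the"));   // a known stop word is rejected
    assertTrue(filter.accept("Hadoop")); // ordinary words pass through
  }
}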
src/main/java/com/example/Tokenizer.java
package com.example;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

import com.google.common.base.Splitter;

/**
 * Splits a line of text, filtering known stop words.
 */
public class Tokenizer extends DoFn<String, String> {

  private static final Splitter SPLITTER = Splitter.onPattern("\\s+").omitEmptyStrings();

  @Override
  public void process(String line, Emitter<String> emitter) {
    for (String word : SPLITTER.split(line)) {
      emitter.emit(word);
    }
  }
}
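Crunch also ships an in-memory pipeline implementation (MemPipeline), so the whole Tokenizer / StopWordFilter / count() chain can be exercised locally without a cluster. The sketch below is only an illustration under that assumption; the class name InMemoryWordCount and the sample input strings are made up and are not part of crunch-demo.

package com.example;

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class InMemoryWordCount {

  public static void main(String[] args) {
    // Build a tiny in-memory PCollection instead of reading a file from HDFS.
    PCollection<String> lines = MemPipeline.typedCollectionOf(
        Writables.strings(),
        "Hello World Bye World",
        "Hello Hadoop Goodbye Hadoop");

    // Reuse the same Tokenizer / StopWordFilter / count() chain as the real job.
    PTable<String, Long> counts = lines
        .parallelDo(new Tokenizer(), Writables.strings())
        .filter(new StopWordFilter())
        .count();

    // materialize() runs the in-memory pipeline and returns the results as an Iterable.
    for (Pair<String, Long> entry : counts.materialize()) {
      System.out.println(entry.first() + "\t" + entry.second());
    }
  }
}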
Build
The Crunch demo code is built with Maven, so it can be used as is:
$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building crunch-demo 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ crunch-demo ---
[debug] execute contextualize
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.5.1:compile (default-compile) @ crunch-demo ---
[INFO] Compiling 3 source files to /home/cloudera/work/crunch/crunch-demo/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ crunch-demo ---
[debug] execute contextualize
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.5.1:testCompile (default-testCompile) @ crunch-demo ---
[INFO] Compiling 2 source files to /home/cloudera/work/crunch/crunch-demo/target/test-classes
[INFO]
[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ crunch-demo ---
(omitted)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.738s
[INFO] Finished at: Sun Dec 14 20:28:27 PST 2014
[INFO] Final Memory: 33M/311M
[INFO] ------------------------------------------------------------------------
Run
hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar input crunch.output
14/12/14 20:30:53 INFO impl.FileTargetImpl: Will write output files to new path: crunch.output
14/12/14 20:30:54 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
14/12/14 20:30:55 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 20:30:56 INFO Configuration.deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize
14/12/14 20:30:56 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 20:30:56 INFO input.CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 50
14/12/14 20:30:56 INFO mapreduce.JobSubmitter: number of splits:1
14/12/14 20:30:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0027
14/12/14 20:30:57 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0027
14/12/14 20:30:57 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0027/
14/12/14 20:30:57 INFO jobcontrol.CrunchControlledJob: Running job "com.example.WordCount: Text(input)+S0+S1+Aggregate.count+GBK+combine+asText+Text... (1/1)"
14/12/14 20:30:57 INFO jobcontrol.CrunchControlledJob: Job status available at: http://quickstart.cloudera:8088/proxy/application_1418545807639_0027/
Result
hadoop fs -cat crunch.output/part-r-00000
[Bye,1]
[Goodbye,1]
[Hadoop,2]
[Hello,2]
[World,2]
Crunch (Spark)
Finally, let's run Crunch on Spark. The source code is almost identical to the MapReduce version; only one line differs (MRPipeline -> SparkPipeline):
- Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
+ Pipeline pipeline = new SparkPipeline("spark://quickstart.cloudera:7077","CrunchWordCount");
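If you want to avoid editing and rebuilding every time you switch engines, one option is to choose the Pipeline implementation at runtime. This is only a hypothetical sketch; the extra "engine" argument is not part of crunch-demo, and the args.length check in run() would also need to allow a third argument.

// Inside run(), choose the engine from an optional third argument.
Pipeline pipeline;
if (args.length > 2 && "spark".equals(args[2])) {
  // Master URL as used on the QuickStart VM in this article.
  pipeline = new SparkPipeline("spark://quickstart.cloudera:7077", "CrunchWordCount");
} else {
  pipeline = new MRPipeline(WordCount.class, getConf());
}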
There is very little information out there about running Spark applications with Crunch, so it took me some effort before I finally got it to start.
All files other than the one below are unchanged.
src/main/java/com/example/WordCount.java
package com.example;

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A word count example for Apache Crunch, based on Crunch's example projects.
 */
public class WordCount extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordCount(), args);
  }

  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: hadoop jar crunch-demo-1.0-SNAPSHOT-job.jar"
          + " [generic options] input output");
      System.err.println();
      GenericOptionsParser.printGenericCommandUsage(System.err);
      return 1;
    }
    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    // Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
    // Pipeline pipeline = new SparkPipeline("local","CrunchWordCount");
    Pipeline pipeline = new SparkPipeline("spark://quickstart.cloudera:7077","CrunchWordCount");

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
  }
}
Build
The build again uses Maven and is almost exactly the same:
mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building crunch-demo 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ crunch-demo ---
[debug] execute contextualize
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.5.1:compile (default-compile) @ crunch-demo ---
[INFO] Compiling 1 source file to /home/cloudera/work/crunch/crunch-demo/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ crunch-demo ---
[debug] execute contextualize
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.5.1:testCompile (default-testCompile) @ crunch-demo ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ crunch-demo ---
[INFO] Surefire report directory: /home/cloudera/work/crunch/crunch-demo/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.example.StopWordFilterTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.139 sec
Running com.example.TokenizerTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.233 sec

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ crunch-demo ---
[INFO] Building jar: /home/cloudera/work/crunch/crunch-demo/target/crunch-demo-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-assembly-plugin:2.3:single (make-assembly) @ crunch-demo ---
[INFO] Reading assembly descriptor: src/main/assembly/hadoop-job.xml
[INFO] Building jar: /home/cloudera/work/crunch/crunch-demo/target/crunch-demo-1.0-SNAPSHOT-job.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.243s
[INFO] Finished at: Mon Dec 15 07:23:26 PST 2014
[INFO] Final Memory: 32M/297M
[INFO] ------------------------------------------------------------------------
Run
Before running, set the SPARK_CLASSPATH environment variable. Without it, the job fails at runtime with a ClassNotFoundException because the Crunch classes cannot be found. (As the warning in the log below notes, SPARK_CLASSPATH is deprecated in Spark 1.0+ in favor of --driver-class-path and spark.executor.extraClassPath, but it still works here.)
export SPARK_CLASSPATH=/usr/lib/crunch/*
spark-submit --class com.example.WordCount target/crunch-demo-1.0-SNAPSHOT-job.jar input crunch.output
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/assembly/lib/spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/15 07:28:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/15 07:28:47 INFO FileTargetImpl: Will write output files to new path: crunch.output
14/12/15 07:28:47 WARN SparkConf: SPARK_CLASSPATH was detected (set to '/usr/lib/crunch/*').
This is deprecated in Spark 1.0+.
Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath
14/12/15 07:28:47 WARN SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/lib/crunch/*' as a work-around.
14/12/15 07:28:47 WARN SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/lib/crunch/*' as a work-around.
14/12/15 07:28:47 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.2.220 instead (on interface eth1)
14/12/15 07:28:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/12/15 07:28:47 INFO SecurityManager: Changing view acls to: cloudera
14/12/15 07:28:47 INFO SecurityManager: Changing modify acls to: cloudera
14/12/15 07:28:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
14/12/15 07:28:47 INFO Slf4jLogger: Slf4jLogger started
14/12/15 07:28:47 INFO Remoting: Starting remoting
14/12/15 07:28:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.2.220:46973]
14/12/15 07:28:47 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.2.220:46973]
14/12/15 07:28:47 INFO Utils: Successfully started service 'sparkDriver' on port 46973.
14/12/15 07:28:48 INFO SparkEnv: Registering MapOutputTracker
14/12/15 07:28:48 INFO SparkEnv: Registering BlockManagerMaster
14/12/15 07:28:48 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141215072848-a8ab
14/12/15 07:28:48 INFO Utils: Successfully started service 'Connection manager for block manager' on port 38854.
14/12/15 07:28:48 INFO ConnectionManager: Bound socket to port 38854 with id = ConnectionManagerId(192.168.2.220,38854)
14/12/15 07:28:48 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
14/12/15 07:28:48 INFO BlockManagerMaster: Trying to register BlockManager
14/12/15 07:28:48 INFO BlockManagerMasterActor: Registering block manager 192.168.2.220:38854 with 265.4 MB RAM
14/12/15 07:28:48 INFO BlockManagerMaster: Registered BlockManager
14/12/15 07:28:48 INFO HttpFileServer: HTTP File server directory is /tmp/spark-a04d9dc5-d340-4237-bb12-cf45e390bbd5
14/12/15 07:28:48 INFO HttpServer: Starting HTTP Server
14/12/15 07:28:48 INFO Utils: Successfully started service 'HTTP file server' on port 46908.
14/12/15 07:28:48 INFO Utils: Successfully started service 'SparkUI' on port 4040.
14/12/15 07:28:48 INFO SparkUI: Started SparkUI at http://192.168.2.220:4040
14/12/15 07:28:48 INFO EventLoggingListener: Logging events to hdfs://quickstart.cloudera:8020/user/spark/applicationHistory/crunchwordcount-1418657328679
14/12/15 07:28:49 INFO SparkContext: Added JAR file:/home/cloudera/work/crunch/crunch-demo/target/crunch-demo-1.0-SNAPSHOT-job.jar at http://192.168.2.220:46908/jars/crunch-demo-1.0-SNAPSHOT-job.jar with timestamp 1418657329490
14/12/15 07:28:49 INFO AppClient$ClientActor: Connecting to master spark://quickstart.cloudera:7077...
14/12/15 07:28:49 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
14/12/15 07:28:49 INFO MemoryStore: ensureFreeSpace(65576) called with curMem=0, maxMem=278302556
14/12/15 07:28:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 64.0 KB, free 265.3 MB)
14/12/15 07:28:49 INFO MemoryStore: ensureFreeSpace(20557) called with curMem=65576, maxMem=278302556
14/12/15 07:28:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.1 KB, free 265.3 MB)
14/12/15 07:28:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.220:38854 (size: 20.1 KB, free: 265.4 MB)
14/12/15 07:28:49 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
14/12/15 07:28:50 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141215072850-0001
14/12/15 07:28:50 INFO AppClient$ClientActor: Executor added: app-20141215072850-0001/0 on worker-20141215063206-192.168.2.220-7078 (192.168.2.220:7078) with 2 cores
14/12/15 07:28:50 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141215072850-0001/0 on hostPort 192.168.2.220:7078 with 2 cores, 512.0 MB RAM
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(260900) called with curMem=86133, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 254.8 KB, free 265.1 MB)
14/12/15 07:28:50 INFO AppClient$ClientActor: Executor updated: app-20141215072850-0001/0 is now LOADING
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(21040) called with curMem=347033, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.5 KB, free 265.1 MB)
14/12/15 07:28:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.220:38854 (size: 20.5 KB, free: 265.4 MB)
14/12/15 07:28:50 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
14/12/15 07:28:50 INFO AppClient$ClientActor: Executor updated: app-20141215072850-0001/0 is now RUNNING
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(65576) called with curMem=368073, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 64.0 KB, free 265.0 MB)
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(20557) called with curMem=433649, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 20.1 KB, free 265.0 MB)
14/12/15 07:28:50 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.220:38854 (size: 20.1 KB, free: 265.4 MB)
14/12/15 07:28:50 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
14/12/15 07:28:50 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
14/12/15 07:28:50 INFO SparkContext: Starting job: saveAsNewAPIHadoopFile at SparkRuntime.java:321
14/12/15 07:28:50 INFO deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize
14/12/15 07:28:50 INFO FileInputFormat: Total input paths to process : 2
14/12/15 07:28:50 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 50
14/12/15 07:28:50 INFO DAGScheduler: Registering RDD 9 (mapToPair at PGroupedTableImpl.java:115)
14/12/15 07:28:50 INFO DAGScheduler: Got job 0 (saveAsNewAPIHadoopFile at SparkRuntime.java:321) with 1 output partitions (allowLocal=false)
14/12/15 07:28:50 INFO DAGScheduler: Final stage: Stage 0(saveAsNewAPIHadoopFile at SparkRuntime.java:321)
14/12/15 07:28:50 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/12/15 07:28:50 INFO DAGScheduler: Missing parents: List(Stage 1)
14/12/15 07:28:50 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[9] at mapToPair at PGroupedTableImpl.java:115), which has no missing parents
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(6984) called with curMem=454206, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.8 KB, free 265.0 MB)
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(3636) called with curMem=461190, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.6 KB, free 265.0 MB)
14/12/15 07:28:50 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.2.220:38854 (size: 3.6 KB, free: 265.3 MB)
14/12/15 07:28:50 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
14/12/15 07:28:50 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[9] at mapToPair at PGroupedTableImpl.java:115)
14/12/15 07:28:50 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/12/15 07:28:53 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@192.168.2.220:46808/user/Executor#-1997953566] with ID 0
14/12/15 07:28:53 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, 192.168.2.220, ANY, 1556 bytes)
14/12/15 07:28:53 INFO BlockManagerMasterActor: Registering block manager 192.168.2.220:38205 with 265.4 MB RAM
14/12/15 07:28:55 INFO ConnectionManager: Accepted connection from [192.168.2.220/192.168.2.220:47191]
14/12/15 07:28:55 INFO SendingConnection: Initiating connection to [/192.168.2.220:38205]
14/12/15 07:28:55 INFO SendingConnection: Connected to [/192.168.2.220:38205], 1 messages pending
14/12/15 07:28:55 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.2.220:38205 (size: 3.6 KB, free: 265.4 MB)
14/12/15 07:28:55 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.220:38205 (size: 20.5 KB, free: 265.4 MB)
14/12/15 07:28:55 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.220:38205 (size: 20.1 KB, free: 265.4 MB)
14/12/15 07:28:57 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 4136 ms on 192.168.2.220 (1/1)
14/12/15 07:28:57 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/15 07:28:57 INFO DAGScheduler: Stage 1 (mapToPair at PGroupedTableImpl.java:115) finished in 6.832 s
14/12/15 07:28:57 INFO DAGScheduler: looking for newly runnable stages
14/12/15 07:28:57 INFO DAGScheduler: running: Set()
14/12/15 07:28:57 INFO DAGScheduler: waiting: Set(Stage 0)
14/12/15 07:28:57 INFO DAGScheduler: failed: Set()
14/12/15 07:28:57 INFO DAGScheduler: Missing parents for Stage 0: List()
14/12/15 07:28:57 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[18] at mapToPair at SparkRuntime.java:307), which is now runnable
14/12/15 07:28:57 INFO MemoryStore: ensureFreeSpace(69448) called with curMem=464826, maxMem=278302556
14/12/15 07:28:57 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 67.8 KB, free 264.9 MB)
14/12/15 07:28:57 INFO MemoryStore: ensureFreeSpace(25227) called with curMem=534274, maxMem=278302556
14/12/15 07:28:57 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 24.6 KB, free 264.9 MB)
14/12/15 07:28:57 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.2.220:38854 (size: 24.6 KB, free: 265.3 MB)
14/12/15 07:28:57 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0
14/12/15 07:28:57 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[18] at mapToPair at SparkRuntime.java:307)
14/12/15 07:28:57 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/12/15 07:28:57 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 1, 192.168.2.220, PROCESS_LOCAL, 1022 bytes)
14/12/15 07:28:57 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.2.220:38205 (size: 24.6 KB, free: 265.3 MB)
14/12/15 07:28:57 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@192.168.2.220:41851
14/12/15 07:28:57 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 131 bytes
14/12/15 07:28:58 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 1) in 534 ms on 192.168.2.220 (1/1)
14/12/15 07:28:58 INFO DAGScheduler: Stage 0 (saveAsNewAPIHadoopFile at SparkRuntime.java:321) finished in 0.534 s
14/12/15 07:28:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/15 07:28:58 INFO SparkContext: Job finished: saveAsNewAPIHadoopFile at SparkRuntime.java:321, took 7.709089524 s
14/12/15 07:28:58 INFO SparkUI: Stopped Spark web UI at http://192.168.2.220:4040
14/12/15 07:28:58 INFO DAGScheduler: Stopping DAGScheduler
14/12/15 07:28:58 INFO SparkDeploySchedulerBackend: Shutting down all executors
14/12/15 07:28:58 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
14/12/15 07:28:58 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(192.168.2.220,38205)
14/12/15 07:28:58 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(192.168.2.220,38205)
14/12/15 07:28:58 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(192.168.2.220,38205) not found
14/12/15 07:28:59 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
14/12/15 07:28:59 INFO ConnectionManager: Selector thread was interrupted!
14/12/15 07:28:59 INFO ConnectionManager: ConnectionManager stopped
14/12/15 07:28:59 INFO MemoryStore: MemoryStore cleared
14/12/15 07:28:59 INFO BlockManager: BlockManager stopped
14/12/15 07:28:59 INFO BlockManagerMaster: BlockManagerMaster stopped
14/12/15 07:28:59 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/12/15 07:28:59 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/12/15 07:28:59 INFO SparkContext: Successfully stopped SparkContext
Result
hadoop fs -cat crunch.output/part-r-00000
[Hello,2]
[Bye,1]
[World,2]
[Goodbye,1]
[Hadoop,2]
With that, we have run essentially the same Apache Crunch source code on both MapReduce and Spark. The scarcity of documentation makes it somewhat awkward to work with, but being able to switch execution engines with a one-line change is a powerful capability.