测试Hadoop程序

本文固定链接:https://www.askmac.cn/archives/testing-hadoop-programs.html

测试Hadoop程序

 

本章介绍了如何在你本地集成开发环境( IDE )中对Hadoop程序进行单元测试。虽然自早期的Hadoop以来,Hadoop程序单元测试已经取得很大进展,但因为Hadoop的组件,如Mappers 和 Reducers,是在分布式环境中运行,它仍然是具有挑战性的。

我们讨论了MapReduce单元测试API,称为MRUnit,这使我们能够独立地对Mapper 和 Reducer类进行单元测试。讨论完MRUnit的限制,我们将探讨处理这些限制的LocalJobRunner类。本章结尾将着探讨MiniMRCluster类,这使我们能够在内存中运行一个完整的MapReduce框架,使其在整个MapReduce框架环境下,适合于MapReduce组件的综合测试。

 

回顾Word Counter

表8-1是你在第3章所看到的word counter的Reducer

8-1 WordCountNewAPI.java的Reducer部分

public static class MyReducer extends

Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException,InterruptedException

{

int sum = 0;

for(IntWritable val : values) { 
sum += val.get();

}

context.write(key, new IntWritable(sum));

}

}

虽然该程序发挥着非常简单的功能:在一组文件中记字数,但进行单元测试其实是非常困难的,因为对各种Hadoop类,如Context类的依赖性,它需要在MapReduce集群上执行。Hadoop程序员的典型设计模式是将所有的重要的逻辑移进服务,作为Plain Old Java Objects (POJOs),因为这些POJO类很容易进行单元测试。Mappers 和 Reducers充当各种服务之间的协调机制,负责I / O从/到HDFS(www.askmac.cn)。

UnitTestableReducer的完整代码,如表8-2所示,展示了这一概念。有一种被恰当地称为UnitTestableReducer的Reducer。在reduce调用内部,我们实例化一个WordCountingService实例。注意,每次调用reduce方法都会创建其自己的WordCountingService实例,这是状态性的,保持为给定词计数的状态。即便使用Combiner,程序仍运行,因为该服务的incrementCount方法接受一个int参数。如果没有Combiner,val.get()返回1 ;如果有Combiner,val.get()返回一个大于1的值(www.askmac.cn)。

 

8-2  UnitTestableReducer.java

public static class UnitTestableReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterable<IntWritable> values, Context context)

throws IOException,

InterruptedException { 
WordCountingService service = new WordCountingService(key); 
for (IntWritable val : values) 
{

service.incrementCount(val.get());

}

IntWritable wrdCnt = IntWritable(service.getCount()); 
context.write(service.getWord(),wrdCnt);

}

}

 

表8-3展示了整个WordCountingService类。

8-3 WordCountingService.java

 

/*Package and Import declarations*/ public class WordCountingService {

private Text word=null; 
private int count=0;

public WordCountingService(Text word){ 
this.word = word;
}

public void incrementCount(int incr){ 
this.count=this.count+incr;
}

public Text getWord() { 
return word;
}

public int getCount() { 
return this.count;

}

}

 

介绍MRUnit

虽然上一节中讨论的开发模式使你能够测试许多MapReduce任务,有时你的输入在你的Mapper 或 Reducer中可能经历多个状态变化。对于一个给定键,Reducer接收的值之间可能有依赖关系。这种依赖关系的一个例子是当在Reducer中求一个给定键的值时,Reducer会执行一些计算。当一个键的所有值都被接收时,它就执行附加计算。这种情况下也需要某些被存储起来的值,直到所有的值都被接收。在像这种复杂情况下,用前面例子中的开发模式被证明是冗长枯燥的,甚至可能导致Reducer和测试单元之间的代码重复(www.askmac.cn)。

MRUnit是一个使我们能够测试Mappers、Reducers以及作为一个整体的MapReduce程序的库。 MRUnit,由Cloudera进行开发和开源,是MapReduce和标准测试库,方便JUnit和的Mockito之间的集成。接下来的几节将展示MRUnit如何对MapReduce任务进行单元测试。

 

安装MRUnit

因为我们用Maven创建代码,我们也用它来安装MRUnit。要安装MRUnit,需要将以下几行代码添加到你的pom.xml文件(www.askmac.cn)。

 

<!--  Start MR Unit -->

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

</dependency>



<dependency>

<groupId>org.apache.mrunit</groupId>

<artifactId>mrunit</artifactId>

<version>1.0.0</version>

<classifier>hadoop2</classifier>

</dependency>

<!--  End MR Unit -->

<classifier>标签表明下载的MRUnit JAR文件适用于Hadoop 2。生成的JAR文件名称为:mrunit-1.0.0-hadoop2.jar,文件的指定格式为:<artifactId>-<version>-<classifier>.jar。

MRUnit 核心类

MRUnit的核心类如下所示:

  • MapDriver: 负责调用Mapper 的map()方法的驱动类。
  • ReducerDriver: 负责调用Reducer的 reduce()方法的驱动类。
  • MapReduceDriver: 负责调用Mapper的 map()方法的组合MapReduce驱动,然后是内存中Shuffle阶段。内存中Shuffle阶段后接着是调用Reducer的 reduce()方法(www.askmac.cn)。

每个驱动具有使其能够围绕run()方法提供输入和那些输入的预期输出的方法。JUnit API的setup()方法负责创建Mapper, Reducer以及前面提到的适当的MRUnit驱动的新实例。

正如你在第3章学到的,有两个版本的MapReduce(旧的API和新的API),且MRUnit与每个API有相匹配的驱动。对应于MapReducerV1 API (旧 API)的驱动可在org.apache.hadoop.mrunit包中找到;对应于MapReduceV2 API (新 API)的驱动位于org.apache.hadoop.mrunit.mapreduce包中。

  • 注意 本章仅使用新 api(mapreduceV2),因此我们使用后一种包。如果你使用的是旧api之一,请使用前一种包的驱动器。

 

编写一个MRUnit测试用例

 

在源代码中(可从本书推荐的网站中下载),你会在org.apress.prohadoop.c8包中发现以下列表,我们为其编写MRUnit测试用例:

  • java
  • java

包含MRUnit测试用例的文件是WordCountMRUnitTest,在org.apress.prohadoop.c8包中也能找到(www.askmac.cn)。

该类的框架如表8-4所示:

 

8-4 WordCountMRUnitTest.java

/*Package and Import Declarations*/

public class WordCountMRUnitTest extends TestCase { private Mapper mapper;

private Reducer reducer; private MapDriver mapDriver;

private ReduceDriver reduceDriver; private MapReduceDriver mapReduceDriver;



@Before

public void setUp() {

/*Initialize Mappers, Reducers and MRUnit Drivers*/

}

@Test

public void testWordCountMapper() {

{

/*Test only the Mapper*/

}

@Test

public void testWordCountReducer() {

{

/*Test only the Reducer*/

}


@Test

public void testWordCountMapReducer() {

{

/*

Test both Mapper and Reducer together where Mapper Inputs are passed to the Reducer after an in-memory Shuffle

*/

}

 

一个典型的MRUnit程序的框架模式如表8-4所示。(在本节后面的内容中,假设你是熟悉JUnit4的。)在每一次带@Test标记的方法被调用之前,带@Before标记的方法会被调用。setUp()方法确保Mappers, Reducers, 和 Drivers的新实例对每一次测试是可用的(www.askmac.cn)。

表8-5展示了setUp()方法。

8-5 setUp() 方法

@Before

public void setUp() {

mapper = new WordCountMapper(); 
mapDriver = new MapDriver(mapper);

reducer = new WordCountReducer(); 
reduceDriver = new ReduceDriver(reducer);

mapReduceDriver = new MapReduceDriver(mapper,reducer);

}

接下来,我们将展示如何轻松地测试Mapper。早期版本的MRUnit只允许单个输入,但版本1.0.0消除了这一限制,现在可将多个输入传递到驱动(它扩展了用MRUnit可运行的测试类型)。一个给定键往往需要多个输入来制定全面的测试。例如,曲线图问题,其中键是公共顶点,值是共享一个公共顶点的边,如果Mappers只允许单个输入的话,就不能对曲线图进行全面测试(www.askmac.cn)。

 

表8-6 展示了MapDriver实例是如何用来仅测试Mapper的。

8-6  testWordCountMapper()方法

 

@Test

public void testWordCountMapper() {

{

mapDriver.withInput(new LongWritable(1), new Text("map"))

.withInput(new LongWritable(2), new Text("reduce"))

.withOutput(new Text("map"), new IntWritable(1))

.withOutput(new Text("reduce"), new IntWritable(1))

.runTest();

}

 

这段代码调用withInput ( )方法,在MapDriver实例上有两个输入。注意,我们从物理上硬编码传递到该程序的典型输入,此程序来自TextInputFormat(参见表8-1)。第一个参数(LongWritable实例)是文件中每一行开始时的字节偏移量,第二个参数是实际行。因为我们忽略了第一个参数,然后添加一个适当类型的值(www.askmac.cn)。

输出对是作为输入传递的词的一个Text 实例,接着是一个值为1的IntWritable实例。输出对中的词序必须匹配输入中的词序,否则单元测试失败。

接下来我们用相关的testWordCountReducer()方法只测试Reducer,此方法如表8-7所示:

 

8-7 testWordCountReducer()

@Test

public void testWordCountReducer() throws Exception { 
Text key1 = new Text("map");

List<IntWritable> values1 = new ArrayList<IntWritable>(); 
values1.add(new IntWritable(1));
values1.add(new IntWritable(1));
Text key2 = new Text("reducer");

List<IntWritable> values2 = new ArrayList<IntWritable>(); 
values2.add(new IntWritable(1));

values2.add(new IntWritable(1)); 
values2.add(new IntWritable(1)); 
reduceDriver.withInput(key1, values1)
.withInput(key2, values2)
.withOutput(key1, new IntWritable(2))
.withOutput(key2, new IntWritable(3))
.runTest();

}

我们假设Reducer接收map的两个输入,reducer接收三个输入。该例中的预期输出按相应的顺序为<map, 2> 和 <reduce, 3>对。Reducer输出键和值实例的输出类型分别为Text 和IntWritable(www.askmac.cn)。

最后,Mapper 和Reducer一起测试,这是对完整的MapReduce程序一次真正的测试。Mapper输出由MRUnit程序在内存中洗牌,最后被Reducer使用。提供给该程序(通过withOutput( )方法)的预期输出是通过withInput()方法提供的Mapper输入作为整体的MapReduce程序的预期输出。

Listing 8-8 shows how the MapReduce program can be tested as a whole.

表8-8展示了MapReduce程序如何作为整体进行测试。

8-8 testWordCountMapReducer()方法

 

@Test

public void testWordCountMapReducer() throws Exception { 
mapReduceDriver.withInput(new LongWritable(1), new Text("map"))

.withInput(new LongWritable(2), new Text("map"))

.withInput(new LongWritable(3), new Text("reduce"))

.withOutput(new Text("map"), new IntWritable(2))

.withOutput(new Text("reduce"), new IntWritable(1))

.runTest();

}

 

测试Counters

MapReduce counters(第6章中介绍的)在Map Reduce程序中被广泛应用。在某些情况下,counters被用作测试工件。例如,在社交网络或欺诈检测应用中使用的复杂的图算法对于互连事件可能具有极为复杂的逻辑。在这种情况下经常使用Counters以确保某个指定事件交互的实际计数与预期次数相匹配。这可作为一个高级检查,以确保所有预期的事件交互如预期那样被使用(www.askmac.cn)。

MRUnit用于测试counters的方法可通过一个非常简单的例子证明。WordCountReducer以某个字母开始计词数,若第一个字母区分大小写,则可能产生26次。

 

表8-9展示了管理此counter的新WordCountWithCounterReducer。

8-9 WordCountWithCounterReducer

public class WordCountWithCounterReducer

extends Reducer<Text, IntWritable, Text, IntWritable> { 
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException,InterruptedException

{

int sum = 0;

for(IntWritable val : values) 
{ sum += val.get();

}

String firstLt = key.toString().substring(0,1).toLowerCase(); 
context.getCounter("FIRST_LETTER", firstLt).increment(sum); 
context.write(key, new IntWritable(sum));

}

}

 

用之前的Reducer测试MapReduce 程序的WordCountMRUnitCountersTest.java源文件在org.apress.prohadoop.c8包中可找到。唯一有所不同的组件是表8-10所示的setUp()方法。用新的Reducer来代替旧的,旧的Reducer不管理counters(www.askmac.cn)。

 

8-10 WordCountWithCounterReducer的setUp()方法

 

/*Other Import Statements*/

/*JUnit specific import statements start here*/ 
import org.junit.Before;

import org.junit.Test;

import junit.framework.Assert; 
import junit.framework.TestCase;

@Before

public void setUp() {

mapper = new WordCountMapper(); 
mapDriver = new MapDriver(mapper);

reducer  =  new  WordCountWithCounterReducer();

reduceDriver = new ReduceDriver(reducer);

mapReduceDriver = new MapReduceDriver(mapper,reducer);

}

 

测试counter的两种方法是如表8-11所示的testWordCountReducer(),和表8-12所示的testWordCountMapReducer() 。

 

8-11 testWordCountReducer()方法

 

@Test

public void testWordCountReducer() throws Exception { 
Text key = new Text("map");

List<IntWritable> values = new ArrayList<IntWritable>(); 
values.add(new IntWritable(1));

values.add(new IntWritable(1)); reduceDriver.withInput(key, values)

.withOutput(key, new IntWritable(2))

.runTest();

Assert.assertEquals(2,

reduceDriver.getCounters()

.findCounter("FIRST_LETTER", "m")

.getValue());

Assert.assertEquals(0,

reduceDriver.getCounters()

.findCounter("FIRST_LETTER",  "n")

.getValue());

}

8-12  testWordCountMapReducer()方法(www.askmac.cn)

 

@Test

public void testWordCountMapReducer() throws Exception { 
this.mapReduceDriver

.withInput(new LongWritable(1), new Text("map"))

.withInput(new LongWritable(2), new Text("map"))

.withInput(new LongWritable(3), new Text("reduce"))

.withOutput(new Text("map"), new IntWritable(2))

.withOutput(new Text("reduce"), new IntWritable(1))

.runTest();



Assert.assertEquals(2,mapReduceDriver.getCounters()

.findCounter("FIRST_LETTER", "m")

.getValue());

Assert.assertEquals(0,

mapReduceDriver.getCounters()

.findCounter("FIRST_LETTER", "n")

.getValue());


}

 

前面被测试的程序只有counters来自ReducerDriver,但于由来自MapDriver的Mappers管理的counters的测试过程是相似的。调用MapDriver(假设mapDriver是MapDriver的一个实例)的方法如下:

mapDriver.getCounter().findCounter(…)

MapReduceDriver也能够验证由Mapper和Reducer管理的counters。(www.askmac.cn)

 

  • 注意 Assert是一个具有一系列重载的assertEquals()方法的JUnit类,assertEquals()方法比较预期值(第一个参数)和实际值(第二个参数)。该例子用assertEquals(long expected,long actual)方法与长值比较。

 

MRUnit的特征

我们已经向你展示了一些MRUnit的关键特征,并测试了Mappers, Reducers, MapReduce任务, 和 MapReduce Counters。然而,除此外,MRUnit还有很多其他特征。例如,如果一个程序使用分布式缓存(参见第6章),MRUnit可满足此要求。

由MRUnit支持的另一种非常有用的类型是MapReduce任务链的测试。在该方案中,任务被一个接一个地链接起来,其中一个任务的输出为链接中下一个任务的输入。PipeLineMapReduceDriver类是允许一系列这样的任务被测试的MRUnit驱动。 Hadoop库配备有两个类,IdentityMapper和IdentityReducer。当在一个单一的MapReduce程序环境中使用时,输出与输入相同。下面是通过IdentityMapper和IdentityReducer用一系列三个MapReduce任务如何使用PipeLineMapReduceDriver的示例(www.askmac.cn):

 

PipelineMapReduceDriver<Text, Text, Text, Text> driver = PipelineMapReduceDriver.newPipelineMapReduceDriver(); driver.withMapReduce(new IdentityMapper<Text, Text>(),

new IdentityReducer<Text, Text>())

.withMapReduce(new IdentityMapper<Text, Text>(),

new IdentityReducer<Text, Text>())

.withMapReduce(new IdentityMapper<Text, Text>(),

new IdentityReducer<Text, Text>())

.withInput(new Text("one"), new Text("two"))

.withOutput(new Text("one"), new Text("two")).runTest();

 

PipeLineMapReducerDriver的真正价值在于,它允许开发人员在内存中将MapReduce任务非常快地链接起来。对比那些先运行一个MapReduce任务,写入到磁盘,然后通过另一个任务从磁盘读取此输出并在磁盘上生成另一个输出的方式。后一种方法由于增加了I / O开销,速度非常慢,即使是很小的数据集。前一种方法允许对链接中的一系列MapReduce任务进行单元测试,而不用写入一个记录到磁盘,使得它非常适合于开发,单元测试,修复和缩短开发周期。

MRUnit网站下的MRUnit源代码有一个全面的测试套件及一些优秀的测试用例。你应该下载源代码,并在下列包中完成测试用例(取决于使用的是MR v1还是MR v2)。

  • MapReducev1: org.apache.hadoop.mrunit package
  • MapReducev2: org.apache.hadoop.mrunit.mapreduce package

基于分布式缓存的测试用例是一个名为TestDistributedCache的类。pipeline–based测试用例位于TestPipelineMapReduceDriver类中。PipelineMapReduceDriver前面所示的代码片段是从TestPipelineMapReduceDriver引入的。(www.askmac.cn)

 

MRUnit的局限性

一直以来,MapReduce程序很难测试。 MRUnit是一个神奇的库,使MapReduce的开发人员很容易就能够测试MapReduce任务。它有一个非常直观的界面和流畅的学习曲线。该库是不干扰的,不要求开发人员以任何特定库的形式修改Mapper或Reducer代码。MRUnit完全在内存中操作,使得测试执行地非常快。

但是,MRUnit也不是没有局限性。例如,它只能测试某些Hadoop组件,如Mappers, Reducers, MapReduce Counters, 和 Distributed Cache。注意Combiner测试和Reducer测试一样,因为它们共享相同的接口。

正如在第7章中讨论的,有时我们必须编写一个自定义的InputFormat或自定义OutputFormat,两个都不能用MRUnit进行测试。此外,MRUnit在单一的Mapper和Reducer前提下运作。依赖于MapReduce多线程或多进程性质的程序的任何方面,如Partitioner,都不能用MRUnit进行测试。

所有这些特征也需要进行测试。 Hadoop配备有LocalJobRunner,第3章中当我们在本地IDE环境下运行程序时,是默认使用它。我们下一节将讨论LocalJobRunner。(www.askmac.cn)

 

用LocalJobRunner测试

通过使用LocalJobRunner可以对MapReduce任务进行端到端的测试。我们先用这种方法来测试WordCount程序。

表8-13展示了使用LocalJobRunner的测试用例程序的框架,只用setUp() 和testXXX()方法填充。

8-13 WordCountLocalRunnerUnitTest类

/*Package and Import Declarations*/



public class WordCountLocalRunnerUnitTest extends TestCase { 
private Job job = null;

private Path inputPath = null; private Path outputPath = null;


@Before

public void setUp() throws Exception{ 
this.inputPath =new Path("src/main/resources/input/wordcount/"); 
this.outputPath =new Path("src/main/resources/output/wordcount/"); Configuration conf = new Configuration(); 
conf.set("mapred.job.tracker", "local"); 
conf.set("fs.default.name",      "file:////");

 
FileSystem fs = FileSystem.getLocal(conf); if (
fs.exists(outputPath)) {

fs.delete(outputPath, true);

}

this.job = this.configureJob(conf, inputPath, outputPath);

}



private Job configureJob(Configuration conf,Path inputPath, Path outputPath)

throws Exception {

/*

Configure the Job instance with input/output paths and input/output formats

*/

this.job = Job.getInstance(conf); 
job.setJarByClass(WordCountLocalRunnerUnitTest.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(IntWritable.class); 
job.setMapperClass(WordCountMapper.class); 
job.setReducerClass(WordCountReducer.class); 
job.setInputFormatClass(TextInputFormat.class); 
job.setOutputFormatClass(TextOutputFormat.class); 
FileInputFormat.setInputPaths(job, inputPath); 
FileOutputFormat.setOutputPath(job, outputPath);

}



private Map<String, Integer> getCountsByWord(File outDir)

throws Exception {

/*

Return a Map instance of counts by word for the job output

*/

}



@Test

public void testWordCount() throws Exception { 
boolean status = job.waitForCompletion(true);

assertTrue(job.isSuccessful());

File outFile = new File(outputPath.toUri() + "/"); 
Map<String, Integer> countsByWord = getCountsByWord(outFile);

assertEquals(new Integer(1),countsByWord.get("java")); 
assertEquals(new Integer(1),countsByWord.get("job")); 
assertEquals(new Integer(3),countsByWord.get("test")); 
assertEquals(new     Integer(1),countsByWord.get("word"));

}

}

 

下一节解释了表8-13的关键部件。

setUp( )方法

首先,我们配置任务,定义输入和输出路径。因为测试是用LocalJobRunner,我们不用HDFS进行该测试;用本地文件系统代替。具有下述特点的Configuration实例定义如下:(www.askmac.cn)

Configuration conf = new Configuration(); 
conf.set("mapred.job.tracker", "local"); 
conf.set("fs.default.name", "file:////");

第一行实例化Configuration实例,第二行将JobRunner配置为本地,最后一行将FileSystem配置为本地文件系统。

接下来的三行是删除输出路径,如已存在的(如已存在,则MapReduce任务不运行)。

FileSystem fs = FileSystem.getLocal(conf); 
if (fs.exists(outputPath)) {

fs.delete(outputPath, true);

}

 

值得注意的关键行是FileSystem.getLocal(conf)它检索本地文件系统。

最后,我们调用configureJob()配置Job实例。到目前为止,你已经熟悉了方法列表,因此我们这里不做详细描述。(www.askmac.cn)

我们运行任务,然后验证任务是否顺利运行。我们调用getCountsByWord(),它从输出目录读取输出文件,用词构造一个字数统计map,并验证字数统计。

表8-14展示了getCountsByWord()的源代码,表8-15展示了用于计算字数的样例文件。

 

8-14  getCountsByWord

private Map<String, Integer> getCountsByWord(File outDir)

throws Exception { Map<String, Integer> countsByWord =new HashMap<String, Integer>();
Collection<File> files = FileUtils.listFiles(outDir,

TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);



for (File f : files) {

if (f.getName().startsWith("part")

&& !f.getAbsolutePath().endsWith("crc")) { 
List<String> lines = FileUtils.readLines(f); 
System.out.println(lines.size());

for (String l : lines) {

String[] counts = l.split("\t"); 
countsByWord.put(counts[0],

Integer.parseInt(counts[1]));

}

break;

}

}

return countsByWord;

}

 

8-15 测试输入文件

test test word test java job

 

LocalJobRunner的局限性

使用LocalJobRunner的一个局限性是,它是单线程的,且只有一个Reducer可以启动。即使调用setNumReducers用一个大于1的值也是如此。没有Reducers的Map-only任务也是支持的。(www.askmac.cn)

单线程是它和MRUnit都有的限制:依赖多线程或多进程性质的任何代码都无法进行测试。这种代码的一个例子是自定义的Partitioner类。在第6章中我们开发了多种自定义Partitioners。

但是,由于LocalJobRunner在一个适当的Hadoop环境中运行,我们可以测试MapReduce的所有其他功能,如自定义的InputFormat和OutputFormat。现在,我们可以端到端测试MapReduce程序。

相比MRUnit的另一主要限制是对于LocalJobRunner,虽然所使用的文件系统是本地的,单元测试速度很慢,有真实的文件I / O。MRUnit没有任何开销; 所有的测试都发生在内存中。因此,基于MRUnit的单元测试运行相当快。测试套件应该更倾向于MRUnit测试用例胜于LocalJobRunner用例,且只有在必要时才使用后者。

 

用MiniMRCluster测试

到目前为止,我们已经用MRUnit单元测试了任务。使用LocalJobRunner来测试MapReduce的各个方面,但它也有局限性,我们希望有一种能够完全测试MapReduce任务的测试方法。(www.askmac.cn)

Hadoop API的一个名为Mini Hadoop集群的功能,可以在一个真实模拟完全分布式Hadoop环境中测试测试套件。Hadoop提供类,如MiniMRCluster,MiniYarnCluster和MiniDFSCluster以在内存中模拟一个真实的HDFS和MapReduce环境。在不同的JVM中启动不同的任务,最终我们拥有一个真实实际运行时的Hadoop环境的环境,使我们能够测试依赖于多个Reducers或Partitioners的功能。当然,此功能是有代价的:由于要启动多个JVM运行job任务,执行测试的速度更慢了,调试也更复杂了。

Mini-Clusters被广泛用于Hadoop的测试套件。作为开发人员,我们也应该好好利用。Hadoop提供一个抽象基类通过setup() 和teardown()方法抽象掉启动/停止HDFS和MapReduce框架的具体细节。此类为ClusterMapReduceTestCase,但此类的文档是非常有限的。本节将指导你通过设置开发环境,在Hadoop2.X中启用此功能。

 

设置开发环境

 

  1. 首先我们需要设置开发环境以使用MiniMRCluster。按以下步骤启用MiniMRCluster功能:

以下JAR文件供MiniMRCluster使用:

hadoop-mapreduce-client-jobclient-2.2.0-tests.jar

hadoop-common-2.2.0-tests.jar

hadoop-hdfs-2.2.0-tests.jar

hadoop-yarn-server-tests-2.2.0-tests.jar

hadoop-yarn-server-nodemanager-2.2.0.jar

hadoop-mapreduce-client-hs-2.2.0.jar
  1. 通过为前面每个JAR文件添加依赖项,在你的POM中启用这些JAR文件。(注意<classifier>标记的大量使用)(www.askmac.cn)

 

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>

hadoop-mapreduce-client-jobclient

</artifactId>

<version>2.2.0</version>

<classifier>tests</classifier>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-hdfs</artifactId>

<version>2.2.0</version>

<classifier>tests</classifier>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>2.2.0</version>

<classifier>tests</classifier>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-yarn-server-tests</artifactId>

<version>2.2.0</version>

<classifier>tests</classifier>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-yarn-server-nodemanager</artifactId>

<version>2.2.0</version>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-mapreduce-client-hs</artifactId>

<version>2.2.0</version>

</dependency>

 

  1. 确保下列依赖关系也被配置:
<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>2.2.0</version>

</dependency>

这一步完成了对开发环境中MiniMRCluster的设置。(www.askmac.cn)

 

MiniMRCluster示例

WordCountTestWithMiniCluster是包含使用MiniMRCluster的TestCase示例的源文件。

表8-16展示了该TestCase的关键部分。

 

8-16 WordCountTestWithMiniCluster

/*Package and import declarations*/



public class WordCountTestWithMiniCluster

extends ClusterMapReduceTestCase { 
public static final String INPUT_STRING ="test\ntest\nword\ntest\njava\njob\n"; Path inputPath = null;

Path outputPath = null; 
Configuration conf = null;

/*

*Read output file and construct a Map of

*word count by word

*/

protected Map<String, Integer> getCountsByWord() throws Exception {

Map<String, Integer> results = new HashMap<String, Integer>(); 
FileSystem fs = FileSystem.get(conf);

FileStatus[] fileStatus = fs.listStatus(outputPath); 
for (FileStatus file : fileStatus) {

String name = file.getPath().getName(); 
if (name.contains("part")) {

Path outFile = new Path(outputPath, name); 
BufferedReader reader = new BufferedReader(

new InputStreamReader(fs.open(outFile))); 
String line;

line = reader.readLine(); while (line != null) {

String[] vals = line.split("\t"); 
results.put(vals[0],

Integer.parseInt(vals[1])); 
line = reader.readLine();

}

reader.close();

}

}

return results;

}



private void createFile(FileSystem fs, Path filePath)

throws IOException { 
FSDataOutputStream out = fs.create(filePath); 
out.write(INPUT_STRING.getBytes());

out.close();

}



private void prepareEnvironment() throws Exception { 
this.conf = this.createJobConf();

this.inputPath = new Path("input/wordcount/"); 
this.outputPath = new Path("output/wordcount/");
FileSystem fs = FileSystem.get(conf); 
fs.delete(inputPath, true); 
fs.delete(outputPath, true); 
fs.mkdirs(inputPath);

this.createFile(fs, new Path(inputPath, "test.txt"));

}



public Job configureJob() throws Exception { 
Job job = Job.getInstance(conf);

job.setJarByClass(WordCountTestWithMiniCluster.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(IntWritable.class); 
job.setMapperClass(WordCountMapper.class); 
job.setReducerClass(WordCountReducer.class); 
job.setInputFormatClass(TextInputFormat.class); 
job.setOutputFormatClass(TextOutputFormat.class); 
FileInputFormat.setInputPaths(job, inputPath); 
FileOutputFormat.setOutputPath(job, outputPath); 
return job;

}



@Test

public void testWordCount() throws Exception { 
this.prepareEnvironment();

Job job = this.configureJob();

boolean status = job.waitForCompletion(true);
 assertTrue(job.isSuccessful());

Map<String, Integer> countsByWord = getCountsByWord();
 assertEquals(new Integer(1), countsByWord.get("java"));
assertEquals(new Integer(1), countsByWord.get("job")); 
assertEquals(new Integer(3), countsByWord.get("test"));
 assertEquals(new Integer(1), countsByWord.get("word"));

}

}

 

该程序的以下关键特征需要进一步的解释:(www.askmac.cn)

  • 该TestCase类扩展 ClusterMapReduceTestCase,它是Hadoop发行版自带的类,并实现了每个测试用例Hadoop集群的启动和关闭。
  • 我们创建TestCase需要HDFS目录和文件。prepareEnvironment()方法实现了这一功能。
  • 不要创建如单元测试中典型的setUp()方法。setUp()方法是由自定义单元测试扩展的ClusterMapReduceTestCase类提供的。在每次执行测试之前,调用超类setUp ( )方法。这种方法为执行测试,启动内存中的YARN集群。自定义prepareEnvironment()方法须在测试开始运行时调用。
  • 因为每个测试经由ClusterMapReduceTestCase的setUp()方法启动mini-YARN集群,通过tearDown()方法停止minicluster。拥有太多的测试方法不见得是好的。当使用MiniMRCluster时,使你所有的测试用一个统一的测试方法,以优化测试性能。

 

MiniMRCluster 的局限性

对MiniMRCluster的讨论结束了测试Hadoop工具包的话题。虽然MiniMRCluster使我们能够测试MapReduce任务的各个方面,但也有一些限制。请记住以下几点:

  • MiniMRCluster启动内存中的Hadoop集群,是一个对Map Reduce框架真正的多线程模拟。注意,使用这种方法的风险是用尽内存。确保你的测试数据容量不大。
  • ClusterMapReduceTestCase基类实现了为每个测试用例,启动和关闭MRCluster,但过程可能很慢。用单一方法捆绑测试用例,以确保测试快速运行,是一个好主意。传统的单元测试方法要求通过它自己的测试函数将每个测试用例隔离开。然而,使用MiniMRCluster的测试并不是单元测试;它们更像是集成测试。由于在内存中启动和停止Hadoop集群的开销较大,集成测试比执行典型的单元测试花费的时间更长。将他们捆绑起来确实是个好主意。

(www.askmac.cn)

通过访问网络资源测试MR Jobs

最后,你将编写访问网络资源的MapReduce任务。已经编写了写入开放源码的搜索引擎Apache Solr的MapReduce任务。还编写了写入Hadoop NoSQL数据库HBase(在第14章你将会看到)的任务。同样地,还必须写入Cassandra,这是另一种分布式NoSQL数据库。使用本章提到的任意一种方法对于这样的测试都是不够的,除非你愿意将你的MapReduce测试套件或外部服务写入这些网络资源,提供类似MiniMRCluster的类。这也大大增加运行测试用例所需的时间。

我已采取以下过程来测试依赖这些网络资源的任务:

  1. .用一个单独的服务类和网络资源进行通信。这个服务类可在Hadoop MapReduce以外的环境中单独测试。
  2. 用Mapper 或 Reducer的configure()方法初始化该服务类,并用close()方法关闭网络资源。
  3. 给MapReduce任务提供一个配置参数(使用–D <name> <value>)方法。这个参数是一个布尔值,表示是否应使用外部服务。对于测试套件,将这个参数设置为false。
  4. 在Mapper和 Reducer中读取该参数,并决定是用configure()方法初始化该服务还是用close()方法关闭该服务。
  1. 进行单元测试时,提供另一个代码路径,将针对网络资源的数据写入HDFS。
  1. 读取HDFS输出来测试针对网络资源的数据。(www.askmac.cn)

 

 

小结

本章涵盖了测试Hadoop任务的各个方面。首先,你了解到使用某种面向服务的编程风格,其中服务是从Mapper 或Reducer调用的,这是一个很好的实践。服务比Mapper 和Reducer更容易测试。

我们详细地讨论了MRUnit。MRUnit是一个Mapper 和 Reducer类测试的神奇的开放源码库。但是,它只允许有限数量的Hadoop类被测试。

自定义InputFormat,OutputFormat和RecordReader类的测试需要的不仅仅是MRUnit。LocalJobRunner使我们能够在一个本地开发环境内存中端到端地测试MapReduce任务。然而LocalJobRunner是一个单线程的进程; 那些依赖多线程的MapReduce的,如Reducer 或 Partitioner的实例,不能用LocalJobRunner进行测试。

最后,我们研究了MiniMRCluster,这是一个在本地开发环境内存中启动的真正的MRCluster。在单独的线程中创建多个任务,使它能够真实模拟现实世界中的分布式Hadoop集群。这种方法最终使我们能够测试MapReduce任务的各个方面。

 

 

Comment

*

沪ICP备14014813号-2

沪公网安备 31010802001379号