Environment Setup

Create a new Gradle project in IntelliJ IDEA 2017.2.5, then configure the project's build.gradle and add the following dependencies.

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.9.1'
    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.9.1'
    compile group: 'com.google.guava', name: 'guava', version: '23.3-jre'
    compile group: 'commons-logging', name: 'commons-logging', version: '1.2'
    compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.8.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.8.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-common', version: '2.8.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.8.1'
    compile group: 'org.json', name: 'json', version: '20171018'
}

Add WordCount.java to the project.
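For reference, WordCount.java can follow the official Hadoop example; a minimal sketch is given below (the package name is assumed here to match the rest of the project):

package io.github.chenrz925.InvertedIndex;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Emit <word, 1> for every token in the line.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum all counts for this word.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}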

Fetching the Data

Use the following script to fetch a large amount of English text data.

import json
import os
import time

import requests
import threadpool


def dumpArticle(page: int):
    # Fetch one page (20 hits) from CNN's article search API.
    response = requests.get(
        url='https://search.api.cnn.io/content',
        params={'size': 20, 'q': 'international', 'from': (page - 1) * 20, 'page': page, 'type': 'article'}
    )
    result = json.loads(response.text)['result']
    articles = [{
        # Strip characters from the headline that are unsafe in file names.
        'title': ''.join(c for c in item['headline'] if c not in ' ,/.?\'!:'),
        'content': item['body']
    } for item in result]  # 'item' avoids shadowing the 'page' parameter
    for article in articles:
        with open('articles/' + article['title'] + '.txt', 'w', encoding='utf-8') as text_file:
            text_file.write(article['content'])
    print('Dumped {0}'.format(page))


if __name__ == '__main__':
    # 32 worker threads, request queue of up to 64 entries.
    pool = threadpool.ThreadPool(0x20, 0x40)
    if not os.path.exists('articles'):
        os.mkdir('articles')
    page_index = 1
    while True:
        # Every 32 pages, pause briefly and drain the pool before queuing more.
        if page_index % 0x20 == 0:
            time.sleep(1)
            pool.wait()
        request = threadpool.WorkRequest(dumpArticle, [page_index])
        pool.putRequest(request)
        page_index += 1
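Note that the main loop never terminates on its own; interrupt the script manually once enough articles have accumulated in the articles directory.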

After fetching the English text, place it under resources/articles and set WordCount's command-line arguments to

resources/articles resources/output

Run it; once it produces correct results, the environment setup is complete.

Writing the Program

First, write a Mapper that splits all the text files into <Key, Value> pairs.

package io.github.chenrz925.InvertedIndex;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by chenrz925 on 02/11/2017.
 * 9:13 AM Project: hadoop
 *
 * @author chenrz925
 */
public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, IndexWritable> {
    public enum CounterEnum {WORD_COUNT}

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line on whitespace and common punctuation.
        StringTokenizer tokenizer = new StringTokenizer(value.toString(), " \t\r\n\f?!,.\";:()");
        long wordNumber = 0;
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            if (!token.isEmpty()) {
                // Emit <word, (byte offset of the line, word position in the line, source split)>.
                IndexWritable index = new IndexWritable(key.get(), wordNumber, context.getInputSplit().toString());
                context.write(new Text(token), index);
                wordNumber++;
            }
            context.getCounter(CounterEnum.WORD_COUNT).increment(1);
        }
    }
}
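With the default TextInputFormat, the LongWritable key handed to map is the byte offset of the line within its file, not a line number, so lineOffset records where the line begins. context.getInputSplit().toString() identifies the source file; a FileSplit prints as path:start+length.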

Here, a custom IndexWritable is implemented to record each word occurrence's position:

package io.github.chenrz925.InvertedIndex;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by chenrz925 on 02/11/2017.
 * 9:14 AM Project: hadoop
 *
 * @author chenrz925
 */
public class IndexWritable implements WritableComparable<IndexWritable> {
    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public Long getLineOffset() {
        return lineOffset;
    }

    public void setLineOffset(Long lineOffset) {
        this.lineOffset = lineOffset;
    }

    public Long getWordNumber() {
        return wordNumber;
    }

    public void setWordNumber(Long wordNumber) {
        this.wordNumber = wordNumber;
    }

    private String fileName;
    private Long lineOffset;
    private Long wordNumber;

    public IndexWritable() {
        setFileName("");
        setLineOffset(0L);
        setWordNumber(0L);
    }

    public IndexWritable(long lineOffset, long wordNumber, String fileName) {
        setFileName(fileName);
        setLineOffset(lineOffset);
        setWordNumber(wordNumber);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Field order here must match readFields exactly.
        out.writeLong(getLineOffset());
        out.writeLong(getWordNumber());
        out.writeUTF(getFileName());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        setLineOffset(in.readLong());
        setWordNumber(in.readLong());
        setFileName(in.readUTF());
    }

    @Override
    public int compareTo(IndexWritable o) {
        if (!getLineOffset().equals(o.getLineOffset()))
            return getLineOffset().compareTo(o.getLineOffset());
        else if (!getWordNumber().equals(o.getWordNumber()))
            return getWordNumber().compareTo(o.getWordNumber());
        else
            return getFileName().compareTo(o.getFileName());
    }

    @Override
    public String toString() {
        return "(" + getLineOffset() + "," + getWordNumber() + "," + getFileName() + ")";
    }
}

Next, a Reducer is implemented to reduce all of a word's key-value pairs into an IndexListWritable, which is then written out as text. The Reducer is implemented as follows:

package io.github.chenrz925.InvertedIndex;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by chenrz925 on 02/11/2017.
 * 10:04 AM Project: hadoop
 *
 * @author chenrz925
 */
public class InvertedIndexReducer extends Reducer<Text, IndexWritable, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<IndexWritable> values, Context context) throws IOException, InterruptedException {
        IndexListWritable indexList = new IndexListWritable();
        for (IndexWritable value : values) {
            // Hadoop reuses the same IndexWritable instance across iterations,
            // so store a copy rather than the reference itself.
            indexList.add(new IndexWritable(value.getLineOffset(), value.getWordNumber(), value.getFileName()));
        }
        StringBuilder builder = new StringBuilder("[");
        boolean isStart = true;
        for (IndexWritable index : indexList) {
            if (!isStart) {
                builder.append(",");
            } else {
                isStart = false;
            }
            builder.append(index.toString());
        }
        builder.append("]");
        context.write(key, new Text(builder.toString()));
    }
}
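Copying each value before storing it matters: Hadoop recycles a single IndexWritable instance while iterating over values, so keeping the raw references would leave the list holding many copies of the last entry.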

IndexListWritable is as follows:

package io.github.chenrz925.InvertedIndex;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;

/**
 * Created by chenrz925 on 02/11/2017.
 * 9:33 AM Project: hadoop
 *
 * @author chenrz925
 */
public class IndexListWritable extends ArrayList<IndexWritable> implements Writable {
    @Override
    public void write(DataOutput out) throws IOException {
        // Write the element count first, then each entry in order.
        out.writeLong(this.size());
        for (IndexWritable indexWritable : this) {
            indexWritable.write(out);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Mirror write(): read the count, then deserialize that many entries.
        clear();
        long limit = in.readLong();
        for (int index = 0; index < limit; ++index) {
            IndexWritable indexWritable = new IndexWritable();
            indexWritable.readFields(in);
            add(indexWritable);
        }
        assert limit == size();
    }
}
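As a quick sanity check of the serialization logic, a round-trip test can be written against the junit 4.12 dependency declared earlier (a minimal sketch; the test class and method names are ours):

package io.github.chenrz925.InvertedIndex;

import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static org.junit.Assert.assertEquals;

public class IndexListWritableTest {
    @Test
    public void roundTrip() throws IOException {
        IndexListWritable original = new IndexListWritable();
        original.add(new IndexWritable(0L, 3L, "a.txt"));
        original.add(new IndexWritable(0L, 7L, "b.txt"));

        // Serialize to an in-memory buffer.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance and compare.
        IndexListWritable restored = new IndexListWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        assertEquals(original.size(), restored.size());
        assertEquals(0, original.get(0).compareTo(restored.get(0)));
        assertEquals(0, original.get(1).compareTo(restored.get(1)));
    }
}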

Then set up the Driver class, written as follows:

package io.github.chenrz925.InvertedIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * Created by chenrz925 on 02/11/2017.
 * 9:13 AM Project: hadoop
 *
 * @author chenrz925
 */
public class InvertedIndex {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionsParser.getRemainingArgs();
        if (remainingArgs.length != 2) {
            System.err.println("Usage: <input> <output>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "InvertedIndex");
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(InvertedIndexMapper.class);
        // InvertedIndexReducer cannot serve as a combiner: it emits Text values,
        // while the map output values are IndexWritable.
        job.setReducerClass(InvertedIndexReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IndexWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Set the command-line arguments to the same values used earlier:

resources/articles resources/output

This yields an index for every word; part of the results is shown below.

Each index entry is a (line offset, word number, file) tuple; since most of the data consists of single-line text files, the line offset is shown as 0.
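For illustration only, an output line in the result file takes roughly this shape: the word, a tab, then the list of index tuples (the paths and numbers below are hypothetical):

world	[(0,12,file:/.../articles/Example.txt:0+2048),(0,87,file:/.../articles/Another.txt:0+1536)]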