Getting Hadoop

Download Hadoop 2.8.1 under ~/env/.

wget http://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/hadoop-2.8.1/hadoop-2.8.1.tar.gz
tar -xf hadoop-2.8.1.tar.gz
ln -s hadoop-2.8.1 hadoop
cd hadoop
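
If the Tsinghua mirror no longer carries 2.8.1 (ASF mirrors only keep recent releases), the same tarball should be available from the Apache archive; an alternative download following the standard archive layout:

wget https://archive.apache.org/dist/hadoop/common/hadoop-2.8.1/hadoop-2.8.1.tar.gz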

You can now see Hadoop's directory layout.

HDFS Configuration

OS: Ubuntu 16.04.2 LTS (GNU/Linux 4.4.0-85-generic x86_64)

Hadoop: 2.8.1

Initially I wanted to set up and run Hadoop on a server that another student was also using to run Hadoop. To avoid port conflicts between the two installations, I configured core-site.xml and hdfs-site.xml as follows.

core-site.xml

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://0.0.0.0:9001</value>
  </property>
</configuration>

hdfs-site.xml

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.datanode.address</name>
    <value>0.0.0.0:50011</value>
  </property>
  <property>
    <name>dfs.http.address</name>
    <value>0.0.0.0:50071</value>
  </property>
  <property>
    <name>dfs.datanode.ipc.address</name>
    <value>0.0.0.0:50021</value>
  </property>
  <property>
    <name>dfs.datanode.http.address</name>
    <value>0.0.0.0:50076</value>
  </property>
  <property>
    <name>dfs.secondary.http.address</name>
    <value>0.0.0.0:50091</value>
  </property>
</configuration>
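
Before starting the daemons with these non-default ports, it is worth checking that none of them is already taken by the other installation; a quick sketch for Ubuntu, assuming the port list above:

for port in 9001 50011 50021 50071 50076 50091; do
  ss -ltn | grep -q ":$port " && echo "$port in use" || echo "$port free"
done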

However, when YARN jobs were run later on, the processes still kept getting killed for various reasons, so I set up Hadoop again on my local machine.

The local setup uses the following environment.

OS: macOS High Sierra 10.13

Hadoop: 2.8.1

On macOS the JAVA_HOME path can be determined by running /usr/libexec/java_home, so the following environment variables were set in ~/.bash_profile.

# Setting PATH for Hadoop
export JAVA_HOME=`/usr/libexec/java_home`
export HADOOP_HOME="/Users/chenrz925/env/hadoop"
export PATH="$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH"
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
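
After editing ~/.bash_profile, reload it and confirm that Hadoop is on the PATH and sees the right Java; a quick sanity check:

source ~/.bash_profile
echo $JAVA_HOME
hadoop version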

The following settings were added to hadoop-env.sh. Note that hadoop-env.sh is sourced when the daemons start; if they are started over ssh and cannot resolve ${JAVA_HOME}, set JAVA_HOME to an explicit path here instead.

export JAVA_HOME=${JAVA_HOME}
export HADOOP_HOME=/Users/chenrz925/env/hadoop

core-site.xml is as follows.

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

hdfs-site.xml is as follows.

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
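
Before the web UI becomes reachable, the NameNode has to be formatted and the HDFS daemons started. A minimal sketch following the official single-node guide (the ssh-keygen lines are only needed if passwordless ssh to localhost is not yet set up; on macOS, Remote Login must also be enabled):

ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 0600 ~/.ssh/authorized_keys
hdfs namenode -format
start-dfs.sh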

Once HDFS has been formatted and started, visit http://localhost:50070 and you should see the NameNode page shown below.
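
Apart from the web UI, the state of HDFS can also be checked from the command line; for example:

hdfs dfsadmin -report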

YARN Configuration

Modify core-site.xml. (The mapreduce.framework.name property is conventionally set in mapred-site.xml, but placing it in core-site.xml also works, since core-site.xml is loaded by every Hadoop Configuration.)

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

Modify yarn-site.xml.

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>

With HDFS already running from start-dfs.sh, run start-yarn.sh to start the ResourceManager and NodeManager, then visit http://localhost:8088/ and you should see the ResourceManager page shown below.

At this point YARN is configured as well.
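
A quick way to confirm that all daemons are up is jps; on a healthy single-node setup it should list roughly the following processes (PIDs will differ):

jps
# NameNode
# DataNode
# SecondaryNameNode
# ResourceManager
# NodeManager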

Testing the Setup

First, run one of the official example programs.

hdfs dfs -put etc/hadoop input
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar grep input output 'dfs[a-z.]+'
hdfs dfs -cat output/*
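
Note that hdfs dfs -put etc/hadoop input uses a relative path, so the HDFS home directory of the current user must exist first. On a freshly formatted HDFS it can be created like this, assuming the HDFS user matches the local user:

hdfs dfs -mkdir -p /user/$(whoami)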

The results are shown below and confirm that the single-node Hadoop environment is working.

Running WordCount

Hadoop ships an official example program, WordCount; its code is as follows.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Compile WordCount.java and package it as wc.jar.

hadoop com.sun.tools.javac.Main WordCount.java
jar cf wc.jar WordCount*.class
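
To confirm that everything was packaged, the jar contents can be listed; the mapper and reducer show up as inner classes:

jar tf wc.jar
# WordCount.class
# WordCount$TokenizerMapper.class
# WordCount$IntSumReducer.class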

Before running WordCount, crawl some text data into HDFS. The crawler code is as follows.

import requests
import bs4
import time
from urllib import parse
from hdfs.client import Client

# WebHDFS client rooted at the HDFS home directory of user chenrz925
client = Client('http://localhost:50070', root='/user/chenrz925')

def random_article():
    # Fetch a random Wikipedia page and write its plain text into HDFS
    response: requests.Response = requests.request('GET', 'https://en.wikipedia.org/wiki/Special:Random', headers={
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'})
    url: str = parse.unquote(response.url)
    document = bs4.BeautifulSoup(response.content, 'html.parser')
    filename = url.split('/')[-1].replace(':', '').replace(',', '').replace('.', '') + '.txt'
    content = document.select_one('#mw-content-text').get_text()
    print(filename)
    with client.write(hdfs_path=('WordCount/input/' + filename), encoding='utf-8', permission='777') as writer:
        writer.write(content)


# Crawl 2000 random articles, pausing briefly between requests
for times in range(0, 2000):
    random_article()
    time.sleep(0.01)
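
The script depends on a few third-party packages: requests, BeautifulSoup, and the HdfsCLI client, which talks to the NameNode's WebHDFS interface on port 50070. Assuming pip is available:

pip install requests beautifulsoup4 hdfs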

After crawling a number of wiki pages, run WordCount.

hadoop jar wc.jar WordCount WordCount/input WordCount/output
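
While the job is running, its progress can also be followed on the ResourceManager UI at http://localhost:8088/ or from the command line; for example:

yarn application -list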

When the job finishes, Hadoop prints status output like the following.

Run

hdfs dfs -cat WordCount/output/* | less

to inspect the results; part of the output is shown below.