在Docker容器中使用Hadoop执行Python MapReduce作业


在 Apple Silicon Mac 上的 Docker 容器中使用 Hadoop 执行 Python MapReduce 作业。

开始时需要的步骤是。

  1. 安装 Apple Silicon的docker桌面。
  2. 克隆这个Hadoop Docker容器的 repo。
  3. 为容器启用docker-compose构建步骤。
  4. 在Docker文件中安装python3(并将python3设为默认)。
  5. 使用docker-compose构建容器并启动你的Hadoop集群。
  6. 创建mapper.py和reducer.py脚本。
  7. 创建你的数据输入文件。
  8. 在namenodeservice中启动一个终端。
  9. 使用hadoop dfs将你的数据输入文件复制到HDFS。
  10. 使用mapred执行你的工作,并在输出文件夹中查看结果。

1、安装Docker Desktop
为Apple Silicon安装#Docker Desktop。考虑增加可用的内存量。

2、在Docker上克隆Hadoop
请看here的文章中的更多细节,但你可以直接去克隆 repo。

注意,这里显示$是为了提醒你,你是在docker容器之外执行这些命令。

$ git clone git@github.com:wxw-matt/docker-hadoop.git ~/docker-hadoop

3、启用容器的构建步骤
确保docker-compose build会触发各种组件的构建,例如在docker-compose.yml中的namenode。

注意,这里显示的是+,表示在现有文件中增加了几行。

namenode:
    image: wxwmatt/hadoop-namenode:2.1.1-hadoop3.3.1-java8
+   build:
+       context: namenode
+       dockerfile: Dockerfile
    container_name: namenode


针对以下重复此步骤:

  • namenode
  • datanode
  • resourcemanager
  • nodemanager1

4、将Python3安装为默认值
在每个组件的docker文件中,安装python3。你可以在一开始就这样做,因为它是Docker容器环境的高级依赖,与Hadoop没有具体关系。

FROM wxwmatt/hadoop-base:2.1.1-hadoop3.3.1-java8
+RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends python3.6
+RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1
HEALTHCHECK CMD curl -f http://localhost:9870/ || exit 1

针对以下重复此步骤:

  • namenode
  • datanode
  • resourcemanager
  • nodemanager1

5、构建容器并启动你的Hadoop集群
一旦你构建了其中一个容器,基本镜像就应该被缓存起来。你可能需要等待所有的容器都运行起来,以便它们可以看到彼此。

$ docker-compose up --build -d

6、编写你的映射器和还原器脚本
在 /jobs/jars 文件夹中,创建 mapper.py。

# -*-coding:utf-8 -*
import sys
import string
import collections
wordCount = collections.Counter()
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # ignore case to avoid duplicates
    lower = line.lower()
    # replace punctuation with spaces, except for hyphens
    punc = string.punctuation
    punc_nohyphens = punc.replace('-', '')
    justText = lower.translate(
        str.maketrans(' ', ' ', punc_nohyphens))
    # allow hyphenated words, but watch out for
    # double-hyphens which might be a stand-in for em-dash
    emdashRemoved = justText.replace('--', ' ')
    # split the line into words
    words = emdashRemoved.split()
    # now we can remove leading and trailing curly quotes 
    # and hyphens, but leave apostrophes in the middle 
    # and allow hyphenated words.
    # note that we lose possessive ending in s, e.g Angus'
    punc_curly = '”’‘“-'
    for word in words:
        word = word.strip(punc_curly)
        if len(word) > 0:
            wordCount[word] += 1
# write the results, tab delimited
# which will be the input for reducer.py
for word, count in wordCount.most_common():
    print ('%s\t%s' % (word, count))

reducer.py代码:

# -*-coding:utf-8 -*
import sys
import collections
import statistics
wordCount = collections.Counter()
wordLengthCount = collections.Counter()
wordLengthList = []
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # ensure that it's a string followed by an int
    try:
        word = str(word)
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    length = len(word)
    wordCount[word] += count
    wordLengthCount[length] += count
    # expand the counted words to a full list
    # to enable median to be calculated later
    lengthRepeated = [length] * count
    wordLengthList.extend(lengthRepeated)
for commonWord, commonWordCount in wordCount.most_common(100):
    print ("{}\t{}".format(commonWord, commonWordCount))
print (
"\n")
for commonLength, lengthCount in wordLengthCount.most_common():
    print (
"{}\t{}".format(commonLength, commonLengthCount))
print (
"\n")
print(
"Number of words: {}".format(
    len(wordLengthList)))
print(
"Median word length: {:.0f}".format(
    statistics.median(wordLengthList)))

这些文件将在Docker容器内的/app/jars文件夹中可用。

7、创建你的数据输入文件
在./jobs/data/input文件夹中创建你的文本输入文件(在Docker容器之外)。

接着,我们从Docker容器内将文件复制到HDFS中,文件被映射到/app/data/input。

8、在名称节点内启动一个终端
你可以点击Docker桌面内的图标,也可以从命令行运行它。
$ docker exec -it namenode bash

9、将你的数据输入文件复制到HDFS中

> hadoop fs -mkdir -p /test/
> hadoop fs -copyFromLocal -f /app/data/input /test/。

不要忘记清理任何过去的作业。

> hadoop fs -rm -r -f /test/output

10、执行你的工作并查看结果
重要的一点是要避免使用Hadoop流媒体jar,而要使用mapred。

> cd /app/jars
> mapred streaming \
-files mapper.py,reducer.py \
-input /test/input \
-output /test/output \
-mapper “/usr/bin/python mapper.py” \
-reducer “/usr/bin/python reducer.py”

输出需要为通配符加上引号:

> hadoop fs cat “/test/output/*”
  • Matt Wang扩展了 BigDataEurope在 Hadoop for Docker 上的工作。
  • 来自Grégory Lang的[url=https://stackoverflow.com/questions/52076577/error-java-lang-runtimeexception-pipemapred-waitoutputthreads-subprocess-fa/61624913#61624913]建议[/url]是对 mapred 的重大启示。
  • 感谢Bert Freeba的鼓励和鼓舞人心的建议!