Hadoop MapReduce

[ ROT ]

13 Sep 2013

Задачка из курса Распределённых Объектных технологий.

Скрипты mapper{Uno,Double,Triple}.py соответственно выделяют из текста слова, пары и триплеты. И подсчитывают число вхождений каждого из них.

Для универсальности, игнорируют знаки препинания (кроме конца предложения) и регистр.

Требуют работающего Hadoop с включенным HadoopStreaming

Подготовка Запуск

  • В файле main.sh необходимо указать верные пути до JRE и HADOOP_HOME, а также указать пользователя с правами на запись в HDFS
  • В папку books положить нужные тексты
  • Скопировать тексты в HDFS: ./main.sh copy
  • Для запуска сразу всех задач сделан конвейер ./run.sh
  • Для отдельных задач используем ./main.sh run , где name = {Uno,Double,Triple}
  • Для того, чтобы просмотреть содержимое каталога используем ./main.sh ls
  • Содержимое файла ./main show
  • Удалить (rm -r) -> ./main.sh delete

Запуск всех задач подряд

#!/bin/bash

./main.sh run Uno && ./main.sh run Double && ./main.sh run Triple || echo "Something shit happens."

main.sh

#!/bin/bash
export HADOOP_HOME=/usr/local/hadoop
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-i386/jre/
export HADOOP_USER=hduser

case "$1" in
    run)
        sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/contrib/streaming/hadoop-streaming.jar \
         -file mapper${2}.py -mapper mapper${2}.py -file reducer.py -reducer reducer.py \
         -input books/* -output books-output-$2
        ;;
    show)
        sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop fs -cat books-output-$2/part-00000
        ;;
    delete)
        sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop fs -rmr books-output-$2
        #sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop fs -rmr books
        ;;
    ls)
        sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop fs -ls $2
        ;;
    copy)
        sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop dfs -copyFromLocal books books
        ;;
    *)
        echo "Usage $0 {run|show|delete|copy|ls} " >&2
        exit 3
        ;;
esac

mapperUno.py

#!/usr/bin/env python3

import sys
import re

def read_input(file):
    for line in file:
        # split the line into words
        yield re.sub(r'\W', ' ', line.lower()).split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for triplets in data:
        for triplet in triplets:
            print('{}{}{}'.format(triplet, separator, 1))

if __name__ == "__main__":
    main()

mapperDouble.py

#!/usr/bin/env python3

import sys
import re

def read_input(file):
    for line in file:
        # split the line into couples
        unpunct_line = re.sub(r'\W', ' ', line.lower())
        words = unpunct_line.split()
        yield ["{} {}".format(words[i - 1],
                              words[i]) for i in range(1, len(words))]

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for triplets in data:
        for triplet in triplets:
            print('{}{}{}'.format(triplet, separator, 1))

if __name__ == "__main__":
    main()

mapperTriple.py

#!/usr/bin/env python3

import sys
import re

def read_input(file):
    for line in file:
        # split the line into triplets
        unpunct_line = re.sub(r'\W', ' ', line.lower())
        words = unpunct_line.split()
        yield ["{} {} {}".format(words[i - 2],
                                 words[i - 1],
                                 words[i]) for i in range(2, len(words))]

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for triplets in data:
        for triplet in triplets:
            print('{}{}{}'.format(triplet, separator, 1))

if __name__ == "__main__":
    main()

reducer.py

#!/usr/bin/env python3

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_unit, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_unit, count in group)
            print("{}{}{}".format(current_unit, separator, total_count))
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()

GitHub