Задачка из курса Распределённых Объектных технологий.
Скрипты mapper{Uno,Double,Triple}.py соответственно выделяют из текста слова, пары и триплеты. И подсчитывают число вхождений каждого из них.
Для универсальности, игнорируют знаки препинания (кроме конца предложения) и регистр.
Требуют работающего Hadoop с включенным HadoopStreaming
Подготовка Запуск
- В файле main.sh необходимо указать верные пути до JRE и HADOOP_HOME, а также указать пользователя с правами на запись в HDFS
- В папку books положить нужные тексты
- Скопировать тексты в HDFS: ./main.sh copy
- Для запуска сразу всех задач сделан конвейер ./run.sh
- Для отдельных задач используем ./main.sh run , где name = {Uno,Double,Triple}
- Для того, чтобы просмотреть содержимое каталога используем ./main.sh ls
- Содержимое файла ./main show
- Удалить (rm -r) -> ./main.sh delete
Запуск всех задач подряд
#!/bin/bash
./main.sh run Uno && ./main.sh run Double && ./main.sh run Triple || echo "Something shit happens."
main.sh
#!/bin/bash
export HADOOP_HOME=/usr/local/hadoop
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-i386/jre/
export HADOOP_USER=hduser
case "$1" in
run)
sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/contrib/streaming/hadoop-streaming.jar \
-file mapper${2}.py -mapper mapper${2}.py -file reducer.py -reducer reducer.py \
-input books/* -output books-output-$2
;;
show)
sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop fs -cat books-output-$2/part-00000
;;
delete)
sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop fs -rmr books-output-$2
#sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop fs -rmr books
;;
ls)
sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop fs -ls $2
;;
copy)
sudo -u ${HADOOP_USER} ${HADOOP_HOME}/bin/hadoop dfs -copyFromLocal books books
;;
*)
echo "Usage $0 {run|show|delete|copy|ls} " >&2
exit 3
;;
esac
mapperUno.py
#!/usr/bin/env python3
import sys
import re
def read_input(file):
for line in file:
# split the line into words
yield re.sub(r'\W', ' ', line.lower()).split()
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_input(sys.stdin)
for triplets in data:
for triplet in triplets:
print('{}{}{}'.format(triplet, separator, 1))
if __name__ == "__main__":
main()
mapperDouble.py
#!/usr/bin/env python3
import sys
import re
def read_input(file):
for line in file:
# split the line into couples
unpunct_line = re.sub(r'\W', ' ', line.lower())
words = unpunct_line.split()
yield ["{} {}".format(words[i - 1],
words[i]) for i in range(1, len(words))]
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_input(sys.stdin)
for triplets in data:
for triplet in triplets:
print('{}{}{}'.format(triplet, separator, 1))
if __name__ == "__main__":
main()
mapperTriple.py
#!/usr/bin/env python3
import sys
import re
def read_input(file):
for line in file:
# split the line into triplets
unpunct_line = re.sub(r'\W', ' ', line.lower())
words = unpunct_line.split()
yield ["{} {} {}".format(words[i - 2],
words[i - 1],
words[i]) for i in range(2, len(words))]
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_input(sys.stdin)
for triplets in data:
for triplet in triplets:
print('{}{}{}'.format(triplet, separator, 1))
if __name__ == "__main__":
main()
reducer.py
#!/usr/bin/env python3
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator, 1)
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin, separator=separator)
for current_unit, group in groupby(data, itemgetter(0)):
try:
total_count = sum(int(count) for current_unit, count in group)
print("{}{}{}".format(current_unit, separator, total_count))
except ValueError:
# count was not a number, so silently discard this item
pass
if __name__ == "__main__":
main()