※hadoop-1.0.0です。
hadoop streamingで困ること。
作業データ(たとえばログ)を与えると、Hadoopはまず分割をする。
分割されたものはinput splits, あるいは単にsplitsと呼ばれる。
splitsのサイズはデフォルトで64MBytesだ。
Hadoop(JobTracker)は作業ノード(TaskTracker)にsplitsを割り当てる。
作業ノードはsplitsに対してmapタスクを行う。
なお、作業ノードの割り当てにあたっては、splitsの物理的な場所との近さも考慮される。
要するにsplitsの実データが置かれている作業ノードが選ばれる。
これは素晴らしい仕組みなのだが、困ることもある。
例えば以下のような場合。
- ファイルの冒頭にあるヘッダが処理に必要な場合。
→分割されると、ヘッダのないsplitsが出来てしまう。 - ログ上の2点間の時間差分を知りたい場合。
→2点の間で分割されると計算できない。
回避策
下記の通り。
How do I process files, one per map?
つまり、hadoopへの入力に、いきなり「ファイルの内容」を送り込むのではなく、「ファイルのリスト」を渡す。
ファイルのリストだから、いくら分割してもファイル自体は分割されない。
なるほど。
回避策とはいっても。
しかし、この場合mapper側に工夫が必要になる。
通常であれば、標準入力からファイルの中身がドバドバやってくる。
PythonだろうとRubyだろうとstdinをforループで読むだけ。
楽ちん。
ところがこの回避策だと、ファイルリストが入力されるわけだから、ファイルを開くことから始めなければならない。
そしてファイルを開くには、mapperスクリプト内で「hadoop dfs -cat <ファイル>」しないといけない
(ファイルがHDFSにあるとして)。
それも面倒だし、加えて、無駄な通信が発生する可能性がある点も懸念。
先述の通り、Hadoopは作業の振り分けにあたって、実データに近い作業ノードを選ぶ。
しかしファイル「リスト」を渡すということは、ファイル「リスト」と作業ノードの近さは考慮してくれるものの、リストに書かれたファイルと作業ノードの近さは考慮外になるということ。
つまり、作業ノードと実際のデータのある場所が一致するとは限らない。
一致しなければ、作業ノードはデータを別ノードからダウンロードしなければならない。
ノード間通信はコスト高なのでなるべく避けよう、というHadoopの思想にはそぐわない。
mapperの例
しかし選択肢はないので進める。
以下はpythonでファイルリストを受けた場合の処理例。
作業データはHDFS上にあるので、subprocessで「hadoop dfs -cat <ファイル>」する。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys os subprocess
for line in sys.stdin:
line.strip("\n")
filename = line
cat = subprocess.Popen(["hadoop","dfs","-cat",filename],stdout=subprocess.PIPE)
for logline in cat.stdout:
#処理
print line
大したことではないけれど、下記のように必ずテストしてからhdoopに投入すること。
$ cat filelist.txt | mapper.py | sort | reducer.py
hadoopは自前mapper, reducerが失敗しても、エラー内容を教えてはくれないので(たぶん)。
注意点
HDFS上にファイルを置いている場合、絶対パスで指定すること。
相対パスではダメだった。
また、HDFSでは「/user/hadoop/input」のように、「user」であって「usr」でないことに注意。
作業データがローカルディスクにあるなら、hadoop dfs -getとか。
しかしそうするとすべての作業ノードがローカルディスクにアクセスしてくることになる。
それは避けたいので、HDFS上に置いた方がよいでしょう。