※hadoop-1.0.0です。

hadoop streamingで困ること。

作業データ(たとえばログ)を与えると、Hadoopはまず分割をする。
分割されたものはinput splits, あるいは単にsplitsと呼ばれる。
splitsのサイズはデフォルトで64MBytesだ。
Hadoop(JobTracker)は作業ノード(TaskTracker)にsplitsを割り当てる。
作業ノードはsplitsに対してmapタスクを行う。

なお、作業ノードの割り当てにあたっては、splitsの物理的な場所との近さも考慮される。
要するにsplitsの実データが置かれている作業ノードが選ばれる。

これは素晴らしい仕組みなのだが、困ることもある。
例えば以下のような場合。

  • ファイルの冒頭にあるヘッダが処理に必要な場合。
    →分割されると、ヘッダのないsplitsが出来てしまう。
  • ログ上の2点間の時間差分を知りたい場合。
    →2点の間で分割されると計算できない。

回避策

下記の通り。

How do I process files, one per map?

つまり、hadoopへの入力に、いきなり「ファイルの内容」を送り込むのではなく、「ファイルのリスト」を渡す。
ファイルのリストだから、いくら分割してもファイル自体は分割されない。
なるほど。

回避策とはいっても。

しかし、この場合mapper側に工夫が必要になる。

通常であれば、標準入力からファイルの中身がドバドバやってくる。
PythonだろうとRubyだろうとstdinをforループで読むだけ。
楽ちん。

ところがこの回避策だと、ファイルリストが入力されるわけだから、ファイルを開くことから始めなければならない。
そしてファイルを開くには、mapperスクリプト内で「hadoop dfs -cat <ファイル>」しないといけない
(ファイルがHDFSにあるとして)。

それも面倒だし、加えて、無駄な通信が発生する可能性がある点も懸念。

先述の通り、Hadoopは作業の振り分けにあたって、実データに近い作業ノードを選ぶ。
しかしファイル「リスト」を渡すということは、ファイル「リスト」と作業ノードの近さは考慮してくれるものの、リストに書かれたファイルと作業ノードの近さは考慮外になるということ。

つまり、作業ノードと実際のデータのある場所が一致するとは限らない。
一致しなければ、作業ノードはデータを別ノードからダウンロードしなければならない。

ノード間通信はコスト高なのでなるべく避けよう、というHadoopの思想にはそぐわない。

mapperの例

しかし選択肢はないので進める。
以下はpythonでファイルリストを受けた場合の処理例。
作業データはHDFS上にあるので、subprocessで「hadoop dfs -cat <ファイル>」する。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys os subprocess

for line in sys.stdin:

        line.strip("\n")
        filename = line

        cat = subprocess.Popen(["hadoop","dfs","-cat",filename],stdout=subprocess.PIPE)

        for logline in cat.stdout:
		#処理
		print line

大したことではないけれど、下記のように必ずテストしてからhdoopに投入すること。

$ cat filelist.txt | mapper.py | sort | reducer.py

hadoopは自前mapper, reducerが失敗しても、エラー内容を教えてはくれないので(たぶん)。

注意点

HDFS上にファイルを置いている場合、絶対パスで指定すること。
相対パスではダメだった。
また、HDFSでは「/user/hadoop/input」のように、「user」であって「usr」でないことに注意。

作業データがローカルディスクにあるなら、hadoop dfs -getとか。
しかしそうするとすべての作業ノードがローカルディスクにアクセスしてくることになる。
それは避けたいので、HDFS上に置いた方がよいでしょう。