HadoopはJavaで作られている。
だからHadoopに何か操作をさせたい場合には、通常、Javaで記述する必要がある。
しかしHadoopにはHadoop Streamingという仕組みがあり、早い話UNIXのStandard Stream要するに標準入出力を扱うことができる。

すなわち、UNIXの標準入出力の流儀に則ってさえいれば、お好きな言語で操作ができる。
Javaがまったく合わない私としては、Hadoop Streamはとてもありがたい。
これがなければHadoopに手を付ける気にはならなかった。

Hadoop Streamに必要なもの。

mapperとreducerを、好きな言語で書くだけ。
べつにmapperだけでもよいけど。
私はPython。

Hadoop streamにおけるmapperとreducerの概要。

mapperは何らかの入力を得て、キーと値(key, value)を出力する。
reducerはmapperからのkey, valueを受けて、keyごとにvalueを処理する。

なお、mapperの出力がreducerに渡されるとき、Hadoopがkeyごとにソートしてくれる。
この点はreducerの処理を簡単にする。詳細は後述。

試しにやってみること。

LAN向けのApacheのアクセス状況をカウントしてみる。
IPアドレスごとのアクセス回数だ。
ログは以下のようなもの。
アクセス元はすべてIPアドレスで記録されている。

192.168.100.106 - - [24/Jul/2013:22:35:26 +0900] "GET /MT/js/common/Editor/Iframe.js?v=5.14-ja HTTP/1.1" 304 - "http://192.168.100.5/MTcgi/mt.cgi?__mode=view&_type=entry&blog_id=2&id=330" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0"
192.168.100.106 - - [24/Jul/2013:22:35:26 +0900] "GET /MT/js/common/Editor/Textarea.js?v=5.14-ja HTTP/1.1" 304 - "http://192.168.100.5/MTcgi/mt.cgi?__mode=view&_type=entry&blog_id=2&id=330" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0"
192.168.100.106 - - [24/Jul/2013:22:35:26 +0900] "GET /MT/mt.js?v=5.14-ja HTTP/1.1" 304 - "http://192.168.100.5/MTcgi/mt.cgi?__mode=view&_type=entry&blog_id=2&id=330" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0"

処理の流れ

1.mapperは、アクセスログからIPアドレスを抜き出す。
そして「<IPアドレス><タブ>1」を出力する。
これは、たとえば「192.168.1.1」から「1」回アクセスがあったよ、という意味。

2.hadoopがIPアドレスをキーにソート。

3.reducerは、IPアドレスごとに回数をカウントし、
「<IPアドレス><タブ><集計回数>」を出力する。

mapper.py

IPアドレスは、Apacheログにおいて、スペースを区切りにした第一フィールドに記載される。
だから一行ずつログを読んで、行頭のIPアドレスを抜き出し、その都度「<IPアドレス><タブ>1」を出力する。
「if “newsyslog” not in line:」は、システムメッセージ行を読み飛ばすため。

#!/usr/bin/env python

import sys

for line in sys.stdin:
        if "newsyslog" not in line:
                fields = line.strip().split()
                print '%s\t%s' % (fields[0],1)

実行権限も忘れずにつける。

$ chmod a+x ./mapper.py

実験。意図したとおり動いていますね。

$ cat ../input/httpd-access.log |./mapper.py 

192.168.100.107 1
192.168.100.106 1
192.168.100.110 1
192.168.100.107 1
192.168.100.107 1
192.168.100.107 1

reducer.py

mapper.pyからの出力は、ソートされてreducer.pyに入力される。
上記のmapper.py出力例は、以下のようにソートされる。

192.168.100.106 1
192.168.100.107 1
192.168.100.107 1
192.168.100.107 1
192.168.100.107 1
192.168.100.110 1

だからreducerとしては、第一フィールドを上から読んでいって、keyが変化したら、そこまでのカウント数を出力する。
そしてそのkeyの事は、さっぱり忘れて次のkeyのカウントに移ることができる。
もしソートがなされていないならば、入力の終わりまですべてのkeyを保持しなければならない。Hadoopに感謝である。

#!/usr/bin/env python

import sys

(last_key, count) = (None, 0)

for line in sys.stdin:
        (key, val) = line.strip().split("\t")

        if last_key and last_key != key:
                print "%s\t%s" % (last_key, count)
                (last_key, count) = (key,int(val))
        else:
                last_key = key
                count += int(val)

if last_key:
        print "%s\t%s" % (last_key, count)

実行権限を付ける。

$ chmod a+x reducer.py

実験。間にsortを入れること。
問題なし。

$ cat ../input/httpd-access.log |./mapper.py |sort|./reducer.py 

127.0.0.1       567
192.168.100.106 327
192.168.100.107 671
192.168.100.109 2
192.168.100.150 193

Hadoopで動かしてみよう。

まずカウント対象となるログをHDFSにコピーする。

[hadoop@isis ~]$ hadoop dfs -put apachelog apachelog
[hadoop@isis ~]$
[hadoop@isis ~]$ hadoop dfs -ls
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2013-07-28 12:57 /user/hadoop/apachelog
[hadoop@isis ~]$
[hadoop@isis ~]$ hadoop dfs -ls apachelog
Found 1 items
-rw-r--r--   1 hadoop supergroup     999290 2013-07-28 12:57 /user/hadoop/apachelog/httpd-access.log
[hadoop@isis ~]$

実行。

hadoopにhadoop-streaming-1.0.0.jarを与え、input、outputのほかに、mapperとreducerも指定する。
-mapper -reducerとしてローカルファイルシステムでのパスを与える。
同時に、-fileでそれぞれのスクリプトを指定すると、スクリプトファイルをリモートのノードへ送ってくれる。

コマンドはすごく長くなる。
エスケープシーケンスを使って適宜改行し、見やすくしてタイプミスを防ぐ。

hadoop  jar \
/usr/local/share/hadoop/contrib/streaming/hadoop-streaming-1.0.0.jar \
    -input apachelog \
    -output apachelog.out \
    -mapper /home/hadoop/sandbox/mapper/mapper.py \
    -reducer /home/hadoop/sandbox/mapper/reducer.py \
    -file /home/hadoop/sandbox/mapper/mapper.py \
    -file /home/hadoop/sandbox/mapper/reducer.py

実際のログ

[hadoop@isis ~]$ hadoop  jar \
>    -output apachelog.out \
>    -mapper /home/hadoop/sandbox/mapper/mapper.py \
> /usr/local/share/hadoop/contrib/streaming/hadoop-streaming-1.0.0.jar \
>    -file /home/hadoop/sandbox/mapper/mapper.py \
>    -file /home/hadoop/sandbox/mapper/reducer.py \
>     -input apachelog \
>     -output apachelog.out \
>     -mapper /home/hadoop/sandbox/mapper/mapper.py \
>     -reducer /home/hadoop/sandbox/mapper/reducer.py \
>     -file /home/hadoop/sandbox/mapper/mapper.py \
>     -file /home/hadoop/sandbox/mapper/reducer.py

packageJobJar: [/home/hadoop/sandbox/mapper/mapper.py, /home/hadoop/sandbox/mapper/reducer.py, /tmp/hadoop-hadoop/hadoop-unjar1190814954490199586/] [] /tmp/streamjob8960127756596123730.jar tmpDir=null
13/07/28 13:30:15 INFO mapred.FileInputFormat: Total input paths to process : 1
13/07/28 13:30:43 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop/mapred/local]
13/07/28 13:30:43 INFO streaming.StreamJob: Running job: job_201307281252_0001
13/07/28 13:30:43 INFO streaming.StreamJob: To kill this job, run:
13/07/28 13:30:43 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201307281252_0001
13/07/28 13:30:43 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201307281252_0001
13/07/28 13:30:46 INFO streaming.StreamJob:  map 0%  reduce 0%
13/07/28 13:39:14 INFO streaming.StreamJob:  map 100%  reduce 0%
13/07/28 13:40:27 INFO streaming.StreamJob:  map 100%  reduce 33%
13/07/28 13:40:33 INFO streaming.StreamJob:  map 100%  reduce 67%
13/07/28 13:40:49 INFO streaming.StreamJob:  map 100%  reduce 100%
13/07/28 13:42:39 INFO streaming.StreamJob: Job complete: job_201307281252_0001
13/07/28 13:42:41 INFO streaming.StreamJob: Output: apachelog.out

結果の確認

hadoop dfs -catなどで。

[hadoop@isis ~]$ hadoop dfs -ls
Found 2 items
drwxr-xr-x   - hadoop supergroup          0 2013-07-28 12:57 /user/hadoop/apachelog
drwxr-xr-x   - hadoop supergroup          0 2013-07-28 13:42 /user/hadoop/apachelog.out
[hadoop@isis ~]$
[hadoop@isis ~]$ hadoop dfs -ls apachelog.out

Found 3 items
-rw-r--r--   1 hadoop supergroup          0 2013-07-28 13:42 /user/hadoop/apachelog.out/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2013-07-28 13:30 /user/hadoop/apachelog.out/_logs
-rw-r--r--   1 hadoop supergroup       1438 2013-07-28 13:40 /user/hadoop/apachelog.out/part-00000
[hadoop@isis ~]$
[hadoop@isis ~]$ hadoop dfs -cat apachelog.out/part-00000

127.0.0.1       567
192.168.100.106 327
192.168.100.107 671
192.168.100.109 2
192.168.100.150 193

[hadoop@isis ~]$

以上