HadoopはJavaで作られている。
だからHadoopに何か操作をさせたい場合には、通常、Javaで記述する必要がある。
しかしHadoopにはHadoop Streamingという仕組みがあり、早い話UNIXのStandard Stream要するに標準入出力を扱うことができる。
すなわち、UNIXの標準入出力の流儀に則ってさえいれば、お好きな言語で操作ができる。
Javaがまったく合わない私としては、Hadoop Streamはとてもありがたい。
これがなければHadoopに手を付ける気にはならなかった。
Hadoop Streamに必要なもの。
mapperとreducerを、好きな言語で書くだけ。
べつにmapperだけでもよいけど。
私はPython。
Hadoop streamにおけるmapperとreducerの概要。
mapperは何らかの入力を得て、キーと値(key, value)を出力する。
reducerはmapperからのkey, valueを受けて、keyごとにvalueを処理する。
なお、mapperの出力がreducerに渡されるとき、Hadoopがkeyごとにソートしてくれる。
この点はreducerの処理を簡単にする。詳細は後述。
試しにやってみること。
LAN向けのApacheのアクセス状況をカウントしてみる。
IPアドレスごとのアクセス回数だ。
ログは以下のようなもの。
アクセス元はすべてIPアドレスで記録されている。
192.168.100.106 - - [24/Jul/2013:22:35:26 +0900] "GET /MT/js/common/Editor/Iframe.js?v=5.14-ja HTTP/1.1" 304 - "http://192.168.100.5/MTcgi/mt.cgi?__mode=view&_type=entry&blog_id=2&id=330" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0"
192.168.100.106 - - [24/Jul/2013:22:35:26 +0900] "GET /MT/js/common/Editor/Textarea.js?v=5.14-ja HTTP/1.1" 304 - "http://192.168.100.5/MTcgi/mt.cgi?__mode=view&_type=entry&blog_id=2&id=330" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0"
192.168.100.106 - - [24/Jul/2013:22:35:26 +0900] "GET /MT/mt.js?v=5.14-ja HTTP/1.1" 304 - "http://192.168.100.5/MTcgi/mt.cgi?__mode=view&_type=entry&blog_id=2&id=330" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0"
処理の流れ
1.mapperは、アクセスログからIPアドレスを抜き出す。
そして「<IPアドレス><タブ>1」を出力する。
これは、たとえば「192.168.1.1」から「1」回アクセスがあったよ、という意味。
2.hadoopがIPアドレスをキーにソート。
3.reducerは、IPアドレスごとに回数をカウントし、
「<IPアドレス><タブ><集計回数>」を出力する。
mapper.py
IPアドレスは、Apacheログにおいて、スペースを区切りにした第一フィールドに記載される。
だから一行ずつログを読んで、行頭のIPアドレスを抜き出し、その都度「<IPアドレス><タブ>1」を出力する。
「if “newsyslog” not in line:」は、システムメッセージ行を読み飛ばすため。
#!/usr/bin/env python
import sys
for line in sys.stdin:
if "newsyslog" not in line:
fields = line.strip().split()
print '%s\t%s' % (fields[0],1)
実行権限も忘れずにつける。
$ chmod a+x ./mapper.py
実験。意図したとおり動いていますね。
$ cat ../input/httpd-access.log |./mapper.py
192.168.100.107 1
192.168.100.106 1
192.168.100.110 1
192.168.100.107 1
192.168.100.107 1
192.168.100.107 1
reducer.py
mapper.pyからの出力は、ソートされてreducer.pyに入力される。
上記のmapper.py出力例は、以下のようにソートされる。
192.168.100.106 1
192.168.100.107 1
192.168.100.107 1
192.168.100.107 1
192.168.100.107 1
192.168.100.110 1
だからreducerとしては、第一フィールドを上から読んでいって、keyが変化したら、そこまでのカウント数を出力する。
そしてそのkeyの事は、さっぱり忘れて次のkeyのカウントに移ることができる。
もしソートがなされていないならば、入力の終わりまですべてのkeyを保持しなければならない。Hadoopに感謝である。
#!/usr/bin/env python
import sys
(last_key, count) = (None, 0)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, count)
(last_key, count) = (key,int(val))
else:
last_key = key
count += int(val)
if last_key:
print "%s\t%s" % (last_key, count)
実行権限を付ける。
$ chmod a+x reducer.py
実験。間にsortを入れること。
問題なし。
$ cat ../input/httpd-access.log |./mapper.py |sort|./reducer.py
127.0.0.1 567
192.168.100.106 327
192.168.100.107 671
192.168.100.109 2
192.168.100.150 193
Hadoopで動かしてみよう。
まずカウント対象となるログをHDFSにコピーする。
[hadoop@isis ~]$ hadoop dfs -put apachelog apachelog
[hadoop@isis ~]$
[hadoop@isis ~]$ hadoop dfs -ls
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2013-07-28 12:57 /user/hadoop/apachelog
[hadoop@isis ~]$
[hadoop@isis ~]$ hadoop dfs -ls apachelog
Found 1 items
-rw-r--r-- 1 hadoop supergroup 999290 2013-07-28 12:57 /user/hadoop/apachelog/httpd-access.log
[hadoop@isis ~]$
実行。
hadoopにhadoop-streaming-1.0.0.jarを与え、input、outputのほかに、mapperとreducerも指定する。
-mapper -reducerとしてローカルファイルシステムでのパスを与える。
同時に、-fileでそれぞれのスクリプトを指定すると、スクリプトファイルをリモートのノードへ送ってくれる。
コマンドはすごく長くなる。
エスケープシーケンスを使って適宜改行し、見やすくしてタイプミスを防ぐ。
hadoop jar \
/usr/local/share/hadoop/contrib/streaming/hadoop-streaming-1.0.0.jar \
-input apachelog \
-output apachelog.out \
-mapper /home/hadoop/sandbox/mapper/mapper.py \
-reducer /home/hadoop/sandbox/mapper/reducer.py \
-file /home/hadoop/sandbox/mapper/mapper.py \
-file /home/hadoop/sandbox/mapper/reducer.py
実際のログ
[hadoop@isis ~]$ hadoop jar \
> -output apachelog.out \
> -mapper /home/hadoop/sandbox/mapper/mapper.py \
> /usr/local/share/hadoop/contrib/streaming/hadoop-streaming-1.0.0.jar \
> -file /home/hadoop/sandbox/mapper/mapper.py \
> -file /home/hadoop/sandbox/mapper/reducer.py \
> -input apachelog \
> -output apachelog.out \
> -mapper /home/hadoop/sandbox/mapper/mapper.py \
> -reducer /home/hadoop/sandbox/mapper/reducer.py \
> -file /home/hadoop/sandbox/mapper/mapper.py \
> -file /home/hadoop/sandbox/mapper/reducer.py
packageJobJar: [/home/hadoop/sandbox/mapper/mapper.py, /home/hadoop/sandbox/mapper/reducer.py, /tmp/hadoop-hadoop/hadoop-unjar1190814954490199586/] [] /tmp/streamjob8960127756596123730.jar tmpDir=null
13/07/28 13:30:15 INFO mapred.FileInputFormat: Total input paths to process : 1
13/07/28 13:30:43 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop/mapred/local]
13/07/28 13:30:43 INFO streaming.StreamJob: Running job: job_201307281252_0001
13/07/28 13:30:43 INFO streaming.StreamJob: To kill this job, run:
13/07/28 13:30:43 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201307281252_0001
13/07/28 13:30:43 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201307281252_0001
13/07/28 13:30:46 INFO streaming.StreamJob: map 0% reduce 0%
13/07/28 13:39:14 INFO streaming.StreamJob: map 100% reduce 0%
13/07/28 13:40:27 INFO streaming.StreamJob: map 100% reduce 33%
13/07/28 13:40:33 INFO streaming.StreamJob: map 100% reduce 67%
13/07/28 13:40:49 INFO streaming.StreamJob: map 100% reduce 100%
13/07/28 13:42:39 INFO streaming.StreamJob: Job complete: job_201307281252_0001
13/07/28 13:42:41 INFO streaming.StreamJob: Output: apachelog.out
結果の確認
hadoop dfs -catなどで。
[hadoop@isis ~]$ hadoop dfs -ls
Found 2 items
drwxr-xr-x - hadoop supergroup 0 2013-07-28 12:57 /user/hadoop/apachelog
drwxr-xr-x - hadoop supergroup 0 2013-07-28 13:42 /user/hadoop/apachelog.out
[hadoop@isis ~]$
[hadoop@isis ~]$ hadoop dfs -ls apachelog.out
Found 3 items
-rw-r--r-- 1 hadoop supergroup 0 2013-07-28 13:42 /user/hadoop/apachelog.out/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 2013-07-28 13:30 /user/hadoop/apachelog.out/_logs
-rw-r--r-- 1 hadoop supergroup 1438 2013-07-28 13:40 /user/hadoop/apachelog.out/part-00000
[hadoop@isis ~]$
[hadoop@isis ~]$ hadoop dfs -cat apachelog.out/part-00000
127.0.0.1 567
192.168.100.106 327
192.168.100.107 671
192.168.100.109 2
192.168.100.150 193
[hadoop@isis ~]$
以上