エンジニアの秋庭です。
本記事では、Pythonのバッチ処理フレームワークLuigiの紹介をしつつ、読みやすいデータ処理の実装について書いていきます。 さて、本記事を書くに至った理由ですが、分析コードのリファクタリングに苦労した経験からです。具体的には、
などなどです。 上記のようなちょっと管理が難しいコードが生まれてしまうのは、「試験的な運用だから…」とか「2週間しか動かさないコードだから…」 とか大人の事情があったりする場合もありますが、とにかくコードがきれいなことに越したことはありません。
また、さまざまな開発ツールやフレームワークが提案されているWeb開発と比べて、分析コードの開発手法はまだまだ議論の余地があるように感じます。
今回ブログに載せるコードは、技術的に高度なものではありませんが、データ分析を始めたばかりの方、きれいなデータ処理を書きたいと思っている方に一読いただければ幸いです。
Luigiは、Pythonのバッチ処理フレームワークです。 データ処理をTaskという単位で定義していき、依存関係の記述やワークフローの可視化などを行うことができます。 基本的に、Taskは以下のメソッドを実装します。
outputメソッドがTrueを返すときは、runメソッドを実行しません。 outputメソッドがFalseを返すときは、runメソッドを実行して、出力を作成します。 このとき、requiresメソッドで依存TaskのoutputがFalseを返すときは、先に依存Taskのrunを実行します。
以降は、サンプルコードを見ながら、luigiによるデータ処理について説明していきます。 データ処理の流れとしては、
というものです。
もちろん、これらのデータ処理はluigiなしでも実装できます。 例えば、以下のように個々の処理を書き下して、一個ずつ実行していきます。 このような方法だと、コードが複雑になるにつれ、「process.pyを動かすまえに、get_s3.pyが必要だっけ?」とか、 「このファイルを実行すると、どこが変更されるのか…??」といった管理、運用上の問題が色々と発生しそうです。
$ python get_s3.py
$ python process.py
$ python upload_s3.py
次節以降では、簡単な例からluigiを導入し、徐々にコードを改良していきます。
まずは、Step.1 ~ Step.3をひとつのTaskに記述してしまいます。 プロジェクトのディレクトリ構造は以下です。
├─ luigi.cfg
├─ data
└─ tasks
└─ run_luigi.py
各Taskに必要なパラメータはluigi.cfgに書きます。 YourFirstTaskで使用するファイルのロケーションなどをluigi.cfgの[YourFisrtTask]に記述しておきます。 また、luigi.s3.S3Client()を呼び出したときに、[s3]のパラメータを読み込んでくれます。
luigi.cfg
[YourFirstTask]
s3_source_path=s3://luigi.tutorial/path/to/data/user_scores.tsv
local_input_path=data/user_scores.tsv
local_processed_path=data/processed.tsv
s3_destination_path=s3://luigi.tutorial/path/to/results/processed.tsv
[s3]
host=s3-xx-xxxxx-1.amazonaws.com
calling_format=boto.s3.connection.OrdinaryCallingFormat
aws_access_key_id=XXXXXXXXXXXXXXX
aws_secret_access_key=XXXXXXXXXXXXXXXXX
tasks/run_luigi.py
import luigi
import luigi.s3
import lib
class YourFirstTask(luigi.Task):
# 各パラメータはluigi.cfgで定義
s3_client = luigi.s3.S3Client()
s3_source_path = luigi.Parameter()
local_input_path = luigi.Parameter()
local_processed_path = luigi.Parameter()
s3_destination_path = luigi.Parameter()
def requires(self):
return None
def output(self):
# このTaskの生成物を定義
return luigi.s3.S3Target(path=self.s3_destination_path)
def run(self):
# 入力ファイルをS3からget
self.s3_client.get(s3_path=self.s3_source_path,
destination_local_path=self.local_input_path)
# getしたファイルをローカルで加工
with open(self.local_input_path, 'r') as fin, open(self.local_processed_path, 'w') as fout:
for line in fin:
user_id, x, y = line.rstrip('\n').split('\t')
value = int(x) + int(y)
fout.write('{user}{sep}{value}\n'.format(user=user_id,
sep='\t',
value=value))
# 加工後のファイルをS3へupload
self.s3_client.put(local_path=self.local_processed_path,
destination_s3_path=self.s3_destination_path)
if __name__ == '__main__':
luigi.run()
以下のように実行します。
python -m tasks.run_luigi YourFirstTask
このExampleでは、luigiのモジュールに従って、S3ファイルの操作を行っています。 ファイル操作などをluigiモジュールに統一することで、チームでの開発・運用の効率化に繋がりそうです。
一方で、run内に複数のロジックが記述されており、各Stepが分離されていません。 次節では、ひとつのStepにひとつのTaskを割り当てます。
Exampleその2では、設定ファイルを以下のように変更します。
luigi.cfg
[GetFileFromS3]
s3_source_path=s3://luigi.tutorial/path/to/data/user_scores.tsv
local_input_path=data/user_scores.tsv
[ProcessLocalFile]
local_processed_path=data/processed.tsv
sep='\t'
[UploadFileToS3]
s3_destination_path=s3://luigi.tutorial/path/to/results/processed.tsv
[s3]
host=s3-xx-xxxxx-1.amazonaws.com
calling_format=boto.s3.connection.OrdinaryCallingFormat
aws_access_key_id=XXXXXXXXXXXXXXX
aws_secret_access_key=XXXXXXXXXXXXXXXXX
各Taskを実行するPythonファイルは以下です。 各Taskのrequiresとoutputによって、依存しているTaskと生成ファイルが明確になっています。
tasks/run_luigi.py
import luigi
import luigi.s3
import lib
class GetFileFromS3(luigi.Task):
"""
s3 -> local
"""
s3_client = luigi.s3.S3Client()
s3_source_path = luigi.Parameter()
local_input_path = luigi.Parameter()
def requires(self):
return None
def output(self):
return luigi.LocalTarget(path=self.local_input_path)
def run(self):
self.output().makedirs()
# s3からファイルをget.
self.s3_client.get(s3_path=self.s3_source_path,
destination_local_path=self.local_input_path)
class ProcessLocalFile(luigi.Task):
"""
local -> processed local
"""
sep = luigi.Parameter(default='\t')
local_processed_path = luigi.Parameter()
def requires(self):
return GetFileFromS3()
def output(self):
return luigi.LocalTarget(path=self.local_processed_path)
def run(self):
self.output().makedirs()
# ファイル加工のロジックを記述.
with self.input().open('r') as fin, self.output().open('w') as fout:
for line in fin:
user_id, x, y = line.rstrip('\n').split('\t')
value = int(x) + int(y)
fout.write('{user}{sep}{value}\n'.format(user=user_id,
sep=self.sep,
value=value))
class UploadFileToS3(luigi.Task):
"""
processed local -> s3
"""
s3_client = luigi.s3.S3Client()
s3_destination_path = luigi.Parameter()
def requires(self):
return ProcessLocalFile()
def output(self):
return luigi.s3.S3Target(path=self.s3_destination_path)
def run(self):
# s3にファイルをupload.
self.s3_client.put(local_path=self.input().path,
destination_s3_path=self.s3_destination_path)
if __name__ == '__main__':
luigi.run()
実行コマンドは以下です。
python -m tasks.run_luigi UploadFileToS3
Exampleその2では、ファイル操作と依存関係をフレームワークのお作法に従って記述することで、データ処理の流れを明確にすることができました。 最後に、独自に実装したファイル加工の部分にテストコードを書いていきます。
ProcessLocalFile内の、ロジック部分をメソッドで切り出し、テストを書きます。
├─ luigi.cfg
├─ lib
│ └─ __init__.py
├─ tasks
│ └─ run_luigi.py
└─ tests
└─ test_sum_value.py
tasks/run_luigi.py
import luigi
import luigi.s3
import lib
...
def run(self):
self.output().makedirs()
with self.input().open('r') as fin, self.output().open('w') as fout:
for line in fin:
user_id, x, y = line.rstrip('\n').split('\t')
value = lib.add_values(x, y)
fout.write('{user}{sep}{value}\n'.format(user=user_id,
sep=self.sep,
value=value)
...
lib/__init__.py
def add_values(x, y):
return int(x) + int(y)
tests/test_sum_value.py
from unittest import TestCase
import lib
class TestSumValue(TestCase):
def test_sum_value(self):
expected = 5
actual = lib.add_values(2, 3)
self.assertEqual(expected, actual)
メソッドとして切り出して、メンテンスしやすいコードになりました。 今回書いたコードは、ロジックが簡単なものでしたが、”run内のロジックは単体テストを書く”というルールを共有できていれば、 難解な実装を避けることができるはずです。 (単体テストがうまく書けなければ、ロジックの切り分けを考え直す)
本記事では、サンプルコードを作成しながら、luigiを利用したデータ処理について見てきました。
記載したコードは簡単なものでしたが、より複雑な処理を行いたい場合は、 luigiで用意されているモジュールをカスタマイズすることも可能です。(自作したらテスト書く)
フレームワークを導入するというのは、一つの手段でしかありませんが、luigiを使って最初に記載した課題を解決することができました。
→ フレームワークに従って、ファイル操作や依存関係を実装する
→ 自前のロジックやカスタマイズしたモジュールにテストを書く
本記事はここまでです。お読みいただきありがとうございました。