AD-TECH
Lab BLOG
アドテクLab ブログ

NEWS
  • リクルートデータ組織のブログをはじめました。※最新情報はRecruit Data Blogをご覧ください。

Luigiでデータ処理をきれいに書こう

2017/02/14akiba

このエントリーをはてなブックマークに追加

エンジニアの秋庭です。

本記事では、Pythonのバッチ処理フレームワークLuigiの紹介をしつつ、読みやすいデータ処理の実装について書いていきます。 さて、本記事を書くに至った理由ですが、分析コードのリファクタリングに苦労した経験からです。具体的には、

  • データ処理に必要なファイルや処理の依存関係がわからない
  • 分析用の自作クラスや関数の使い方がわかりづらい

などなどです。 上記のようなちょっと管理が難しいコードが生まれてしまうのは、「試験的な運用だから…」とか「2週間しか動かさないコードだから…」 とか大人の事情があったりする場合もありますが、とにかくコードがきれいなことに越したことはありません。

また、さまざまな開発ツールやフレームワークが提案されているWeb開発と比べて、分析コードの開発手法はまだまだ議論の余地があるように感じます。

今回ブログに載せるコードは、技術的に高度なものではありませんが、データ分析を始めたばかりの方、きれいなデータ処理を書きたいと思っている方に一読いただければ幸いです。

目次

Luigiの紹介

Luigiは、Pythonのバッチ処理フレームワークです。 データ処理をTaskという単位で定義していき、依存関係の記述やワークフローの可視化などを行うことができます。 基本的に、Taskは以下のメソッドを実装します。

  • requires -> 依存Taskを記述
  • output -> 出力を記述
  • run -> 出力を作成するロジックを記述

outputメソッドがTrueを返すときは、runメソッドを実行しません。 outputメソッドがFalseを返すときは、runメソッドを実行して、出力を作成します。 このとき、requiresメソッドで依存TaskのoutputがFalseを返すときは、先に依存Taskのrunを実行します。

以降は、サンプルコードを見ながら、luigiによるデータ処理について説明していきます。 データ処理の流れとしては、

  • Step.1 S3から元データ取得
  • Step.2 ローカルで元データを加工
  • Step.3 加工したデータをS3へアップロード

というものです。

もちろん、これらのデータ処理はluigiなしでも実装できます。 例えば、以下のように個々の処理を書き下して、一個ずつ実行していきます。 このような方法だと、コードが複雑になるにつれ、「process.pyを動かすまえに、get_s3.pyが必要だっけ?」とか、 「このファイルを実行すると、どこが変更されるのか…??」といった管理、運用上の問題が色々と発生しそうです。

$ python get_s3.py
$ python process.py
$ python upload_s3.py

次節以降では、簡単な例からluigiを導入し、徐々にコードを改良していきます。

Exampleその1

まずは、Step.1 ~ Step.3をひとつのTaskに記述してしまいます。 プロジェクトのディレクトリ構造は以下です。

├─ luigi.cfg
├─ data
└─ tasks
       └─ run_luigi.py

各Taskに必要なパラメータはluigi.cfgに書きます。 YourFirstTaskで使用するファイルのロケーションなどをluigi.cfgの[YourFisrtTask]に記述しておきます。 また、luigi.s3.S3Client()を呼び出したときに、[s3]のパラメータを読み込んでくれます。

luigi.cfg

[YourFirstTask]
s3_source_path=s3://luigi.tutorial/path/to/data/user_scores.tsv
local_input_path=data/user_scores.tsv
local_processed_path=data/processed.tsv
s3_destination_path=s3://luigi.tutorial/path/to/results/processed.tsv

[s3]
host=s3-xx-xxxxx-1.amazonaws.com
calling_format=boto.s3.connection.OrdinaryCallingFormat
aws_access_key_id=XXXXXXXXXXXXXXX
aws_secret_access_key=XXXXXXXXXXXXXXXXX

tasks/run_luigi.py

import luigi
import luigi.s3
import lib


class YourFirstTask(luigi.Task):
    # 各パラメータはluigi.cfgで定義
    s3_client = luigi.s3.S3Client()
    s3_source_path = luigi.Parameter()
    local_input_path = luigi.Parameter()
    local_processed_path = luigi.Parameter()
    s3_destination_path = luigi.Parameter()

    def requires(self):
        return None

    def output(self):
        # このTaskの生成物を定義
        return luigi.s3.S3Target(path=self.s3_destination_path)

    def run(self):
        # 入力ファイルをS3からget
        self.s3_client.get(s3_path=self.s3_source_path,
                           destination_local_path=self.local_input_path)

        # getしたファイルをローカルで加工
        with open(self.local_input_path, 'r') as fin, open(self.local_processed_path, 'w') as fout:
            for line in fin:
                user_id, x, y = line.rstrip('\n').split('\t')
                value = int(x) + int(y)
                fout.write('{user}{sep}{value}\n'.format(user=user_id,
                                                         sep='\t',
                                                         value=value))
        # 加工後のファイルをS3へupload
        self.s3_client.put(local_path=self.local_processed_path,
                           destination_s3_path=self.s3_destination_path)


if __name__ == '__main__':
    luigi.run()

以下のように実行します。

python -m tasks.run_luigi YourFirstTask

このExampleでは、luigiのモジュールに従って、S3ファイルの操作を行っています。 ファイル操作などをluigiモジュールに統一することで、チームでの開発・運用の効率化に繋がりそうです。

一方で、run内に複数のロジックが記述されており、各Stepが分離されていません。 次節では、ひとつのStepにひとつのTaskを割り当てます。

Exampleその2

Exampleその2では、設定ファイルを以下のように変更します。

luigi.cfg

[GetFileFromS3]
s3_source_path=s3://luigi.tutorial/path/to/data/user_scores.tsv
local_input_path=data/user_scores.tsv

[ProcessLocalFile]
local_processed_path=data/processed.tsv
sep='\t'

[UploadFileToS3]
s3_destination_path=s3://luigi.tutorial/path/to/results/processed.tsv

[s3]
host=s3-xx-xxxxx-1.amazonaws.com
calling_format=boto.s3.connection.OrdinaryCallingFormat
aws_access_key_id=XXXXXXXXXXXXXXX
aws_secret_access_key=XXXXXXXXXXXXXXXXX

各Taskを実行するPythonファイルは以下です。 各Taskのrequiresとoutputによって、依存しているTaskと生成ファイルが明確になっています。

tasks/run_luigi.py

import luigi
import luigi.s3
import lib


class GetFileFromS3(luigi.Task):
    """
    s3 -> local
    """
    s3_client = luigi.s3.S3Client()
    s3_source_path = luigi.Parameter()
    local_input_path = luigi.Parameter()

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget(path=self.local_input_path)

    def run(self):
        self.output().makedirs()
        # s3からファイルをget.
        self.s3_client.get(s3_path=self.s3_source_path,
                           destination_local_path=self.local_input_path)


class ProcessLocalFile(luigi.Task):
    """
    local -> processed local
    """
    sep = luigi.Parameter(default='\t')
    local_processed_path = luigi.Parameter()

    def requires(self):
        return GetFileFromS3()

    def output(self):
        return luigi.LocalTarget(path=self.local_processed_path)

    def run(self):
        self.output().makedirs()
        # ファイル加工のロジックを記述.
        with self.input().open('r') as fin, self.output().open('w') as fout:
            for line in fin:
                user_id, x, y = line.rstrip('\n').split('\t')
                value = int(x) + int(y)
                fout.write('{user}{sep}{value}\n'.format(user=user_id,
                                                         sep=self.sep,
                                                         value=value))


class UploadFileToS3(luigi.Task):
    """
    processed local -> s3
    """
    s3_client = luigi.s3.S3Client()
    s3_destination_path = luigi.Parameter()

    def requires(self):
        return ProcessLocalFile()

    def output(self):
        return luigi.s3.S3Target(path=self.s3_destination_path)

    def run(self):
        # s3にファイルをupload.
        self.s3_client.put(local_path=self.input().path,
                           destination_s3_path=self.s3_destination_path)


if __name__ == '__main__':
    luigi.run()

実行コマンドは以下です。

python -m tasks.run_luigi UploadFileToS3

Exampleその2では、ファイル操作と依存関係をフレームワークのお作法に従って記述することで、データ処理の流れを明確にすることができました。 最後に、独自に実装したファイル加工の部分にテストコードを書いていきます。

Exampleその3

ProcessLocalFile内の、ロジック部分をメソッドで切り出し、テストを書きます。

├─ luigi.cfg
├─ lib
│      └─ __init__.py
├─ tasks
│      └─ run_luigi.py
└─ tests
      └─ test_sum_value.py

tasks/run_luigi.py

import luigi
import luigi.s3
import lib

...
    def run(self):
        self.output().makedirs()
        with self.input().open('r') as fin, self.output().open('w') as fout:
            for line in fin:
                user_id, x, y = line.rstrip('\n').split('\t')
                value = lib.add_values(x, y)
                fout.write('{user}{sep}{value}\n'.format(user=user_id,
                                                         sep=self.sep,
                                                         value=value)
...

lib/__init__.py

def add_values(x, y):
    return int(x) + int(y)

tests/test_sum_value.py

from unittest import TestCase
import lib


class TestSumValue(TestCase):

    def test_sum_value(self):
        expected = 5
        actual = lib.add_values(2, 3)
        self.assertEqual(expected, actual)

メソッドとして切り出して、メンテンスしやすいコードになりました。 今回書いたコードは、ロジックが簡単なものでしたが、”run内のロジックは単体テストを書く”というルールを共有できていれば、 難解な実装を避けることができるはずです。 (単体テストがうまく書けなければ、ロジックの切り分けを考え直す)

まとめ

本記事では、サンプルコードを作成しながら、luigiを利用したデータ処理について見てきました。

記載したコードは簡単なものでしたが、より複雑な処理を行いたい場合は、 luigiで用意されているモジュールをカスタマイズすることも可能です。(自作したらテスト書く)

フレームワークを導入するというのは、一つの手段でしかありませんが、luigiを使って最初に記載した課題を解決することができました。

  • データ処理に必要なファイルや処理の依存関係がわからない

  → フレームワークに従って、ファイル操作や依存関係を実装する

  • 分析用の自作クラスや関数の使い方がわかりづらい

  → 自前のロジックやカスタマイズしたモジュールにテストを書く

本記事はここまでです。お読みいただきありがとうございました。