2009年11月4日水曜日

#appengine でスキーマ変更に対応するバッチ処理を行う

2009/11/05追記

ひがさんより指摘を頂いて、30秒制限に関する補足を本文中に青字で追記しました。いつもありがとうございます、助かります>ひがさん

ここから本文

タイトルの処理について、いくつかノウハウを書いておきます。ポイントは以下の2点。

  • 全てのエンティティにスキーマバージョンを保持する
  • ローカル環境からデプロイ環境へ直結してバッチ処理を実行する事で、30秒制限なんて無視してしまう

実例をもとに説明してみます。最近、appengine java night用のまとめページとかに使おうとしているサイトを運営していて、そこに「TwitterでAppEngine関連についてつぶやかれた内容を収集する」という機能を実装しました。しかし、つぶやきを保存する際の投稿者の情報として「Name」を保持しているものの「ScreenName」を保持しておらず、投稿者のタイムラインページへのリンクを作成できないという問題がおこっていました。なので、つぶやき保存用のエンティティに「ScreenName」という属性を新たに追加し、つぶやきの保存時にはその値を取得するように修正しました。しかし、この修正以前に保存されたつぶやきについては、ScreenName属性を持たない状態になってしまっているので、その値を設定してやる必要があります。今回はこれをバッチ処理しようと思います。
あ、この問題は、バッチ処理の説明(このエントリ)を書くための布石であって、不具合では無いんですよっっ?

全てのエンティティにスキーマバージョンを保持する

主に今回の修正のような「属性の追加」が危険なのです。AppEngineでは「特定の属性を持たないエンティティ」というスキャンができません。インデックスを定義していても、そのインデックス定義に関する属性を持たないエンティティはそもそもインデックスのエントリが作成されないからです。そこで、エンティティの定義にスキーマバージョンを最初から用意しておく事をおすすめします。今回の修正だと、エンティティ用のクラスの修正は以下のようになっています。

ローカル環境からデプロイ環境へ直結してバッチ処理を実行

移行処理をしてやるエンティティの抽出は上に書いた「スキーマバージョン」の存在により、簡単に抽出できるようになりましたので、移行処理の準備は問題ありません。
が、移行処理自体はどのような手段を使うのか?が問題です。AppEngineでは1リクエスト30秒の制限があるので、一度に大量のデータは処理できません。

  • 特定の件数で制限してcronする?
  • 特定の件数ずつばらしてtaskqueueを使う?
まぁ、どちらでも可能です。でも、本来動作させたいプロダクトコードとは関係ないモジュールをサーバにデプロイするのもあんまり気持ちよいものではありません。そこでローカル環境からバッチ処理を行おう、というのが今回の趣旨です。データストアに対する一回の操作(Datastore#put()とかDatastore#get()とか、データストアサービスに対する一回のメソッド実行)に関しては30秒制限がある事は変わりませんが、それらを何度でも実行できるので、処理全体としては30秒制限も関係無くなりますしね。今回の処理だと、以下のような手順を行います。

  1. schemaVersion == 1の条件で移行対象のエンティティを取得する。
  2. 念のため、それらのエンティティのバックアップを保存しておく。
  3. UserId(Twitterのユーザ)で集約し、Twitterのユーザに対応するscreenNameを取得して、そのユーザに該当するエンティティを以下のように更新する
    • 取得した screenNameを設定
    • スキーマバージョンを2に設定
  4. 更新したエンティティをデプロイ環境へ保存する。

それぞれの手順内の細かい処理は省いたスケルトン部分のコードは次のようになります。

setUpBeforeClass(); // ローカルのAppEngine環境を開始する
try {
  // デプロイ環境のMakeSyncCallServletに接続するためのアカウント情報を入力させる
  getAccountInfo();
  // 移行対象のエンティティを取得する
  final List<Tweet> tweets = getOldEntities(); 
  // ローカルのデータストアにバックアップを作成する
  backupToLocalDatastore(tweets);
  // データ移行済みのエンティティを作成する
  final List<Tweet> updated = createUpdatedEntity(tweets, 50);
  // デプロイ環境側のデータストアに、データ移行済みのエンティティを保存する
  MakeSyncCallServletDelegate.runInDelegateWithAuth(new Runnable() {

    @Override
    public void run() {
      executeBatch(updated);
    }
  }, email, password, SERVER, SERVLET);
} finally {
  tearDownAfterClass(); // ローカルのAppEngine環境を終了する
}

デプロイ環境との直結には、デプロイ側にMakeSyncCallServlet、ローカル側MakeSyncCallServletDelegateという、自作のリモート接続の仕組みを使っています。ソースコードの全体は次のリンクから見る事ができます。このソースだと、AppEngineの制限よりもTwitterの制限が回避できなくて困ったりするんで、そのあたり上限の件数を設定したりしていますけど、実際にアプリ内の用件でバッチ処理を行うときにはあまり関係ないですね。あと、処理後にデプロイ環境で使われているMemcacheを全てクリアしたりもしています。プロダクトコード内の処理が走らない限りMemcacheにキャッシュされたデータを使用する設計も多いだろうし、そういう事も想定されるならMemcacheのクリアも忘れずに実行しておかないといけません。

MakeSyncCall関連については以下の資料を参考にしてください。LT用であんまり詳しくはないですけれども、大枠は資料の通りです。実際にはこのサーブレットをweb.xml内でセキュリティをかけて配置しています。

ちなみに、この直結の仕組みを使うとクライアント側アプリケーションから直接Datastore.query()..とかDatastore.put()...といった操作をする、昔のクラサバのような組み方でアプリケーションを作れるって事ですね!

1 件のコメント:

takezaki さんのコメント...

これはいいですね~~。クライアントから大量のデータを送信したいときに使えますね。