2010年1月22日金曜日

非同期でクエリを実行してORを実現する #appengine

SDK1.2.8-prereleaseの頃にmakeAsyncCall()の導入に気づいて思いついた、クエリを複数に分けて非同期実行して結果をマージすることで、条件をOR結合するのと同じ事ができる!非同期ならそこそこ速いんじゃね?というアイデアの検証がようやく完了しました。

テストデータと仕様

1-1000までのIDを持つキーを作成し、そのキーのIDの数値が2で割り切れるならmod2という属性の値がtrue、割り切れないならfalseというカンジでmod3,mod5とか用意した。で、クエリは「2または3または5で割り切れる」というものを抽出するという簡単なもの。条件的にはmod2 EQUAL true OR mod3 EQUAL true OR mod5 EQUAL trueとなり、分けるとmod2 EQUAL truemod3 EQUAL truemod5 EQUAL trueの3本を走らせて結果をマージするというもの。うん、でも本当はそんな細かいとこはどうでも良くて、重要なのはクエリを分割実行してマージする、しかもクエリは非同期で一気にみっつ実行するという点です。

ソースコード

まずは普通にクエリをみっつ走らせてマージする方法。普通なカンジです。

void sync2or3or5(PrintWriter w) {
  EntityQuery q2 =
      Datastore.query(KIND).filter("mod2", FilterOperator.EQUAL, true).offset(0).limit(
          1000).prefetchSize(1000);
  EntityQuery q3 =
      Datastore.query(KIND).filter("mod3", FilterOperator.EQUAL, true).offset(0).limit(
          1000).prefetchSize(1000);
  EntityQuery q5 =
      Datastore.query(KIND).filter("mod5", FilterOperator.EQUAL, true).offset(0).limit(
          1000).prefetchSize(1000);

  long start = System.currentTimeMillis();
  List<Entity> r2 = q2.asList();
  List<Entity> r3 = q3.asList();
  List<Entity> r5 = q5.asList();
  List<Entity> merged = merge(Arrays.asList(r2, r3, r5));
  w.println("count=" + merged.size() + ", " + (System.currentTimeMillis() - start) + "[ms]");
}

List<Entity> merge(List<List<Entity>> lists) {
  Map<Key, Entity> map = new HashMap<Key, Entity>();
  for (List<Entity> list : lists) {
    for (Entity entity : list) {
      if (map.containsKey(entity.getKey()) == false) {
        map.put(entity.getKey(), entity);
      }
    }
  }
  return new ArrayList<Entity>(map.values());
}

次に、クエリを非同期に実行する方法。makeAsyncCall()を使っています。また、QueryDatastorePb.QueryというProtocolBufferオブジェクトに変換するのにPbUtilというクラスを使ってますが、それについては後述。

void async2or3or5a(PrintWriter w) throws InterruptedException, ExecutionException {
  Query q2 = new Query(KIND).addFilter("mod2", FilterOperator.EQUAL, true);
  Query q3 = new Query(KIND).addFilter("mod3", FilterOperator.EQUAL, true);
  Query q5 = new Query(KIND).addFilter("mod5", FilterOperator.EQUAL, true);
  FetchOptions fetchOptions = FetchOptions.Builder.withOffset(0).limit(1000).prefetchSize(1000);

  long start = System.currentTimeMillis();

  DatastorePb.Query qPB2 = PbUtil.toQueryRequestPb(q2, fetchOptions);
  DatastorePb.Query qPB3 = PbUtil.toQueryRequestPb(q3, fetchOptions);
  DatastorePb.Query qPB5 = PbUtil.toQueryRequestPb(q5, fetchOptions);

  Delegate<Environment> delegate = ApiProxy.getDelegate();
  Environment env = ApiProxy.getCurrentEnvironment();
  ApiConfig config = new ApiProxy.ApiConfig();
  config.setDeadlineInSeconds(5.0);
  Future<byte[]>[] futures = new Future[3];

  futures[0] = delegate.makeAsyncCall(env, "datastore_v3", "RunQuery", qPB2.toByteArray(), config);
  futures[1] = delegate.makeAsyncCall(env, "datastore_v3", "RunQuery", qPB3.toByteArray(), config);
  futures[2] = delegate.makeAsyncCall(env, "datastore_v3", "RunQuery", qPB5.toByteArray(), config);

  List<List<Entity>> lists = new ArrayList<List<Entity>>();
  for (int i = 0; i < futures.length; i++) {
    byte[] bytes = futures[i].get();
    DatastorePb.QueryResult rPb = new DatastorePb.QueryResult();
    rPb.mergeFrom(bytes);
    Iterator<EntityProto> it = rPb.resultIterator();
    List<Entity> entities = new ArrayList<Entity>();
    while (it.hasNext()) {
      entities.add(EntityTranslator.createFromPb(it.next()));
    }
    lists.add(entities);
  }
  w.println("count=" + merge(lists).size() + ", " + (System.currentTimeMillis() - start) + "[ms]");
}

実行結果

それぞれ22回ずつ実行して、一番大きい値と一番小さい値を省いた結果は以下のようになりました。

通常実行非同期実行
623294
710229
596286
760275
539217
521294
572297
1054287
569251
534305
830233
494285
442379
731388
731218
510268
467199
547593
483225

ぱっと見ただけで、非同期実行の方がかなり速いことがわかります。結果件数はどちらも734件で、消費した時間の平均は通常実行が622.1ms、非同期実行が296.6msとなりました。倍くらいの差が出ています。非同期なんだから当然、と思われるかもしれませんが、色々と苦労したのでこの結果が出てくれて嬉しいのですw

疑惑のPbUtil

コレについては何も突っ込まないでください、自分もあまり気持ちよく無いのです。…まぁ、クエリの非同期実行を優先したと言うことで…。

package com.google.appengine.api.datastore;

import com.google.apphosting.api.DatastorePb;

public class PbUtil {
  public static DatastorePb.Query toQueryRequestPb(Query q, FetchOptions fetchOptions) {
    return QueryTranslator.convertToPb(q, fetchOptions);
  }
}

追記:おまけ

上記のPbUtilが存在している前提だけど、ユーティリティ化してみた

コメントを投稿