ジョブキューイングシステムをどうするかでチームのリーダーとやりあって考えたことがあるのでまとめておく。

Rails で使うジョブキューイングシステムの技術選定で、リーダーは Amazon SQS 推し(レガシーシステムで SQS を使っている)、自分は Sidekiq 推しだった。前職時代に Sidekiq を使ってトラブルに遭遇したことはなかったし、とても簡単に使えるので Sidekiq で十分だと思っていた。 Sidekiq は GitHub でのスター数は 9000 オーバーで、 Rails の ActiveJob バックエンドとしては事実上のデファクトスタンダードだといえると思う。ググれば情報がいっぱい出てくるし、チームメンバーもリーダー以外は全員 Sidekiq の使用経験があった。

mperham/sidekiq

mperham/sidekiq

Simple, efficient background processing for Ruby. Contribute to mperham/sidekiq development by creating an account on GitHub.

github.com

リーダーが Sidekiq に反対する理由は以下だった。

  1. キューに可視性タイムアウトの概念がない( SQS にはある)
    ワーカーがキューメッセッージを取得したあと何らかの事情で一定時間内に処理を終えられなかった(ワーカーが突然死した場合など)未処理のジョブが再度ワーカーから見えるようになるので、ジョブの実行が保証される
  2. Redis が飛んだらジョブをロストする
    ElastiCache を使っているが、たしかに稀にメンテ祭などでフェイルオーバーが発生するなど困ることがあった
  3. Ruby 以外の言語から使えない
    Redis に書き込まれる情報は Sidekiq 専用フォーマットなので他の言語からも使う場合は読み取り君を作る必要がある

一方で自分が SQS に反対した理由は以下。

  1. 依存関係をソースコードに落とし込むことができない
    Sidekiq を使う場合は Redis と Sidekiq worker が動く Docker コンテナの情報を docker-compose.yml に書くことで依存関係を(バージョンまで含めて)宣言的に記述できる。 SQS の場合はそうはいかない。
  2. アプリケーションが AWS にロックインされる

    運用環境はすでにロックインされているが、アプリケーションが SQS という AWS のプロプライエタリな技術に依存すると、ソースコードが AWS と密結合になり他の IaaS に移行するときの障壁となる
  3. ローカル開発で利用することができない

    実際にローカル環境で非同期処理の検証不足が原因で機能の実装が漏れたまま production に deploy されたことが何度かあった。 localstack という AWS の機能をローカルに再現する技術はあるが、 SQS はオープンソースではないので完全に再現されるわけではない。

このような議論を経て、結局ジョブキューイングシステムには RabbitMQ を使うことになった。 RabbitMQ はリーダーが求める三つの要件を満たすし、オープンソースなので自分が SQS に反対する理由にも抵触しない。開発環境では Docker で RabbitMQ を動かし、 production では AWS にフルマネージドの RabbitMQ サービスはないので( ActiveMQ のマネージドサービス、 Amazon MQ というのはある)、 RabbitMQ の運用に特化した SaaS を利用することにした。

SQS に対する考えを整理する上で The Twelve-Factor App を改めて読んだが非常に参考になった。特に以下の三つの部分について、 SQS は Twelve-Factor App に反しており使うべきではないと思った。

II. 依存関係

アプリケーションが将来に渡って実行され得るすべてのシステムに存在するかどうか、あるいは将来のシステムでこのアプリケーションと互換性のあるバージョンが見つかるかどうかについては何の保証もない。アプリケーションがシステムツールを必要とするならば、そのツールをアプリケーションに組み込むべきである。

IV. バックエンドサービス

Twelve-Factor Appのコードは、ローカルサービスとサードパーティサービスを区別しない。アプリケーションにとっては、どちらもアタッチされたリソースであり、設定に格納されたURLやその他のロケーター、認証情報でアクセスする。Twelve-Factor Appのデプロイは、アプリケーションのコードに変更を加えることなく、ローカルで管理されるMySQLデータベースをサードパーティに管理されるサービス(Amazon RDSなど)に切り替えることができるべきである。同様に、ローカルのSMTPサーバーも、コードを変更することなくサードパーティのSMTPサービス(Postmarkなど)に切り替えることができるべきである。どちらの場合も、変更が必要なのは設定の中のリソースハンドルのみである。

X. 開発/本番一致

Twelve-Factor Appでは、継続的デプロイしやすいよう開発環境と本番環境のギャップを小さく保つ

たとえ理論的にはアダプターがバックエンドサービスの違いをすべて抽象化してくれるとしても、 Twelve-Factorの開発者は、開発と本番の間で異なるバックエンドサービスを使いたくなる衝動に抵抗する。 バックエンドサービスの違いは、わずかな非互換性が顕在化し、開発環境やステージング環境では正常に動作してテストも通過するコードが本番環境でエラーを起こす事態を招くことを意味する。この種のエラーは継続的デプロイを妨げる摩擦を生む。この摩擦とそれに伴って継続的デプロイが妨げられることのコストは、アプリケーションのライフサイクルに渡ってトータルで考えると非常に高くつく。

The Twelve-Factor App (日本語訳)

The Twelve-Factor App (日本語訳)

A methodology for building modern, scalable, maintainable software-as-a-service apps.

12factor.net

AWS の技術がどんなに優れていたとしても、自分はオープンソースではない AWS 独自のプロプライエタリな技術に依存してアプリケーションを作りたい訳ではない。運用の煩雑さ・手間から解放されたい、スケーラビリティを提供してほしい、というのが AWS に期待するところだ。 SQS はアプリケーションのソースコードの中に入り込んでくる。開発環境ではローカルの PostgreSQL 、 production では RDS の PostgreSQL インスタンスに接続先を変えるだけ、という風にプラガブルに切り替えることができない。開発効率性や移行可能性(ほかの IaaS に移ることができるか)を考えると、運用の効率性に特化して AWS を使いたいと思った。 Redshift とか DynamoDB とか Kinesis とか AWS の技術でしか実現できないことをやりたいときに手を出すのは悪くないと思うけど、AWS が提供するものなら何でも素晴らしいからすぐに飛びつくというのは間違っていると思う。

ちなみに CircleCI との距離の取り方はうまくいってると思う。いま deploy を CircleCI から行なっているが、 CircleCI が止まると deploy できなくなるのは困るので deploy 処理自体はシェルスクリプト化してある(👺 Hubot で Slack から AWS ECS にデプロイ)。 CircleCI が死んだら手元から deploy コマンドを実行するだけでよい。 CircleCI にやってもらっているのは、人間が手でも実行できることの自動化の部分だけだ。 CircleCI というサービスが終了したとしても恐らく簡単にほかのサービスに乗り換えられる。

まとめると、 IaaS / SaaS / PaaS を使う場合は以下に気をつけるべきだと思う。

  • ソースコードの中に特定のプラットフォームのプロプライエタリな技術に依存した部分が出てこないか
  • アプリケーションをローカル環境でも動かすことができるか
  • 運用やスケーラビリティに関してのみ依存するようにする
  • 人間が手でもできることの自動化のみに利用する

GW 中、十分インスタンスを用意しておいたが想定を超えるアクセスがあって負荷が高まり、 Alert が飛んでくる事態となった。車を運転中に iPhone をカーステにつないでいたところ Slack がピコピコ鳴り、嫁さんから「休みなのか仕事なのかハッキリしろ!」と言われたので Alert が上がらないようにオートスケールを仕込むことにした。 いみゅーたぶるいんふらすとらくちゃー諸兄からしたら「そんなの常識じゃん」みたいな話ばかりだけど、自分でやってみて得られた知見をまとめておきます。

なおここで言っているのは EC2 インスタンスのオートスケール( EC2 Auto Scaling )であり、 AWS の様々なリソースを包括的にオートスケールする AWS Auto Scaling とは異なります。

Amazon EC2 Auto Scaling(需要に合わせてコンピューティング性能を拡張)| AWS

Amazon EC2 Auto Scaling(需要に合わせてコンピューティング性能を拡張)| AWS

aws.amazon.com

AWS Auto Scaling(需要に合わせて複数のリソースをスケール)| AWS

AWS Auto Scaling(需要に合わせて複数のリソースをスケール)| AWS

aws.amazon.com

オートスケールをやるにあたって必要なこと

1. インスタンス起動時に最新のコードを pull してきてアプリケーションを起動させる

オートスケールしてきたインスタンスだけコードが古いとエラーが発生する。

2. インスタンス停止時にアプリケーションのログファイルをどっか別のところに書き出す

Auto Scaling Group のインスタンスは Stop ではなく Terminate されるため、インスタンス破棄後もログを参照できるように S3 に上げるとかして永続化させる必要がある。 Fluentd や CloudWatch Logs に集約するのでも良い。

3. AMI を定期的にビルドする

オートスケール対象のアプリケーションは枯れていて今更新しいミドルウェアが追加されたりすることはなくてソースコードを git clone してくるだけで十分なのだが、 Gemfile に変更があった場合を想定して少しでもサービスインを早めるため( bundle install を一瞬で終わらせるため)、 master ブランチへの変更が行われなくなる定時間際のタイミングで Packer でビルドして AMI にプッシュするようにしている。

Packer by HashiCorp

Packer by HashiCorp

Packer is a free and open source tool for creating golden images for multiple platforms from a single source configuration.

www.packer.io

4. Launch Configuration を自動作成

AMI のプッシュが成功したら最新の AMI を利用する Launch Configuration を作成し、 Auto Scaling Group も最新の Launch Configuration を参照するように変更する。 AWS CLI でできるので自動化してある。

5. Auto Scaling Group の設定をいい感じにやる

Auto Scaling Policy を決め( CPU 使用率が一定水準を超えたらとか、 Load Balancer へのリクエスト数が一定以上になったらとか)、時間指定で Desired Count や Minimum Count を指定したければ Schedule をいい感じに組む。 AWS Management Console 上でポチポチするだけでよい。

6. deploy 対象の調整を頑張る

当初は Auto Scaling Group のインスタンスには deploy を行わない(業務時間中はオートスケールしない、夜間と土日だけオートスケールさせる)つもりだった。

8bd51139504996f811a14c1dd04e4c25.jpeg

しかしメトリクスを確認すると朝の通勤時間帯や平日の昼休み時間帯などにもアクセス数が多いことがわかったので一日中 Auto Scaling Group インスタンスを稼働させることにした。となると deploy 対象が動的に増減する、ということなので Capistrano の deploy 対象もいい感じに調整しないといけない。 AWS SDK Ruby で稼働中の EC2 インスタンスの情報はわかるので、 deploy 時には動的に deploy 対象を判定するようにした。

auto-scaling-all-day.png

本当は push 型 deploy をやめて pull 型 deploy にするのがナウでヤングなのだろうが、レガシーアプリケーションに対してそこまでやるのは割に合わない。そのうちコンテナで動くもっとナウでヤングなやつに置き換えるのでこういう雑な対応でお茶を濁すことにした。

注意点

冒頭に書いているけどあくまで上記は EC2 インスタンスの Auto Scaling であり、周辺のミドルウェアは Scaling されない。例えば RDS を使っていたとして、 RDS インスタンスの方は拡張されないので Connection 数が頭打ちになったり、 CPU を使うクエリが沢山流れたりしたらそこがボトルネックになって障害になってしまう。周辺ミドルウェア、インフラ構成に余力を持たせた状態で行う必要がある場合は AWS Auto Scaling の方を使うことになると思う。

所感

1 と 2 のステップはすでに実現できていたので、自分は 3 、 4 、 5 、 6 をやった。オートスケール、めっちゃむずかしいものというイメージを持っていたけど、まぁまぁすんなり行った(二日くらいで大枠はできて、連休後半には実戦投入した)。負荷に応じて EC2 インスタンスがポコポコ増えて、週末の夜にパソコンを持たずに出かけられるようになった。これで家庭円満です。

A summer storm

問題点

Rails でデファクトスタンダードとなっているページネーション gem に Kaminari というのがある。

kaminari/kaminari

kaminari/kaminari

⚡ A Scope & Engine based, clean, powerful, customizable and sophisticated paginator for Ruby webapps - kaminari/kaminari

github.com

めっちゃ最高便利で大好きなのだけど、巨大なテーブルに対して COUNT 文を投げると遅いという問題にぶち当たった。このような巨大なテーブルで Kaminari を使うために COUNT 文を発行しない without_count というメソッドが用意されている( Kaminari 1.0.0 でやってくる 5 つの大きな変更 - Qiita )が、これを使うと next_pageprev_pagetotal_pages が取れなくなる(当たり前)。次のページがあるかどうかはばくち状態になってしまう。

本当は DB のスキーマを見直すべき(インデックスがちゃんと効くようにスキーマ変更するべき)だが、 Rails からもレガシーアプリからも同時に同じ DB にアクセスしており、並行運用しているような状況ではなかなか大胆な変更は実行できない。

DB 構造をなおせないとなるとキャッシュを思いつく。 HTML も Rails でレンダリングするのであれば partial cache などでページャー部分だけをキャッシュすれば良いが、 API 選任野郎と化した Rails ではビューのキャッシュはできない。

どうしたか

total_count をキャッシュする。公開範囲を設定できるようなリソースだと全員同一のキーでキャッシュするわけにはいかないのでユーザーごとにキーを作ってキャッシュする必要あり。全ユーザーの全リクエストでスロークエリになってたやつが 5 分に一回スロークエリになるくらいだったら何とか許容できる。

例えば以下のようなコントローラーがあったとする。 HeavyModel には数千万レコードあって、普通に COUNT 文を投げると遅い。 Paginatable という名前でモジュールを定義して、 render メソッドを上書きし、ページネーションを間に挟み込む。

class HeavyModelController < ApplicationController
  include Paginatable

  before_action :login_required

  def index
    resources = HeavyModel.all
    render json: resources, paginate: true
  end
end

モジュールはこんな感じ。車輪の再発明をしている感はあるが、 COUNT 文の結果が current_user 、コントローラーのクラス名、アクション名のそれぞれをつなげたものをキーにしてキャッシュされる。

module Paginatable
  def render(*args)
    options = args.extract_options!
    resources = options[:json]
    if options[:paginate]
      resources, meta = paginate(resources, cache_total_count: options[:cache_total_count])
      options[:json] = resources
      options[:meta] ||= {}
      options[:meta].merge!(meta)
    end
    args << options
    super(*args)
  end

  def paginate(resources, options = {})
    parse_params_for_pagination
    paginator = Paginator.new(
      resources:         resources,
      page:              @page,
      per:               @per,
      cache_total_count: options[:cache_total_count],
      cache_key:         total_count_cache_key
    )
    [paginator.resources, paginator.meta]
  end

  def total_count_cache_key
    @total_count_cache_key ||= "#{current_user&.id}_#{self.class.name}_#{action_name}_count"
  end

  class Paginator
    attr_reader :cache_total_count

    UNCOUNTABLE = -1

    def initialize(resources:, page:, per:, cache_total_count: false, cache_key: nil)
      @_resources        = resources
      @page              = page.to_i
      @per               = per.to_i
      @cache_total_count = cache_total_count
      @cache_key         = cache_key
    end

    def resources
      @resources ||= if cache_total_count?
                       @_resources.page(page).per(per).without_count
                     else
                       @_resources.page(page).per(per)
                     end
    end

    def page
      @page.zero? ? 1 : @page
    end

    def per
      @per.zero? ? Kaminari.config.default_per_page : @per
    end

    def meta
      {
        current_page: current_page,
        next_page:    next_page,
        prev_page:    prev_page,
        total_pages:  total_pages,
        total_count:  total_count
      }
    end

    alias cache_total_count? cache_total_count

    def paginatable?
      !cache_total_count? && resources.respond_to?(:total_count)
    end

    def current_page
      resources.current_page || page
    end

    def next_page
      paginatable? ? (resources.next_page || UNCOUNTABLE) : next_page_fallback
    end

    def next_page_fallback
      return UNCOUNTABLE if page < 1
      return UNCOUNTABLE if per > resources.length
      total_count_fallback > current_page * per ? current_page + 1 : UNCOUNTABLE
    end

    def prev_page
      paginatable? ? (resources.prev_page || UNCOUNTABLE) : prev_page_fallback
    end

    def prev_page_fallback
      return UNCOUNTABLE if page < 2
      (total_count_fallback.to_f / per).ceil >= page ? current_page - 1 : UNCOUNTABLE
    end

    def total_pages
      paginatable? ? resources.total_pages : (total_count.to_f / per).ceil
    end

    def total_count
      paginatable? ? resources.total_count : total_count_fallback
    end

    def total_count_fallback
      @total_count_fallback ||=
        begin
          cached_total_count = Rails.cache.read(@cache_key)
          if cached_total_count
            cached_total_count
          else
            real_total_count = @_resources.page(page).total_count
            Rails.cache.write(@cache_key, real_total_count, expires_in: 5.minutes)
            real_total_count
          end
        end
    end
  end
end

これで 5 分間はキャッシュが効くようになる。

Say Farewell to Fat Model.png

ルビーオンレイルザーの皆さん、 Fat モデル対策やってますか。 Fat モデル対策と言えば Concern ですね。 app/models/concerns/ ディレクトリに module を置いてモデルに include させるというアレです。

Put chubby models on a diet with concerns

Put chubby models on a diet with concerns

Different models in your Rails application will often share a set of cross-cutting concerns. In Basecamp, we have almost forty such conce...

signalvnoise.com

しかしただ module を作って Fat モデルのコードを移動し、元のモデル側に include させるだけでは結局モデルのインスタンスに生えるメソッドの数に変わりはないので臭いものに蓋をしてるだけになります。 Rubocop の Metrics/LineLength 警告を逃れるためだけの module 乱立はあんまり意味がないでしょう。間違って別の module で同名のメソッドを定義してしまい意図しない挙動になってしまうことも考えられます。

最近自分がやってるのは、 include される module に定義するメソッドはせいぜい一つか二つにして、このメソッドから別クラス( Plain Old Ruby Object)に定義したメソッドを呼び出す(委譲する)というものです。モデルに得体の知れないメソッドが増えないので便利。

例えば以下のようなモデルがあるとします。

  • app/models/entry.rb
  • app/models/comment.rb

両方ともレコードの新規登録があったときに通知を行いたい。共通の処理なので Notifiable モジュールを作ってそれを Entry モデルと Comment モデルでそれぞれ include しましょう。ここまでは皆さんよくやると思います。

  • app/models/concerns/notifiable.rb
module Notifiable
  private

  def notify
    # do something
  end
end
class Entry < ApplicationRecord
  include Notifiable
  has_many :comments
  after_commit :notify, on: :create
end
class Comment < ApplicationRecord
  include Notifiable
  belongs_to :entry
  after_commit :notify, on: :create
end

しかし Entry と Comment では通知内容が異なるので単純に #notify メソッドを callback で実行すればよいというわけではない。通知用パラメーターを生成する処理をモデルに書くとよいのですが、そういうのを繰り返した結果が Fat モデル地獄なので通知内容を生成するクラスを別に作ります。こんな感じ。

  • app/models/concerns/notifiable/entry_notification.rb
  • app/models/concerns/notifiable/comment_notification.rb

Base クラスを作って共通処理をまとめ、継承させると便利でしょう。

  • app/models/concerns/notifications/base_notification.rb
module Notifiable
  class BaseNotification
    def initialize(object)
      @object = object
    end

    def perform
      NotificationJob.perform(notification_params)
    end
  end
end
module Notifiable
  class EntryNotification < BaseNotification
    def notification_params
      {
        recipient_ids: @object.subscriber_ids,
        title: @object.title
      }
    end
  end
end
module Notifiable
  class CommentNotification < BaseNotification
    def notification_params
      {
        recipient_ids: @object.thread_joiner_ids,
        title: @object.body.truncate(25)
      }
    end
  end
end

Notifiable モジュールはこんな感じになります。

module Notifiable
  private

  def notifiy
    notification.perform
  end

  def notification
    "#{self.class}Notification".constantize.new(self)
  end
end

図にするとこんな感じ。

Rails Fat Model Strategy.png

この Notifiable モジュールを include しても Model には #notify メソッドと #notification メソッドしか追加されず、通知処理の実装をモデルから分離することができます。 Entry クラスも Comment クラスも #notify メソッドより先のことは何も気にしなくてよくなる。リソースが追加されたときに #notify メソッドを実行することだけに責任を持てばよいし、通知を飛ばすという処理自体は Notifiable::EntryNotificationNotifiable::CommentNotification クラスの責任になります。

私はこれで Fat モデルのコードを concerns ディレクトリにしまい込んで臭いものに蓋をするような対応におさらばしました。よろしければお試し下さい。またもし他に良い方法をご存じであれば教えて下さい。

仕事面で 2017 年を振り返ると、いろいろやったけど自分でなんか作ったというのはほとんどない。 人のふんどしで相撲をとっていた一年(転職してからは半年強)だと言える。SaaS として提供されているツールを導入したり、 OSS の分析ツールを導入・構築したり、会社の仕組みを調整したりしてただけだった。各ライブラリを作ってくれた人には感謝しかない 🙏🏻

組織方面

  • チーム横断の定例 MTG 働きかけ
    • 人が増えて「あの人何やってるかわからない」「仕事を横からいきなり依頼される」などの問題が出てきたため、チーム横断の定例ミーティングを開催してお互いの状況を確認したり依頼しそうなことがあれば前もって共有するように
  • 全体ミーティングフォーマット整え&司会業
    • かつては社長が考えていることを聞くだけの場だったが、チームごとに資料を作ってみんなで発表し、議論をする場に変えた
  • Slack 導入
    • Slack に変えるまでも別のチャットツール使ってたけど、平アカウントでは大したことできず、窮屈な感じがした
    • Slack は平アカウントでも外部ツール連携したり API 使ってなんかやったりできて便利
    • ベンチャーには平社員でも必要なことをやれるようなシステムの方が向いてると思う。 Slack はデザインだけじゃなくてそういうところが優れている。フラットで雰囲気が明るい。使うのが楽しくなる。
  • Kibela 導入
    • Wiki と Blog 、 Board ( Group )の概念がちょうどよい。 Qiita:Team で厳しかったところが解消されている。
    • Kibela 導入以前、情報共有は Issue Tracker に何でも書いてる感じだったが、 Issue Tracker は Issue Tracker なので close することができない問題を扱うのに向いていない
    • タスクには落とし込めないけど社内で見解を表明しておきたい事柄を社内ブログに書いて問題意識をみんなと共有する文化を構築できた
    • Kibela は PlantUML で図を書けるのがとにかくすばらしい。込み入った処理フローをシーケンス図にすることで設計・実装がはかどる。
  • HRT について説く
  • OKR 導入
    • OKR を設定してやっていきましょうという風にした
      • とはいえ自分は HR の専門家ではないのでちゃんと運用して行くにはそういう人に入ってもらわないと厳しいと思ってる

エンジニアリング方面

  • t_wada (テスト文化根付かせ)業
    • No Test, No Merge
  • CI が回る仕組み構築業
    • テストコードは別の人が書いてたけど回せてなくて fail しっ放しになってたので気合いで通るようにして CircleCI で Pull Request ごとにビルドするようにした✌🏻
  • Pull Request テンプレート導入
    • どんな問題を解決する Pull Request なのか、何をやったのか、完了条件を明記する✅
  • Pull Request レビューフォーマット提案
  • gitignore されていた Gemfile.lock をリポジトリに突っ込み業
    • Gemfile でバージョンが固定されてた😢
  • Embulk で分析用データ書き出し業
  • autodoc で API ドキュメント自動生成の仕組み構築
  • Git Flow から GitHub Flow へブランチ戦略変更
    • 1日に何回もデプロイするような製品はこっちの方が向いてる
  • Rubocop 導入& .rubocop.yml 番人業
    • 👮🏻‍♂️
  • CircleCI から勝手に deploy される仕組み導入
  • docker-compose 導入
    • Docker は使われてたがクラスターの管理は手運用だったので docker-compose 使うようにした
  • AWS ECS 導入
  • 社内 Gyazo 導入
  • Redash 導入
    • 経営陣しか数値に関心を持ってなかったので全員が見るように毎朝 Slack に KPI を通知するようにした
    • 複雑なクエリを組んでテーブルごとに値を集計しているだけでは見えてこない値を追えるようになった
    • 独自に KPI/KGI を設定して Growth Hack に取り組むエンジニアも
  • リードレプリカ作ってデータ分析がやりやすくなる仕組み導入
    • RDS で Multi AZ にはなっていたがリードレプリカがなく重いクエリを投げられなかった
    • 複雑な JOIN クエリも書けるようになりデータ分析し放題
    • 来年は BigQuery とかも使えるようにしてさらに分析が捗るようにしたい
  • Itamae でプロビジョニング( Linux アカウントの管理)
    • Itamae 一発で Linux アカウント追加できるようにしてサーバーサイドのエンジニアしか DB にクエリを投げられない状況を改善
  • cronbot 導入
    • 他人が作ったスケジュールも更新できて便利
    • KPI 通知は redashbot と cronbot を組み合わせて実現
  • iOS と Android のダウンロード数自動取得
    • iOS 側はタイムゾーンがずれる、 Android 側は更新が異常に遅いという問題があるものの、ある程度の目安となる数値が毎朝自動で Slack に通知されるように
  • お問い合わせがあったときに Slack に通知する仕組み導入
    • お問い合わせはカスタマーサポートの人が一手に引き受ける感じだったけどみんなが関心を持って見るようになった
    • カスタマーサポートの人からエスカレーションされる前にエンジニアが回答
    • 不具合あったときはいち早く対応可能に
  • Ruby app の前段に CloudFront 導入
    • app サーバーへのリクエストが半減
    • Nginx でキャッシュしきれてなかった静的ファイルを CloudFront でキャッシュするようになり爆速に
  • サイト全面 HTTPS 化
    • CSS/JS が並列で配信されるようになり爆速に

自分でまともな OSS を作れないことにコンプレックスを感じていた時期もあったが( OSS コミュニティでの活動が評価軸となるような職場では全然評価されない)、自分が作れなくても他の人が作ってくれるので、それをいかに組み合わせて有効活用し、価値を生み出せるかに注力すればいいかなぁと思うようになった。

もちろん、 OSS 使っててバグを見つけたり不便なところあったら改善する Pull Request なんかは出していきたいと思ってる。ただ自分は頭がよくないし、抽象的な思考は苦手で個別具体的なコードを書くことしかできないので、自分で OSS を生み出すことは諦めて個別具体的な事象に特化してやっていく方が自分的にも世の中的にも幸せだよね、という風に割り切れるようになった。

こういう割り切りができるようになったのは Kaizen Platform で仕事する機会を得たからだよなぁと思う。 OSS への考え方に限らず、コードを書く部分以外で組織を変革したりだとかオペレーションの仕組みを変えたりだとかは全部 Kaizen Platform で学んだ気がする。1年11ヶ月と短い期間だったけれど、いまの自分の血となり肉となっていると思う。

Kaizen を辞めたときの記事で以下のように書いてたけど、いまのところ失敗を糧にしていい方向に向かってるのではないかと思う。

Kaizen でのリモートワーク失敗経験をどう今後の人生に生かすか。以下のツイートを繰り返し眺めながら悔い改めていきたいと思う。

Kaizen Platform という会社について - portal shit!

というわけでいまは YAMAP という会社で働いています。元同僚の pyama86 さんに比べたら知名度では全然負けててミジンコみたいなもんだと思うけど、そのうち逆転できるようにプロダクトの完成度を高めていって pyama86 さんの方が YAMAP のパクりであるような雰囲気を醸成していきたい。今後ともよろしくお願いいたします🙏🏻

YAMAP

cho45 さんの以下の記事を参考に関連記事を表示するようにしてみた。

TF-IDFとコサイン類似度による類似エントリー機能の実装 | tech - 氾濫原

TF-IDFとコサイン類似度による類似エントリー機能の実装 | tech - 氾濫原

lowreal.net

ほとんど cho45 さんの記事に書いてある SQL を実行しているだけだけど、関連記事の表示用に Lokka 側に Similarity というモデルを追加して、以下のようなスキーマにしてる。

similar-entries-erd.png

Similarity テーブルの更新は cho45 さんの記事にあるように SQLite で行った計算の結果を反映することで行う。以下のような Rake タスクを定義した。

desc "Detect and update similar entries"
task similar_entries: %i[similar_entries:extract_term similar_entries:vector_normalize similar_entries:export]

namespace :similar_entries do
  require 'sqlite3'
  desc "Extract term"
  task :extract_term do
    require 'natto'
    nm = Natto::MeCab.new
    db = SQLite3::Database.new('db/tfidf.sqlite3')
    create_table_sql =<<~SQL
      DROP TABLE IF EXISTS tfidf;
      CREATE TABLE tfidf (
        `id` INTEGER PRIMARY KEY,
        `term` TEXT NOT NULL,
        `entry_id` INTEGER NOT NULL,
        `term_count` INTEGER NOT NULL DEFAULT 0, -- エントリ内でのターム出現回数
        `tfidf` FLOAT NOT NULL DEFAULT 0, -- 正規化前の TF-IDF
        `tfidf_n` FLOAT NOT NULL DEFAULT 0 -- ベクトル正規化した TF-IDF
      );
      CREATE UNIQUE INDEX index_tf_term ON tfidf (`term`, `entry_id`);
      CREATE INDEX index_tf_entry_id ON tfidf (`entry_id`);
    SQL
    db.execute_batch(create_table_sql)

    entries = Entry.published.all(fields: [:id, :body])
    entry_frequencies = {}
    entries.each do |entry|
      words = []
      body_cleansed = entry.body.
        gsub(/<.+?>/, '').
        gsub(/!?\[.+?\)/, '').
        gsub(/(```|<code>).+?(```|<\/code>)/m, '')
      begin
        nm.parse(body_cleansed) do |n|
          next if !n.feature.match(/名詞/)
          next if n.feature.match(/(サ変接続|数)/)
          next if n.surface.match(/\A([a-z][0-9]|\p{hiragana}|\p{katakana})\Z/i)
          next if %w[これ こと とき よう そう やつ とこ ところ 用 もの はず みたい たち いま 後 確か 中 気 方 頃 上 先 点 前 一 内 lt gt ここ なか どこ まま わけ ため 的 それ あと].include?(n.surface)
          words << n.surface
        end
      rescue ArgumentError
        next
      end
      frequency = words.inject(Hash.new(0)) {|sum, word| sum[word] += 1; sum }
      entry_frequencies[entry.id] = frequency
    end
    entry_frequencies.each do |entry_id, frequency|
      frequency.each do |word, count|
        db.execute("INSERT INTO tfidf (`term`, `entry_id`, `term_count`) VALUES (?, ?, ?)", [word, entry_id, count])
      end
    end
  end

  desc "Vector Normalize"
  task :vector_normalize do
    db = SQLite3::Database.new('db/tfidf.sqlite3')

    load_extension_sql =<<~SQL
      -- SQRT や LOG を使いたいので
      SELECT load_extension('/usr/local/Cellar/sqlite/3.21.0/lib/libsqlitefunctions.dylib');
    SQL
    db.enable_load_extension(true)
    db.execute(load_extension_sql)

    update_tfidf_column_sql = <<~SQL
      -- エントリ数をカウントしておきます
      -- SQLite には変数がないので一時テーブルにいれます
      CREATE TEMPORARY TABLE entry_total AS
          SELECT CAST(COUNT(DISTINCT entry_id) AS REAL) AS value FROM tfidf;

      -- ワード(ターム)が出てくるエントリ数を数えておきます
      -- term と entry_id でユニークなテーブルなのでこれでエントリ数になります
      CREATE TEMPORARY TABLE term_counts AS
          SELECT term, CAST(COUNT(*) AS REAL) AS cnt FROM tfidf GROUP BY term;
      CREATE INDEX temp.term_counts_term ON term_counts (term);

      -- エントリごとの合計ワード数を数えておきます
      CREATE TEMPORARY TABLE entry_term_counts AS
          SELECT entry_id, LOG(CAST(SUM(term_count) AS REAL)) AS cnt FROM tfidf GROUP BY entry_id;
      CREATE INDEX temp.entry_term_counts_entry_id ON entry_term_counts (entry_id);

      -- TF-IDF を計算して埋めます
      -- ここまでで作った一時テーブルからひいて計算しています。
      UPDATE tfidf SET tfidf = IFNULL(
          -- tf (normalized with Harman method)
          (
              LOG(CAST(term_count AS REAL) + 1) -- term_count in an entry
              /
              (SELECT cnt FROM entry_term_counts WHERE entry_term_counts.entry_id = tfidf.entry_id) -- total term count in an entry
          )
          *
          -- idf (normalized with Sparck Jones method)
          (1 + LOG(
              (SELECT value FROM entry_total) -- total
              /
              (SELECT cnt FROM term_counts WHERE term_counts.term = tfidf.term) -- term entry count
          ))
      , 0.0);
    SQL
    db.execute_batch(update_tfidf_column_sql)

    vector_normalize_sql = <<~SQL
      -- エントリごとのTF-IDFのベクトルの大きさを求めておきます
      CREATE TEMPORARY TABLE tfidf_size AS
          SELECT entry_id, SQRT(SUM(tfidf * tfidf)) AS size FROM tfidf
          GROUP BY entry_id;
      CREATE INDEX temp.tfidf_size_entry_id ON tfidf_size (entry_id);

      -- 計算済みの TF-IDF をベクトルの大きさで割って正規化します
      UPDATE tfidf SET tfidf_n = IFNULL(tfidf / (SELECT size FROM tfidf_size WHERE entry_id = tfidf.entry_id), 0.0);
    SQL
    db.execute_batch(vector_normalize_sql)
  end

  desc "Export calculation result to MySQL"
  task :export do
    db = SQLite3::Database.new('db/tfidf.sqlite3')
    create_similar_candidate_sql = <<~SQL
      DROP TABLE IF EXISTS similar_candidate;
      DROP INDEX IF EXISTS index_sc_parent_id;
      DROP INDEX IF EXISTS index_sc_entry_id;
      DROP INDEX IF EXISTS index_sc_cnt;
      CREATE TABLE similar_candidate (
        `id` INTEGER PRIMARY KEY,
        `parent_id` INTEGER NOT NULL,
        `entry_id` INTEGER NOT NULL,
        `cnt` INTEGER NOT NULL DEFAULT 0
      );
      CREATE INDEX index_sc_parent_id ON similar_candidate (parent_id);
      CREATE INDEX index_sc_entry_id ON similar_candidate (entry_id);
      CREATE INDEX index_sc_cnt ON similar_candidate (cnt);
    SQL
    db.execute_batch(create_similar_candidate_sql)

    extract_similar_entries_sql = <<~SQL
      -- 類似していそうなエントリを共通語ベースでまず100エントリほど出します
      INSERT INTO similar_candidate (`parent_id`, `entry_id`, `cnt`)
          SELECT ? as parent_id, entry_id, COUNT(*) as cnt FROM tfidf
          WHERE
              entry_id <> ? AND
              term IN (
                  SELECT term FROM tfidf WHERE entry_id = ?
                  ORDER BY tfidf DESC
                  LIMIT 50
              )
          GROUP BY entry_id
          HAVING cnt > 3
          ORDER BY cnt DESC
          LIMIT 100;
    SQL

    search_similar_entries_sql = <<~SQL
      -- 該当する100件に対してスコアを計算してソートします
      SELECT
          ? AS entry_id,
          entry_id AS similar_entry_id,
          SUM(a.tfidf_n * b.tfidf_n) AS score
      FROM (
          (SELECT term, tfidf_n FROM tfidf WHERE entry_id = ? ORDER BY tfidf DESC LIMIT 50) as a
          INNER JOIN
          (SELECT entry_id, term, tfidf_n FROM tfidf WHERE entry_id IN (SELECT entry_id FROM similar_candidate WHERE parent_id = ?)) as b
          ON
          a.term = b.term
      )
      WHERE similar_entry_id <> ?
      GROUP BY entry_id
      ORDER BY score DESC
      LIMIT 10;
    SQL

    results = {}
    Entry.published.all(fields: [:id]).each do |entry|
      db.execute(extract_similar_entries_sql, [entry.id, entry.id, entry.id])
      db.results_as_hash = true
      similarities = db.execute(search_similar_entries_sql, [entry.id, entry.id, entry.id, entry.id])
      results[entry.id] = similarities
    end

    Similarity.destroy

    results.each do |entry_id, similarities|
      if similarities.present?
        similarities.each do |s|
          conditions = { entry_id: s["entry_id"], similar_entry_id: s["similar_entry_id"] }
          similarity = Similarity.new(conditions)
          similarity.score = s["score"]
          similarity.save
        end
      end
    end
  end
end

やってることとしては、全エントリーを拾ってきて本文を MeCab で品詞分解して名詞だけを取り出し記事ごとの term 一覧を作り、そこから TF-IDF を求めてベクトル正規化し、最後に関連していそうなエントリを探し出して similarities テーブル(こちらは SQLite のテーブルではない)を更新している。詳しいアルゴリズムはバカなのでわからないが、 cho45 さんが書いているやり方を Lokka のスキーマに素直に適用した感じ。

結構この処理は遅いので parallel.gem を使って高速化できないか試してみたが、スレッドによる並行処理ではあまり速くできなかった。 4 コアある CPU のうち一つが 100% で処理を実行してもまだ 3 コアは余っている。プロセスを増やして並列処理するのがよさそうだが、分散をプロセスレベルで行おうとすると MySQL server has gone というエラーが出る。 DataMapper が MySQL とのコネクションをロストするようである。 ActiveRecord であれば reconnect するだとか回避方法があるようなのだけど DataMapper は情報が少なく、対応方法が見つけられなかったので一旦並列処理はあきらめた。

何回か動かしてみて大体正しく関連記事を表示できてそうなのでさくらの VPS で稼働させたいところなのだけど、関連記事の更新はいまのところ手動でやっている。本番 DB の entries テーブルを dump してきて Mac に取り込み、 similarities テーブルを更新して今度はローカルで similarities テーブルを dump して本番にインポートするという手順をとっている。

これにはいろいろ理由があって、一つには利用している mecab-ipadic-neologd (新語にも対応している MeCab の辞書)が空きメモリ 1.5GB 以上でないとインストールできずさくらの VPS にインストールできなかったから。もう一つには cho45 さんのブログにもあるけど SQLite で LOGSQRT を使うためには libsqlitefunction.so の読み込みが必要で、 load_extension() できるようにしないといけないが、そのためには sqlite3 をソースからビルドする必要があり若干面倒だった( Mac では Homebrew で sqlite を入れた)。

関連記事の更新は自分が記事を書いたときにしか発生しないのでいまの手動運用でもまぁ問題ないが、このブログは Docker でも動くようにしてあるので Docker イメージを作ればさくら VPS でも問題なく動かせそうな気はする。正月休みにでもチャレンジしたい。

感想

関連記事表示、結構面白くてちゃんと関連性の高いエントリーが表示される。例えば人吉に SL に乗り行った記事の関連記事にはちゃんと山口に SL に乗りに行ったときの記事 が表示される。いまのところ Google Adsense の関連コンテンツよりも精度が高いようである。

無限に自分の黒歴史を掘り返すことができるのでおすすめです。

DEAD FISH

fish-shell に移行 してこれで .zshrc のお守り業から解放されたと思ってたが、最近異常にシェルの新規セッションの開始が遅くて死にそうになってた。特にやばいのが git mergetool したときで、これは内部的には沢山の fish のプロセスが起動して diff を調べて最終的に vimdiff で表示しているように見えた(僕は git config editor=vim です)。下手するとちょっとしたコンフリクトを修正するために vimdiff が起動するまで 15 分くらい待たないといけないことがあって、 20 年くらい前の Photoshop 作業の現場1みたいな感じになってた。これはかなりやばくて、生産性ががた落ちになる。 vimdiff が開くまでに待っている間に他のことやり始めて、気がつくと平気で夕方になってたりする。

fish-shell のバージョンを 2.7.0 に上げたことが原因かと思って、かつて快適に使えていた fish のバージョンに下げたりしてみたが解決しなかったが、以下の記事にたどり着いて $fish_user_paths への値の push をやめたところ解決した。

fish shellの起動が遅くなった時の解決方法 - Qiita

fish shellの起動が遅くなった時の解決方法 - Qiita

# TL;DR - fish shellの起動に3〜4秒ぐらいかかるようになった。 - `config.fish`の`$fish_user_paths`の設定方法に問題があった。 - この問題の原因と解決方法をまとめた。 # 何が...

qiita.com

fish-shell での PATH の通し方として以下のようなのがよく出てくる。 Homebrew で keg only なやつを入れたときにも表示される。

set -U fish_user_paths $HOME/Library/Python/2.7/bin $fish_user_paths

しかしこれをやると $fish_user_paths にどんどんパスが積まれていってしまう。どうもこれが原因で遅くなるようだった。自分の環境でも echo $fish_user_paths してみたところかなり酷いことになっていた…。

↑の Qiita 記事では set -U fish_user_paths するときに第三引数を消せとあったが、それでは根本的な問題の解決にならない( Python やら Ruby やらいろんなものの実行ファイルを PATH に通したいはず)。自分は以下の方法で解決した。

set -x PATH /usr/local/bin $PATH

bash や zsh で export PATH=/usr/local/bin:$PATH とやるのと同じオーソドックスなやり方だと思う。

これだけではこれまでに散々肥えた $fish_user_paths は残ったままなので適当に set -U fish_user_paths '' とかやってあげると空になって起動が速くなる。

fish-shell の set -U はセッションをまたいでグローバルかつ永続的に設定される変数定義の方法っぽいので下手するとその環境で fish を使い始めてからずーっと残ってしまう可能性がある(端末を再起動したらリセットされるかも知れないけど)。

まとめ

  • set -U は基本的にしない
  • $fish_user_paths は設定ファイルの中では使わない
  • PATH を通したいときは set -x PATH を使う

  1. 何らかの処理を実行して処理が完了されるまでにめっちゃ時間がかかるのでその間にたばこを吸いに行くことが可能だったらしい