Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
tubone24 committed Dec 31, 2024
1 parent 7eee3bd commit fbeaa0a
Showing 1 changed file with 75 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ LLMそのものにはあまり興味が持てなかった私ですが、案件

![Langfuse v2 Architecture AWS](https://i.imgur.com/RWzEbMz.jpg)

特徴として、**Langfuse Server**[AWS App Runner](https://aws.amazon.com/jp/apprunner/)でデプロイしており、データベースは[Amazon Aurora serverless v2](https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/aurora-serverless-v2.html)で構築、それぞれのつなぎ込みはVPC Connectorを利用するというかなりシンプルな構成にしていました
特徴として、**Langfuse Server**[AWS App Runner](https://aws.amazon.com/jp/apprunner/)でデプロイしており、データベースは[Amazon Aurora serverless v2](https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/aurora-serverless-v2.html)で構築、それぞれのつなぎ込みは[VPC Connector](https://docs.aws.amazon.com/ja_jp/apprunner/latest/dg/network-vpc.html)を利用するというかなりシンプルな構成にしていました

ありがたいことにLangfuse v2はNext.jsで構築されていることから**リクエストベースでコンテナが実行されればよいため**、(常駐する必要がないため)コスト最適化の観点でも**AWS App Runner**でデプロイしておりました。

Expand All @@ -59,7 +59,7 @@ LLMそのものにはあまり興味が持てなかった私ですが、案件

さて、話をLangfuse v3に戻します。

[Langfuse v3がGA]((<https://langfuse.com/changelog/2024-12-09-Langfuse-v3-stable-release>))したということで、早く使ってみたい気持ちがはやりますが、すぐに移行できる代物ではなかったのです
[Langfuse v3がGA]((<https://langfuse.com/changelog/2024-12-09-Langfuse-v3-stable-release>))したということで、早く使ってみたい気持ちがはやりますが、**すぐに移行できる代物ではなかった**のです

Langfuse v3はLangfuse v2とは全く異なるアーキテクチャになっていました。

Expand Down Expand Up @@ -100,11 +100,11 @@ LLMアプリケーション自体の数が増えたり、利用者が増えた

結果として、Langfuse v2のアーキテクチャでは増えゆく需要に対して、**スケーラビリティが追いつかなくなってきた**ということです。

![昨今のLLMアプリケーションのObservation特性](https://i.imgur.com/W0NoiX1.png)
![昨今のLLMアプリケーションのObservation特性](https://i.imgur.com/gJGFk0v.png)

### プロンプトマネジメントAPIの低レイテンシー要件

Langfuseには[プロンプトマネジメント](https://langfuse.com/docs/prompts/get-started)があります。
Langfuseには[プロンプトマネジメント機能](https://langfuse.com/docs/prompts/get-started)があります。

プロダクションコードからプロンプトを切り出すことで、LLMアプリケーションの**再デプロイをせずとも**バージョン管理されたプロンプトを画面から任意のタイミングで差し替えできる機能のことで**近年のLLMアプリケーション開発にはなくてはならない機能**となってきました。

Expand Down Expand Up @@ -144,7 +144,7 @@ Ingestionはプロダクションコードのなかで**非同期・ノンブロ

さて、そんな課題を解決するためにLangfuse v3のアーキテクチャはどのようになっているのでしょうか?

少し[公式ブログ](https://langfuse.com/blog/2024-12-langfuse-v3-infrastructure-evolution)[実際のコード](https://github.com/langfuse/langfuse)からDeep Diveしてみます
少し[公式ブログ](https://langfuse.com/blog/2024-12-langfuse-v3-infrastructure-evolution)[実際のコード](https://github.com/langfuse/langfuse)から**Deep Dive**してみます

(ここでは記載を省きますが、[Langfuse Cloud版](https://langfuse.com/faq/tag/cloud)として取り組んだプロンプトマネジメントAPIをALBのターゲットグループで分けるなどのアーキテクチャ改善のお話もとてもおもしろいのでぜひ[公式ブログ](https://langfuse.com/blog/2024-12-langfuse-v3-infrastructure-evolution)をくまなく読んでいただくことをおすすめします!!!)

Expand Down Expand Up @@ -225,15 +225,15 @@ ENGINE = ReplacingMergeTree()

Langfuse v3では、この問題をAsync Worker内部での**マージ処理**によって解決しています。

Workerはひとまとまりのイベントを処理する際に、ClickHouseに書き込みをする前に**内部的にマージ処理**を行ない、レコードの一貫性を保つ状態を作り出し、ClickHouseに書き込みを行なっています。少し複雑なので図にて説明します。
Async Workerはひとまとまりのイベントを処理する際に、ClickHouseに書き込みをする前に**内部的にマージ処理**を行ない、レコードの一貫性を保つ状態を作り出し、ClickHouseに書き込みを行なっています。少し複雑なので図にて説明します。

![Workerでのマージ処理によるレコードの一貫性の保持](https://i.imgur.com/m5qsPe9.png)

イベントのOLTP書き込み非同期化にて記載の通り、イベントの書き込みは**Redisをキューとして挟むことで非同期化**されています。

ただし、正確には**Blob Storage(S3)にイベント全データをJSONで格納**し、Redisには**イベントのIDのみを格納**しています。
ただし、正確には**Blob Storage(S3)にイベント全データをJSONで格納**し、Redisには**イベントのIDのみを格納**しています。(①②)

これはLLMへのコンテキストや生成結果をRedisに保存してしまうとRedisの容量が足りなくなるためと公式ブログで語られていました。(①②)
これはLLMへのコンテキストや生成結果をRedisに保存してしまうとRedisの容量が足りなくなるためと公式ブログで語られていました。

その後Async WorkerはイベントのIDをRedisから取得し、S3からイベント全データを取得します。(③④)

Expand All @@ -243,7 +243,20 @@ Workerはひとまとまりのイベントを処理する際に、ClickHouseに

そして、ClickHouseに書き込みを行ないます。(⑧)

こうすることで、ClickHouseのReplacingMergeTreeの仕組みを使いつつ、短時間の断続したレコード更新の際でも、Worker内部でのマージ処理によってレコードの一貫性を保つことができるようになっています。素晴らしい...。
こうすることで、ClickHouseの**ReplacingMergeTreeの仕組みを使いつつ**、短時間の断続したレコード更新の際でも、Worker内部でのマージ処理によって**レコードの一貫性を保つ**ことができるようになっています。素晴らしい...。

このあたりの処理は複雑なのでぜひWeb、Async Workerの処理機構を一度コードでじっくり読むと良いでしょう。ざっくり面白いところは下記です。

- [WebでS3へのイベントアップロード処理](https://github.com/langfuse/langfuse/blob/33fddb134450af5c149c8205f42c440abebbe0c1/packages/shared/src/server/ingestion/processEventBatch.ts#L172)
- [エラーになった場合は連携されたデータすべてをRedis経由で連携している(OLTPのみ)](https://github.com/langfuse/langfuse/blob/33fddb134450af5c149c8205f42c440abebbe0c1/packages/shared/src/server/ingestion/processEventBatch.ts#L271)
- [WebでBullMQを駆使してRedisにイベントIDを登録](https://github.com/langfuse/langfuse/blob/33fddb134450af5c149c8205f42c440abebbe0c1/packages/shared/src/server/ingestion/processEventBatch.ts#L216)
- **LANGFUSE_INGESTION_QUEUE_DELAY_MS**に従ってDelayingしながら登録している
- [Async Workerのキュー取得エントリーポイント](https://github.com/langfuse/langfuse/blob/33fddb134450af5c149c8205f42c440abebbe0c1/worker/src/queues/ingestionQueue.ts#L38)
- このなかでS3からデータダウンロード・Redisに連携されたイベントタイプからそれぞれの処理に進んでいる
- [TraceのAsync Worker処理](https://github.com/langfuse/langfuse/blob/33fddb134450af5c149c8205f42c440abebbe0c1/worker/src/services/IngestionService/index.ts#L208)
- [ObservationのAsync Worker処理](https://github.com/langfuse/langfuse/blob/33fddb134450af5c149c8205f42c440abebbe0c1/worker/src/services/IngestionService/index.ts#L321)
- [mergeObservationRecords](https://github.com/langfuse/langfuse/blob/33fddb134450af5c149c8205f42c440abebbe0c1/worker/src/services/IngestionService/index.ts#L462)で⑥の処理を実施している
- [ClickHouseへの書き込みキュー登録処理](https://github.com/langfuse/langfuse/blob/33fddb134450af5c149c8205f42c440abebbe0c1/worker/src/services/ClickhouseWriter/index.ts#L172)

## ここまでのまとめ

Expand Down Expand Up @@ -281,7 +294,7 @@ Langfuse v3のアーキテクチャはLangfuse v2のアーキテクチャが抱

まずv2->v3の移行については[Migrate Langfuse v2 to v3](https://langfuse.com/self-hosting/upgrade-guides/upgrade-v2-to-v3)を参考にしました。

Langfuse v3.0.0の1つ前のv2バージョンがv2.93.7でしたので、まずはLangfuse v2.93.7を最新バージョンにアップデートしました
Langfuse v3.0.0の1つ前のv2バージョンが**v2.93.7**でしたので、まずはLangfuse **v2.93.7**にアップデートしました

Before
![Langfuse v2.92.0](https://i.imgur.com/IKTkzIK.png)
Expand Down Expand Up @@ -322,11 +335,11 @@ ALTER TABLE "traces" DROP CONSTRAINT "traces_session_id_project_id_fkey";

丁寧に移行するなら**v2のApp Runner****v3のApp Runner**を同時に動かし、v3のApp Runnerに**トラフィックを切り替える**のがよいと思いますが、私はめんどくさいので下記の方法を取りました。

- v2のApp RunnerAmazon Aurora serverless v2を動かし続ける
- v2の**AWS App Runner****Amazon Aurora serverless v2**を動かし続ける
- v3で新規に必要になった各種インフラを作成する
- WorkerはRedisのキュー契機で動くため、v3のイメージを当てておく
- WorkerはRedisのキュー契機で動くため、v3のイメージを当てても問題はない。
- v3のすべてのインフラが問題なく動いたことを確認しApp Runnerのイメージをv3に切り替える
- ここでOLTPのマイグレーションが走る
- ここで**OLTP/OLAPのマイグレーション**が走る

図にするとこんな感じです。

Expand Down Expand Up @@ -354,33 +367,33 @@ ClickHouse自体は**外(VPC外)から直接叩く要件はなく**、かつ

Service Connectを使うにはすべてのサービスをECS化する必要があります。これではLangfuse v2のApp Runnerで動かしているLangfuse ServerをECSに置き換えなくてはいけません。

![Langfuse ServerをECS化](https://i.imgur.com/ykoJo4al.png)
![Langfuse ServerをECS化](https://i.imgur.com/VYbMEY3l.png)

Web Serverはスパイクアクセスにも柔軟に対応させつつ、動いていないときのコストを最小限にしたかったので、なんとかApp Runner構成を残す道を考えました。

そこで、[Amazon ECS Service Discovery](https://docs.aws.amazon.com/ja_jp/AmazonECS/latest/developerguide/service-discovery.html)を使ってClickHouseのプライベートDNSを設定し、Langfuse Web Server、Async Workerは**プライベートDNSの名前解決**を実施することで直接ClickHouseにアクセスできるようにします。

![ClickHouseへのアクセス経路](https://i.imgur.com/kHp2OOtl.png)
![ClickHouseへのアクセス経路](https://i.imgur.com/kHp2OOth.png)

### ClickHouseのデータ永続化

もう1つの課題は**ClickHouseのデータ永続化**でした。

なるべくインフラをマネージドサービスで構築したかったので、ClickHouseのコンピュートリソースはECS Fargateで構築することにしましたが課題はデータの永続化です。

ECS Fargateの**エフェメラルストレージ**はタスクが終了するとデータが消えてしまうため、ClickHouseのデータの永続化が課題となりました
ECS Fargateの**エフェメラルストレージ**はタスクが終了すると**データが消えてしまう**ため、**ClickHouseのデータの永続化が課題**となりました

そこで、今回は[Amazon EFS](https://aws.amazon.com/jp/efs/)を使ってClickHouseのデータを永続化することにしました。

![ClickHouseのデータ永続化](https://i.imgur.com/yekxuN3l.png)
![ClickHouseのデータ永続化](https://i.imgur.com/yekxuN3h.png)

#### 余談: EFSのマウント失敗

完全に余談ですが、構築で検証中に**EFSのマウントがうまくできない問題**にぶつかりました。

理由はEFSをマウントする際に、EFSがroot, ClickHouseがClickHouseユーザーでマウントしていたためアクセスできないことやEFSのセキュリティグループがClickHouseのセキュリティグループに許可されていなかったことが原因でした
理由はEFSをマウントする際に、EFSがrootユーザーのアクセス権限のところ、ClickHouseコンテナがClickHouseユーザーでマウントしていたため**アクセス権が不正で怒られていた**ことやEFSの**セキュリティグループがClickHouseサービスのセキュリティグループに許可されていなかった**ことが原因でした

それ自体は大したことはないのですが、いかんせんタスクデプロイのときに出る失敗ログのみで原因にたどり着くのがEFS初心者としてはとても大変でした。
それ自体は原因さえわかってしまえば修正は大したことはないのですが、いかんせんタスクデプロイのときに出る失敗ログのみで原因にたどり着くのがEFS初心者としてはとても大変でした。

![EFSのマウント失敗](https://i.imgur.com/AuhZn4M.png)

Expand All @@ -400,7 +413,7 @@ line 373, in check_call raise CalledProcessError(retcode, cmd) subprocess.Called

これがLangfuseでは通常の運用としてはサポートされていないため、**データのリカバリーは自分で行なう必要がありました。**

Langfuse v3にアップデートをすると、v2時代に蓄積されたイベントをv3のClickHouseにマイグレーションする必要がありますので、別途[Background Migrations](https://langfuse.com/self-hosting/background-migrations)という仕組みがあります
Langfuse v3にアップデートをすると、v2時代に蓄積されたイベントをv3のClickHouseにマイグレーションする必要がありますので、別途[Background Migrations](https://langfuse.com/self-hosting/background-migrations)という仕組みがLangfuseにあります

![badge](https://i.imgur.com/j7YZLwB.png)

Expand All @@ -410,7 +423,7 @@ Langfuse v3にアップデートをすると、v2時代に蓄積されたイベ

![background migration](https://i.imgur.com/NlPszng.png)

お気づきかもしれませんが、一度成功したマイグレーションについては再実行ができません!
お気づきかもしれませんが、**一度成功したマイグレーションについては再実行ができません!**強制実行もできません!**

上記のキャプチャでは実は永続化前のClickHouseにマイグレーションを実施したあと、永続化に成功したClickHouseに接続し直した直後なのですが、いくつかのマイグレーションが完了しており、もう二度と古いデータにアクセスできなくなってしまったと絶望しました。

Expand Down Expand Up @@ -444,6 +457,47 @@ UPDATE background_migrations SET failed_at = '2024-12-28 12:00:00', failed_reaso

(ClickHouseをセルフホステッドからマーケットプレイス経由でCloud版に契約変更する際に参考になるかもしれません。)

## おまけ(ClickHouseに保存されたデータを見てみる)

[tubone24/langfuse-v3-terraform](https://github.com/tubone24/langfuse-v3-terraform)にはClickHouseのデータを見るための**Grafana**も含まれています。

**AWS App Runner**で構築され、構築時にClickHouseの接続情報に関するYAMLを同梱したDockerイメージを使っています。

GitHub Actionsをお使いであればこんな感じでDockerfileを作ってECRにPushすれば、App Runnerに設定した環境変数・シークレットから接続情報がセットされた形で立ち上がります。

```yaml
- name: Create Grafana Docker context
run: |
mkdir -p grafana
cat << EOF > grafana/Dockerfile
FROM grafana/grafana:latest
RUN grafana-cli plugins install grafana-clickhouse-datasource
COPY datasources.yaml /etc/grafana/provisioning/datasources/
EOF
cat << EOF > grafana/datasources.yaml
apiVersion: 1
datasources:
- name: ClickHouse
type: grafana-clickhouse-datasource
access: proxy
url: http://\${CLICKHOUSE_HOST}:8123
jsonData:
defaultDatabase: default
port: 8123
username: \${CLICKHOUSE_USER}
protocol: http
secureJsonData:
password: \${CLICKHOUSE_PASSWORD}
version: 1
editable: true
EOF
- name: Build and push Grafana image
run: |
docker build -t ${{ env.ECR_URL }}/grafana:latest ./grafana
aws ecr batch-delete-image --repository-name grafana --image-ids imageTag=latest || true
docker push ${{ env.ECR_URL }}/grafana:latest
```
## まとめ
大変長くなりましたが、Langfuse v3のアーキテクチャのDeepDiveとAWSマネージドサービスで構築するためにはどのような工夫が必要だったかをご紹介しました。
Expand Down

0 comments on commit fbeaa0a

Please sign in to comment.