Fluentdとは?
Fluentd はオープンソースのデータコレクターです。 データソースとバックエンドの間に位置することで、両方の環境を独立させつつログ収集を行うことができます。
社内で Fluentd に触れる機会が多かったため、今回取り上げました。 Fluentd は簡単に動作確認ができ、さまざまなプラグインを利用できます。今回はその中から Buffer プラグインを試してみます。
この内容は、まだ Fluentd に触れたことがない、知らない人へ向けたものです。 記事内の手順で手元で動かすこともできるので、気になる方は実際に触ってみてください。
Fluentdの動作確認
動作確認は、以下の環境を使用して行いました。
- WSL2(Amazon Linux 2023)
- Docker version 25.0.3
Fluentd については Docker イメージを使用します。
まず、Fluentd の設定ファイル(fluentd.conf)を用意し、次にディレクトリを作成して準備します。 準備するディレクトリ名を変数にしていますので、お好みの名前にしてください。
$ directory_name=fluentd_test_tmp $ mkdir ~/$directory_name
ファイルの内容は以下になります。 設定ファイルの中身については、後ほど見ていきます。
<source> @type http port 9880 bind 0.0.0.0 </source> <match **> @type stdout </match> <label @FLUENT_LOG> <match **> @type stdout </match> </label>
お好みのテキストエディタで設定ファイルを作成してください。
$ vim ~/$directory_name/fluentd.conf
設定ファイルが用意できたので、実際に動かしてみます。 今回は Docker イメージを使用して動作確認を行います。
以下を実行して起動します。
$ docker run -p 9880:9880 -d --name test_fluentd -v ~/$directory_name:/fluentd/etc fluent/fluentd:edge-debian -c /fluentd/etc/fluentd.conf
テスト用のログを送ってみましょう。 curl で JSON を送信します。
$ curl -X POST -d 'json={"json":"message"}' http://127.0.0.1:9880/sample.test
それでは、実際にログがどのように出力されるかを確認してみます。
$ docker logs test_fluentd
以下は実行例です。 上記の手順に従えば、以下とほぼ同じ内容が表示されると思います。
fluentd -c /fluentd/etc/fluentd.conf 2024-11-20 11:03:30 +0000 [info]: init supervisor logger path=nil rotate_age=nil rotate_size=nil 2024-11-20 11:03:30 +0000 [info]: parsing config file is succeeded path="/fluentd/etc/fluentd.conf" 2024-11-20 11:03:30 +0000 [info]: gem 'fluentd' version '1.17.1' 2024-11-20 11:03:30 +0000 [info]: using configuration file: <ROOT> <source> @type http port 9880 bind "0.0.0.0" </source> <match **> @type stdout </match> <label @FLUENT_LOG> <match **> @type stdout </match> </label> </ROOT> 2024-11-20 11:03:30 +0000 [info]: starting fluentd-1.17.1 pid=8 ruby="3.2.5" 2024-11-20 11:03:30 +0000 [info]: spawn command to main: cmdline=["/usr/local/bin/ruby", "-Eascii-8bit:ascii-8bit", "/usr/local/bundle/bin/fluentd", "-c", "/fluentd/etc/fluentd.conf", "--plugin", "/fluentd/plugins", "--under-supervisor"] 2024-11-20 11:03:30 +0000 [info]: #0 init worker0 logger path=nil rotate_age=nil rotate_size=nil 2024-11-20 11:03:30 +0000 [info]: adding match in @FLUENT_LOG pattern="**" type="stdout" 2024-11-20 11:03:30 +0000 [info]: adding match pattern="**" type="stdout" 2024-11-20 11:03:30 +0000 [info]: adding source type="http" 2024-11-20 11:03:30 +0000 [info]: #0 starting fluentd worker pid=17 ppid=8 worker=0 2024-11-20 11:03:30 +0000 [info]: #0 fluentd worker is now running worker=0 2024-11-20 11:03:30.690414959 +0000 fluent.info: {"pid":17,"ppid":8,"worker":0,"message":"starting fluentd worker pid=17 ppid=8 worker=0"} 2024-11-20 11:03:30.692076326 +0000 fluent.info: {"worker":0,"message":"fluentd worker is now running worker=0"} 2024-11-20 11:07:32.876610578 +0000 sample.test: {"json":"message"}
最後の1行の部分を見ると、先ほど curl で送った JSON が表示されていることが分かります。
設定ファイルについて
次に、先ほど用意した設定ファイルの内容について詳しく見ていきます。
<source> @type http port 9880 bind 0.0.0.0 </source> <match **> @type stdout </match> <label @FLUENT_LOG> <match **> @type stdout </match> </label>
各項目について解説します。
source
ディレクティブ
外部からデータを受け取るための設定を記載します。
@type
で入力プラグインを指定しており、ここでは http
プラグインを指定しています。
port
は何番 port でリッスンするか、bind
はバインドアドレスを記載しています。
match
ディレクティブ
match
ディレクティブは、一致するタグを持つデータに対しての処理を指定できます。
上記の source
ディレクティブでデータを受け取ると、tag、time、record という3つの要素でデータを格納し、インプットプラグインに渡します。
http インプットプラグインの場合、先ほどの例のような curl でデータを送ると以下のデータが入ります。
tag: sample.test time: (current time) record: {"json":"message"}
例えば、先ほどの curl のログを出力させるためには <match sample.test>
と指定します。 <match **>
でも出力されます。
しかし、<match sample.hoge>
ではタグが違うので出力されません。
また、@type
では出力プラグインを指定しています。stdout
を使用することで、イベントを標準出力するよう指定しています。
label
ディレクティブ
label
ディレクティブは、出力されるデータをグループ化することができます。
複数タグのグループ化が可能で、タグが多くなった場合の管理が容易になります。
Fluentd 自身のログも fluent.info
、fluent.warn
、fluent.error
、fluent.fatal
といったタグをつけて出力します。この Fluentd のログをグループ化する場合は <label @FLUENT_LOG>
を使うことが推奨されています。
これを定義しない場合、上記の <match **>
で Fluentd のログを拾ってしまうため、以下のような警告が出ます。
2024-11-20 12:12:27 +0000 [warn]: define <match fluent.**> to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead
バッファプラグインとは?
設定ファイルの説明でもありましたが、Fluentd にはプラグインがあります。 2024年11月時点では、以下のようなプラグインタイプがあります。
- Input
- データソースからデータを受け取るためのプラグインです。ファイル、HTTP、TCPなどさまざまなソースからデータを収集します。
- Parser
- 受け取ったデータを解析して構造化するためのプラグインです。ログをJSONや他の形式に変換できます。
- Filter
- 構造化されたデータを変換したり、追加、削除するためのプラグインです。
- Output
- 処理されたデータをストレージや他のシステムへ送るためのプラグインです。ファイル、データベース、Web API等、さまざまな出力先にデータを転送できます。
- Formatter
- データの出力形式をカスタマイズするためのプラグインです。出力データをCSV、JSONなど、必要なフォーマットに変換します。
- Storage
- バッファやキューといった一時的なデータ保存できるプラグインです。データ転送が中断した場合にデータ損失を防ぐために用いられます。
- Service Discovery
- 出力先のサービスやネットワーク変更時に自動的に検出し、設定を更新するプラグインです。
- Buffer
- 出力するデータを一時的に保持し、バッチ処理や障害時のデータ再送に利用できます。通常はデータを出力する際に使用されます。
- Metrics
- システムの運用状態やパフォーマンス測定データを収集、報告するプラグインです。監視や分析に活用できます。
その中でも、Buffer プラグインについて触れていきたいと思います。 Buffer プラグインは、Output プラグインと連携して使用され、受信したログを転送前に一時的に保存する機能があります。
バッファは「チャンク」というまとまりになっており、複数のイベントが一つのブロックにまとめられています。 チャンクはファイル形式(buf_file)や連続したメモリブロック(buf_memory)として管理されます。
チャンクは、データが一定容量になる、もしくは一定時間になると保存先に送信する仕組みになっています。 Buffer プラグインは、データが保存されていく「ステージ」と、送信先に送られる前に待機する「キュー」の2つに分かれており、新しいチャンクはステージでデータを受け取り、時間経過もしくは一定の容量になるとキューに移動し、その後保存先に転送されます。
チャンクの流れについては、公式サイトにある図が分かりやすいです。
このように Buffer プラグインはデータを一時保存し、保存先に届ける役割を担っています。 また、障害でデータ送信に失敗した場合でも、Buffer プラグインを使用することで再度送信をすることができます。
Buffer プラグインを動かしてみる
では、先ほどの設定でログを書き出す前に Buffer プラグインを使って、バッファに書き出してみたいと思います。 以下が Buffer プラグインを使用するための例になります。
<source> @type http port 9880 bind 0.0.0.0 </source> <match **> @type stdout <buffer> @type file path /tmp/fluentd-buffers/ chunk_limit_size 64m total_limit_size 256m flush_interval 30s </buffer> </match> <label @FLUENT_LOG> <match **> @type stdout </match> </label>
<match>
の中に、Buffer プラグインを使用するための設定を記載しました。
各項目について見ていきたいと思います。
@type
- Buffer プラグインのタイプを指定します。デフォルトは
memory
です。 - 今回は
file
を指定しています。
- Buffer プラグインのタイプを指定します。デフォルトは
path
- バッファを格納するディレクトリを指定します。
- 上記設定では、
/tmp/fluentd-buffers/
になります。
chunk_limit_size
- チャンクの最大サイズを指定します。file バッファの場合のデフォルトは 256MB です。
- このサイズを超えた場合は、保存先に転送されます。
total_limit_size
- バッファを格納するための指定したディレクトリ容量の最大サイズを指定します。file バッファの場合のデフォルトは 64GB です。
- 上記設定では、格納するディレクトリは
/tmp/fluentd-buffers/
です。 - 容量を超えた場合は、保存に失敗します。
flush_interval
で設定された時間が来るまで失敗が続きます。時間が過ぎれば再度格納可能になります。
flush_interval
- チャンクをフラッシュ(保存先に転送)する時間間隔を設定します。デフォルトは 60 秒です。
- 設定した時間を迎えると、保存先に転送されます。
では、実際に動作させて、先ほど説明した「チャンク」について見ていきたいと思います。 今まで実行していたターミナルとは別のウィンドウでターミナルを開き、以下を実行してください。
$ count=0; while : ;do count=$(expr $count + 1) ; sleep 1; curl -X POST -d 'json={"json":"message"}' http://127.0.0.1:9880/sample.test.${count}; done
このコマンドは、先ほどの curl にすこし変更を加えたものを1秒ずつ実行します。
分かりやすくするために、sample.test
タグの後ろにカウントされた数字を付けました。
別ウィンドウで実行後、元のターミナルに戻り、Fluentdのコンテナにアクセスします。
$ docker exec -it test_fluentd /bin/sh
設定ファイルで指定したディレクトリに移動します。
$ cd /tmp/fluentd-buffers
以下のコマンドを実行することで、チャンクができる様子と転送される様子をリアルタイムで見ることができます。
$ watch -n 0.1 ls -lh
ここでは、バッファがあるディレクトリのファイルを確認した例を示します。 バッファファイルの存在確認、バッファファイルの内容確認を確認しています。
バッファファイルは、送られてきたログのタグごとにファイルが作成されます。
下の例では、sample.test
にカウントされた数字をつけているため、カウント付きのタグごとのログファイルが作成されています。
# 指定したディレクトリのファイル確認 $ ls buffer.b627cd64e926e50edd47ef88d02bf9da3.log buffer.b627cd6507f1835a805f59a49cc956c61.log.meta buffer.b627cd64e926e50edd47ef88d02bf9da3.log.meta buffer.b627cd65175519c7293415e740fd3dd85.log buffer.b627cd64f88bf2be15b2e69ccec8e7e53.log buffer.b627cd65175519c7293415e740fd3dd85.log.meta buffer.b627cd64f88bf2be15b2e69ccec8e7e53.log.meta buffer.b627cd6526bc6fda40d271633957a1f24.log buffer.b627cd6507f1835a805f59a49cc956c61.log buffer.b627cd6526bc6fda40d271633957a1f24.log.meta # バッファファイル全体の中身を確認 $ cat buffer.*.log 2024-11-26 09:22:55.044139143 +0000 sample.test.1: {"json":"message"} 2024-11-26 09:22:56.053147459 +0000 sample.test.2: {"json":"message"} 2024-11-26 09:22:57.062181713 +0000 sample.test.3: {"json":"message"} 2024-11-26 09:22:58.070724618 +0000 sample.test.4: {"json":"message"} 2024-11-26 09:22:59.080210783 +0000 sample.test.5: {"json":"message"} 2024-11-26 09:23:00.088418103 +0000 sample.test.6: {"json":"message"} 2024-11-26 09:23:01.096158524 +0000 sample.test.7: {"json":"message"} 2024-11-26 09:23:02.104030158 +0000 sample.test.8: {"json":"message"} 2024-11-26 09:23:03.111311124 +0000 sample.test.9: {"json":"message"} 2024-11-26 09:23:04.118440245 +0000 sample.test.10: {"json":"message"} 2024-11-26 09:23:05.127378935 +0000 sample.test.11: {"json":"message"} 2024-11-26 09:23:06.134284250 +0000 sample.test.12: {"json":"message"} 2024-11-26 09:23:07.142919923 +0000 sample.test.13: {"json":"message"} 2024-11-26 09:23:08.151001716 +0000 sample.test.14: {"json":"message"}
バッファファイルが存在し、内容が先ほど紹介した curl の内容であることが分かりました。 しかし、バッファファイルの中身はまだコンテナのログに出力されていません。 バッファファイルの中身を確認した後、docker logs でログを確認した結果が以下になります。
$ docker logs test_fluentd | tail 2024-11-26 09:21:58 +0000 [info]: starting fluentd-1.17.1 pid=7 ruby="3.2.5" 2024-11-26 09:21:58 +0000 [info]: spawn command to main: cmdline=["/usr/local/bin/ruby", "-Eascii-8bit:ascii-8bit", "/usr/local/bundle/bin/fluentd", "-c", "/fluentd/etc/fluentd.conf", "--plugin", "/fluentd/plugins", "--under-supervisor"] 2024-11-26 09:21:58 +0000 [info]: #0 init worker0 logger path=nil rotate_age=nil rotate_size=nil 2024-11-26 09:21:58 +0000 [info]: adding match in @FLUENT_LOG pattern="**" type="stdout" 2024-11-26 09:21:58 +0000 [info]: adding match pattern="**" type="stdout" 2024-11-26 09:21:58 +0000 [info]: adding source type="http" 2024-11-26 09:21:58 +0000 [info]: #0 starting fluentd worker pid=16 ppid=7 worker=0 2024-11-26 09:21:58 +0000 [info]: #0 fluentd worker is now running worker=0 2024-11-26 09:21:58.952210182 +0000 fluent.info: {"pid":16,"ppid":7,"worker":0,"message":"starting fluentd worker pid=16 ppid=7 worker=0"} 2024-11-26 09:21:58.954423043 +0000 fluent.info: {"worker":0,"message":"fluentd worker is now running worker=0"}
起動直後のログが出ていますね。
しばらくしてから、もう一度確認すると先ほど確認したバッファファイルのログが記録されていることが分かります。
$ docker logs test_fluentd | tail 2024-11-26 09:21:58 +0000 [info]: adding match in @FLUENT_LOG pattern="**" type="stdout" 2024-11-26 09:21:58 +0000 [info]: adding match pattern="**" type="stdout" 2024-11-26 09:21:58 +0000 [info]: adding source type="http" 2024-11-26 09:21:58 +0000 [info]: #0 starting fluentd worker pid=16 ppid=7 worker=0 2024-11-26 09:21:58 +0000 [info]: #0 fluentd worker is now running worker=0 2024-11-26 09:21:58.952210182 +0000 fluent.info: {"pid":16,"ppid":7,"worker":0,"message":"starting fluentd worker pid=16 ppid=7 worker=0"} 2024-11-26 09:21:58.954423043 +0000 fluent.info: {"worker":0,"message":"fluentd worker is now running worker=0"} 2024-11-26 09:22:55.044139143 +0000 sample.test.1: {"json":"message"} 2024-11-26 09:22:56.053147459 +0000 sample.test.2: {"json":"message"} 2024-11-26 09:22:57.062181713 +0000 sample.test.3: {"json":"message"}
バッファにあったログが徐々にコンテナのログに記録されていくため、ぜひ手元で動かして確認してみてください。
さいごに
Fluentd を実際に動かし、動作を見ていきました。 ログ収集が可能で、今回紹介したような Buffer プラグインを使用することで、より柔軟な収集が可能になります。 Fluentd には他にも多くのプラグインがありますので、興味のある方はぜひ試してみてください。
お酒を飲みに行くことが好きです。最近は実家付近で推し候補の個人経営居酒屋を見つけました。