Kafka
Kafka プラグインを使用すると、Kafka(英語) イベントストリーミングプロセスを監視し、コンシューマー、プロデューサー、トピックを作成できます。また、スキーマレジストリに接続し、スキーマを作成および更新することもできます。
Kafka プラグインをインストールする
この機能は、インストールして有効にする必要がある Kafka(英語) プラグインに依存しています。
Ctrl+Alt+S を押して設定を開き、
を選択します。マーケットプレースタブを開き、Kafka プラグインを見つけて、インストールをクリックします (プロンプトが表示されたら、IDE を再起動します)。
Kafka プラグインを使用すると、次のことが可能になります。
接続先:
Kafka プラグインがインストールされ有効になっている場合は、Kafka ツールウィンドウ ( ) を使用して Kafka に接続し、操作することができます。あるいは、リモートファイルシステムまたは Zeppelin プラグインがインストールされ有効になっている場合は、ビッグデータツールツールウィンドウ ( ) を使用して Kafka 接続にアクセスすることもできます。
Kafka に接続する
クラウドプロバイダーを使用して Kafka に接続する
Confluent クラスターに接続する
Kafka ツールウィンドウを開きます: 。
( 新規接続 ) をクリックします。
「名前」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソースリストでクラウドを選択し、次にプロバイダーリストで Confluent を選択します。
https://confluent.cloud/home(英語) に進みます。ページの右側で、設定メニューをクリックし、環境を選択し、クラスターを選択して、 を選択します。
クライアントの構成スニペットをコピーするブロックで、Kafka API キーを指定し、コピーをクリックします。
IDE に戻り、コピーしたプロパティを構成フィールドに貼り付けます。
設定を入力したら、接続のテストをクリックして、すべての構成パラメーターが正しいことを確認します。次に OK をクリックします。

オプションで、次を設定できます。
接続を使用可能にする : この接続を無効にする場合は、チェックボックスをオフにしてください。デフォルトでは、新しく作成された接続は有効になっています。
プロジェクトごと: これらの接続設定を現在のプロジェクトでのみ有効にするには、チェックボックスをオンにしてください。この接続を他のプロジェクトでも表示したい場合は、チェックボックスをオフにしてください。
AWS MSK クラスターに接続する
Kafka ツールウィンドウを開きます: 。
( 新規接続 ) をクリックします。
「名前」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソースリストでクラウドを選択し、次にプロバイダーリストで AWS MSK を選択します。
「Bootstrap サーバー」フィールドに、Kafka ブローカーの URL、または URL のコンマ区切りリストを入力します。
AWS 認証リストで認証方法を選択します。
デフォルトの資格情報プロバイダーチェーン : デフォルトのプロバイダーチェーンの資格情報を使用します。チェーンの詳細については、「デフォルトの資格情報プロバイダーチェーンの使用(英語)」を参照してください。
資格情報ファイルからのプロファイル : ファイルからプロファイルを選択します。
明示的なアクセスキーと秘密鍵 : 資格情報を手動で入力します。
オプションで、スキーマレジストリに接続できます。
Kafka への接続中に SSH トンネルを使用する場合は、トンネリングを有効にするを選択し、SSH 構成リストで SSH 構成を選択するか、新しい構成を作成します。
設定を入力したら、接続のテストをクリックして、すべての構成パラメーターが正しいことを確認します。次に OK をクリックします。

オプションで、次を設定できます。
接続を使用可能にする : この接続を無効にする場合は、チェックボックスをオフにしてください。デフォルトでは、新しく作成された接続は有効になっています。
プロジェクトごと: これらの接続設定を現在のプロジェクトでのみ有効にするには、チェックボックスをオンにしてください。この接続を他のプロジェクトでも表示したい場合は、チェックボックスをオフにしてください。
カスタム Kafka サーバーに接続する
Kafka ツールウィンドウを開きます: 。
( 新規接続 ) をクリックします。
「名前」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソースリストで、カスタムを選択します。
「Bootstrap サーバー」フィールドに、Kafka ブローカーの URL、または URL のコンマ区切りリストを入力します。
認証で、認証方法を選択します。
なし : 認証なしで接続します。
SASL : SASL メカニズム (Plain、SCRAM-SHA-256、SCRAM-SHA-512、Kerberos) を選択し、ユーザー名とパスワードを入力します。
SSL
ブローカーのホスト名がブローカー証明書のホスト名と一致することを確認する場合は、サーバーのホスト名を検証するを選択します。チェックボックスをオフにすることは、
ssl.endpoint.identification.algorithm=
プロパティを追加することと同じです。トラストストアの場所で、SSL トラストストアの場所へのパスを指定します。(
ssl.truststore.location
プロパティ)。トラストストアのパスワードで、SSL トラストストアのパスワードへのパスを指定します。(
ssl.truststore.password
プロパティ)。キーストアクライアント認証を使用するを選択し、鍵ストアのロケーション (
ssl.keystore.location
)、鍵ストアパスワード (ssl.keystore.password
)、および鍵パスワード (ssl.key.password
) の値を指定します。
AWS IAM : Amazon MSK には AWS IAM を使用します。AWS 認証リストで、次のいずれかを選択します。
デフォルトの資格情報プロバイダーチェーン : デフォルトのプロバイダーチェーンの資格情報を使用します。チェーンの詳細については、「デフォルトの資格情報プロバイダーチェーンの使用(英語)」を参照してください。
資格情報ファイルからのプロファイル : ファイルからプロファイルを選択します。
明示的なアクセスキーと秘密鍵 : 資格情報を手動で入力します。
オプションで、スキーマレジストリに接続できます。
Kafka への接続中に SSH トンネルを使用する場合は、トンネリングを有効にするを選択し、SSH 構成リストで SSH 構成を選択するか、新しい構成を作成します。
設定を入力したら、接続のテストをクリックして、すべての構成パラメーターが正しいことを確認します。次に OK をクリックします。

オプションで、次を設定できます。
接続を使用可能にする : この接続を無効にする場合は、チェックボックスをオフにしてください。デフォルトでは、新しく作成された接続は有効になっています。
プロジェクトごと: これらの接続設定を現在のプロジェクトでのみ有効にするには、チェックボックスをオンにしてください。この接続を他のプロジェクトでも表示したい場合は、チェックボックスをオフにしてください。
プロパティを使用して Kafka に接続する
Kafka ツールウィンドウを開きます: 。
( 新規接続 ) をクリックします。
「名前」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソースリストで、Properties を選択します。
「Bootstrap サーバー」フィールドに、Kafka ブローカーの URL、または URL のコンマ区切りリストを入力します。
Kafka ブローカー構成プロパティを提供する方法を選択します。
暗黙的 : 提供された構成プロパティを貼り付けます。または、コード補完と WebStorm が提供するクイックドキュメントを使用して手動で入力することもできます。
ファイルから : プロパティファイルを選択します。
オプションで、スキーマレジストリに接続できます。
Kafka への接続中に SSH トンネルを使用する場合は、トンネリングを有効にするを選択し、SSH 構成リストで SSH 構成を選択するか、新しい構成を作成します。
設定を入力したら、接続のテストをクリックして、すべての構成パラメーターが正しいことを確認します。次に OK をクリックします。
オプションで、次を設定できます。
接続を使用可能にする : この接続を無効にする場合は、チェックボックスをオフにしてください。デフォルトでは、新しく作成された接続は有効になっています。
プロジェクトごと: これらの接続設定を現在のプロジェクトでのみ有効にするには、チェックボックスをオンにしてください。この接続を他のプロジェクトでも表示したい場合は、チェックボックスをオフにしてください。
Kafka サーバーへの接続を確立すると、この接続を含む新しいタブが Kafka ツールウィンドウに表示されます。これを使用して、データの生成と消費、トピックの作成と削除を行うことができます。スキーマレジストリに接続している場合は、スキーマを表示、作成、更新することもできます。
Kafka ツールウィンドウの任意のタブで をクリックして、接続の名前変更、削除、無効化、リフレッシュを行ったり、接続の設定を変更したりできます。

すべてのクラスタートピックがトピックセクションに表示されます。 をクリックしてお気に入りのトピックのみを表示するか、
をクリックして内部トピックを表示または非表示にすることができます。トピックをクリックすると、パーティション、構成、スキーマに関する情報などの詳細が表示されます。
トピックを作成する
Kafka ツールウィンドウを開きます: 。
( 新規接続 ) をクリックします。
トピックを選択し、
をクリックします (または Alt+Insert を押します)。
新しいトピックに名前を付け、パーティションの数とレプリケーション係数を指定して、「OK 」をクリックします。
トピックからレコードを削除する
Kafka ツールウィンドウを開きます: 。
( 新規接続 ) をクリックします。
トピックでトピックを右クリックし、明確なトピックを選択します (またはその左側にある
をクリックします)。OK をクリックして削除を確認します。
データの生産と消費
データを作成する
Kafka ツールウィンドウを開きます: 。
( 新規接続 ) をクリックします。
Kafka 接続を選択し、プロデューサーをクリックします。
これにより、新しいエディタータブでプロデューサーが開きます。
トピックリストで、メッセージを書き込むトピックを選択します。
キーおよび値で、メッセージキーと値を選択します。
スキーマレジストリに接続している場合は、スキーマレジストリを選択して、送信されたデータを選択したスキーマと照合してチェックできます。
ランダムな値を生成できます:
をクリックすると、選択したタイプに基づいてランダムな値が生成されます。これには、選択したスキーマレジストリに基づいて JSON オブジェクト全体の生成が含まれます。
ランダムな値を生成する柔軟性を高めるには、
${random...}
変数を使用します。JSON、Avro、Protobuf ファイルを編集するときに、値フィールドにrandom
と入力し始めると、ランダムな値の自動補完オプションが表示されます。例:"${random.integer(1,10)}"
を使用して、1 から 10 までのランダムな整数を生成できます。
ヘッダーで、カスタムヘッダーを指定します。JSON または CSV 形式で保存されている場合は、このセクションに貼り付けることができます。
Flow では、レコードフローを制御できます。
複数のレコードを同時に送信する場合は、一度に記録に数値を入力します。
レコードデータをランダムに生成する場合は、ランダムキーを生成するおよびランダムな値を生成するを選択します。
レコード送信間の間隔をミリ秒単位で設定します。
指定されたレコード数に達した場合、または指定された時間が経過した場合に、プロデューサーがメッセージの送信を停止するようにするには、停止条件を指定します。
オプションで、追加のオプションを指定します。
パーティション : レコードを送信する必要があるトピックパーティションを指定します。指定しない場合は、デフォルトのロジックが使用されます。プロデューサーは、パーティション数を法としたキーのハッシュを取得します。
圧縮 : プロデューサーによって生成されたデータの圧縮タイプを選択します: なし、Gzip、Snappy、Lz4、または Zstd。
冪等性 : 各メッセージのコピーが 1 つだけストリームに書き込まれるようにする場合に選択します。
Ack: リーダーがローカルログにレコードを書き込み、すべてのフォロワーからの完全な確認を待たずに応答するようにする場合は、リーダーを選択します。リーダーが同期レプリカの完全なセットがレコードを確認するのを待機するには、すべてを選択します。サーバーからの確認応答を待たないように、プロデューサー用のなしを保持します。
生産をクリックします。

データタブ内の任意のレコードをクリックすると、その詳細が表示されます。 をクリックして統計を有効にすることもできます。
データを消費する
Kafka ツールウィンドウを開きます: 。
( 新規接続 ) をクリックします。
Kafka 接続を選択し、消費者をクリックします。
これにより、新しいエディタータブでコンシューマーが開きます。
トピックリストで、サブスクライブするトピックを選択します。
キーおよび値で、使用するレコードのキーと値のデータ型を選択します。
範囲とフィルターを使用して、使用するデータを絞り込みます。
開始リストで、データを消費する期間またはオフセットを選択します。トピックからすべてのレコードを取得するには、最初からを選択します。
制限リストで、データの受信をいつ停止するかを選択します (たとえば、トピック内のレコード数が特定の数に達したとき)。
フィルターを使用して、キー、値、またはヘッダーの部分文字列によってレコードをフィルターします。
その他で、追加のパラメーターを設定します。
パーティションボックスにパーティション ID またはコンマ区切りの ID リストを入力して、特定のパーティションからのレコードのみを取得します。
新しいコンシューマーを追加する場合は、消費者団体リストでコンシューマーグループを選択します。
消費を開始するをクリックします。

データタブ内の任意のレコードをクリックすると、その詳細が表示されます。 をクリックして統計を有効にすることもできます。
データのエクスポート
生成または消費されたデータを CSV、TSV、JSON 形式でダウンロードできます。
プロデューサーまたはコンシューマーのプリセットを保存する
同じキー、値、ヘッダー、またはその他のパラメーターを使用してデータを頻繁に生成または使用する場合は、プリセットとして保存できます。その後、プリセットを再利用して、プロデューサーまたはコンシューマーをすばやく作成できます。
Kafka ツールウィンドウで、プロデューサーまたは消費者をクリックします。
必要なパラメーターを指定し、プロデューサーまたはコンシューマー作成フォームの上部で
( プリセットの保存 ) をクリックします。
パラメーターはプリセットとして保存され、プリセットタブで利用できます。プリセットをクリックして適用します。
スキーマレジストリの操作
プロデューサーとコンシューマーはスキーマを使用して、レコードのキーと値の一貫性を検証し、保証できます。Kafka プラグインはスキーマレジストリと統合され、Avro、Protobuf、JSON スキーマをサポートします。これにより、次のことが可能になります。
スキーマレジストリに接続する
スキーマの作成、更新、削除、クローン作成
生の形式またはツリービューでスキーマをプレビューする
スキーマのバージョンを比較する
スキーマのバージョンを削除する
スキーマレジストリに接続する
クラウドプロバイダー、カスタムサーバー、またはプロパティを使用して、Kafka ブローカーへの接続を作成します。
Confluent を使用する場合は、ブローカーレジストリプロパティとスキーマレジストリプロパティの両方を構成フィールドに貼り付けることができます。
それ以外の場合は、「スキーマレジストリ」セクションを展開し、プロバイダー (Confluent または Glue) を選択します。
URL : スキーマレジストリの URL を入力します。
構成ソース : 接続パラメーターを提供する方法を選択します。
カスタム : 認証方法を選択し、資格情報を提供します。
Kafka ブローカーの SSL 設定とは異なる SSL 設定を使用する場合は、ブローカー SSL 設定を使用するチェックボックスをオフにして、トラストストアのパスを指定します。
Properties: 提供された構成プロパティを貼り付けます。または、コード補完と WebStorm が提供する簡単なドキュメントを使用して、プロパティを手動で入力することもできます。
地域 : スキーマレジストリリージョンを選択します。
AWS 認証 : 認証方法を選択します:
デフォルトの資格情報プロバイダーチェーン : デフォルトのプロバイダーチェーンの資格情報を使用します。チェーンの詳細については、「デフォルトの資格情報プロバイダーチェーンの使用(英語)」を参照してください。
資格情報ファイルからのプロファイル : ファイルからプロファイルを選択します。
明示的なアクセスキーと秘密鍵 : 資格情報を手動で入力します。
レジストリ名 : 接続するスキーマレジストリの名前を入力するか、
をクリックしてリストから選択します。
設定を入力したら、接続のテストをクリックして、すべての構成パラメーターが正しいことを確認します。次に OK をクリックします。
スキーマの作成
Kafka ツールウィンドウを開きます: 。
( 新規接続 ) をクリックします。
スキーマレジストリを選択し、
をクリックします (または Alt+Insert を押します)。
フォーマットリストで、スキーマ形式 (Avro、Protobuf、JSON) を選択します。
戦略リストで、命名戦略(英語)を選択し、選択した戦略に応じて、名前接尾辞を設定するか、トピックを選択します。または、カスタム名を選択し、任意の名前を入力します。

スキーマをツリーおよび生のビューでプレビューできます。


スキーマのバージョンを比較する
スキーマレジストリに接続する場合は、スキーマレジストリのスキーマを選択します。
生のビューに切り替えて、比較をクリックします。このボタンは、スキーマに複数のバージョンがある場合に使用できます。

スキーマのバージョンを削除する
スキーマに複数のバージョンがある場合は、特定のバージョンを削除できます。スキーマレジストリは、ソフト (バージョンの削除後にスキーマメタデータと ID がレジストリから削除されない) とハード (スキーマ ID を含むすべてのメタデータが削除される) の 2 種類の削除(英語)をサポートしています。選択できるかどうかは、Confluent スキーマレジストリと AWS Glue スキーマレジストリのどちらを使用するかによって異なります。
Confluent Schema Registry では、デフォルトで論理的な削除が使用されます。永久削除チェックボックスを選択すると、完全な削除の使用を選択できます。
AWS Glue Schema Registry は常に完全削除を使用します。
スキーマレジストリでスキーマを選択します。
その右側にある
をクリックし、バージョンの削除を選択します。