Kafka
Kafka プラグインを使用すると、Kafka(英語) イベントストリーミングプロセスを監視し、コンシューマー、プロデューサー、トピックを作成できます。また、スキーマレジストリに接続し、スキーマを作成および更新することもできます。
Kafka プラグインをインストールする
この機能は、インストールして有効にする必要がある Kafka(英語) プラグインに依存しています。
Ctrl+Alt+S を押して設定を開き、
を選択します。マーケットプレースタブを開き、Kafka プラグインを見つけて、インストールをクリックします (プロンプトが表示されたら、IDE を再起動します)。
Kafka プラグインを使用すると、次のことが可能になります。
接続先:
Kafka プラグインがインストールされ有効になっている場合は、Kafka ツールウィンドウ ( ) を使用して Kafka に接続し、操作することができます。あるいは、リモートファイルシステムまたは Zeppelin プラグインがインストールされ有効になっている場合は、ビッグデータツールツールウィンドウ ( ) を使用して Kafka 接続にアクセスすることもできます。
Kafka に接続する
クラウドプロバイダーを使用して Kafka に接続する
Confluent クラスターに接続する
Kafka ツールウィンドウを開きます: 。
「名前」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソースリストでクラウドを選択し、次にプロバイダーリストで合流を選択します。
https://confluent.cloud/home(英語) に進みます。ページの右側で、設定メニューをクリックし、環境を選択し、クラスターを選択して、 を選択します。
クライアントの構成スニペットをコピーするブロックで、Kafka API キーを指定し、コピーをクリックします。
IDE に戻り、コピーしたプロパティを構成フィールドに貼り付けます。
設定を入力したら、接続のテストをクリックして、すべての構成パラメーターが正しいことを確認します。次に OK をクリックします。
![Kafka コンフルエント Kafka Confluent](https://resources.jetbrains.com/help/img/idea/2024.1/kafka_confluent.png)
オプションで、次を設定できます。
接続を使用可能にする : この接続を無効にする場合は選択を解除します。デフォルトでは、新しく作成された接続は有効になっています。
プロジェクトごと: これらの接続設定を現在のプロジェクトでのみ有効にする場合に選択します。この接続を他のプロジェクトで表示する場合は、選択を解除します。
AWS MSK クラスターに接続する
Kafka ツールウィンドウを開きます: 。
「名前」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソースリストでクラウドを選択し、次にプロバイダーリストで AWS MSK を選択します。
「Bootstrap サーバー」フィールドに、Kafka ブローカーの URL、または URL のコンマ区切りリストを入力します。
AWS 認証リストで認証方法を選択します。
デフォルトの資格情報プロバイダーチェーン : デフォルトのプロバイダーチェーンの資格情報を使用します。チェーンの詳細については、「デフォルトの資格情報プロバイダーチェーンの使用(英語)」を参照してください。
資格情報ファイルからのプロファイル : ファイルからプロファイルを選択します。
明示的なアクセスキーと秘密鍵 : 資格情報を手動で入力します。
オプションで、スキーマレジストリに接続できます。
Kafka への接続中に SSH トンネルを使用する場合は、トンネリングを有効にするを選択し、SSH 構成リストで SSH 構成を選択するか、新しい構成を作成します。
設定を入力したら、接続のテストをクリックして、すべての構成パラメーターが正しいことを確認します。次に OK をクリックします。
![Kafka AWS MSK Kafka AWS MSK](https://resources.jetbrains.com/help/img/idea/2024.1/kafka_aws_msk.png)
オプションで、次を設定できます。
接続を使用可能にする : この接続を無効にする場合は選択を解除します。デフォルトでは、新しく作成された接続は有効になっています。
プロジェクトごと: これらの接続設定を現在のプロジェクトでのみ有効にする場合に選択します。この接続を他のプロジェクトで表示する場合は、選択を解除します。
カスタム Kafka サーバーに接続する
Kafka ツールウィンドウを開きます: 。
「名前」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソースリストで、カスタムを選択します。
「Bootstrap サーバー」フィールドに、Kafka ブローカーの URL、または URL のコンマ区切りリストを入力します。
認証で、認証方法を選択します。
なし : 認証なしで接続します。
SASL : SASL メカニズム (Plain、SCRAM-SHA-256、SCRAM-SHA-512、Kerberos) を選択し、ユーザー名とパスワードを入力します。
SSL
ブローカーのホスト名がブローカー証明書のホスト名と一致することを確認する場合は、サーバーのホスト名を検証するを選択します。チェックボックスをオフにすることは、
ssl.endpoint.identification.algorithm=
プロパティを追加することと同じです。トラストストアの場所で、SSL トラストストアの場所へのパスを指定します。(
ssl.truststore.location
プロパティ)。トラストストアのパスワードで、SSL トラストストアのパスワードへのパスを指定します。(
ssl.truststore.password
プロパティ)。キーストアクライアント認証を使用するを選択し、鍵ストアのロケーション (
ssl.keystore.location
)、鍵ストアパスワード (ssl.keystore.password
)、および鍵パスワード (ssl.key.password
) の値を指定します。
AWS IAM : Amazon MSK には AWS IAM を使用します。AWS 認証リストで、次のいずれかを選択します。
デフォルトの資格情報プロバイダーチェーン : デフォルトのプロバイダーチェーンの資格情報を使用します。チェーンの詳細については、「デフォルトの資格情報プロバイダーチェーンの使用(英語)」を参照してください。
資格情報ファイルからのプロファイル : ファイルからプロファイルを選択します。
明示的なアクセスキーと秘密鍵 : 資格情報を手動で入力します。
オプションで、スキーマレジストリに接続できます。
Kafka への接続中に SSH トンネルを使用する場合は、トンネリングを有効にするを選択し、SSH 構成リストで SSH 構成を選択するか、新しい構成を作成します。
設定を入力したら、接続のテストをクリックして、すべての構成パラメーターが正しいことを確認します。次に OK をクリックします。
![Kafka カスタム接続 Kafka custom connection](https://resources.jetbrains.com/help/img/idea/2024.1/kafka_connect_custom.png)
オプションで、次を設定できます。
接続を使用可能にする : この接続を無効にする場合は選択を解除します。デフォルトでは、新しく作成された接続は有効になっています。
プロジェクトごと: これらの接続設定を現在のプロジェクトでのみ有効にする場合に選択します。この接続を他のプロジェクトで表示する場合は、選択を解除します。
プロパティを使用して Kafka に接続する
Kafka ツールウィンドウを開きます: 。
「名前」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソースリストで、Properties を選択します。
「Bootstrap サーバー」フィールドに、Kafka ブローカーの URL、または URL のコンマ区切りリストを入力します。
Kafka ブローカー構成プロパティを提供する方法を選択します。
暗黙的 : 提供された構成プロパティを貼り付けます。または、コード補完と DataGrip が提供するクイックドキュメントを使用して手動で入力することもできます。
ファイルから : プロパティファイルを選択します。
オプションで、スキーマレジストリに接続できます。
Kafka への接続中に SSH トンネルを使用する場合は、トンネリングを有効にするを選択し、SSH 構成リストで SSH 構成を選択するか、新しい構成を作成します。
設定を入力したら、接続のテストをクリックして、すべての構成パラメーターが正しいことを確認します。次に OK をクリックします。
オプションで、次を設定できます。
接続を使用可能にする : この接続を無効にする場合は選択を解除します。デフォルトでは、新しく作成された接続は有効になっています。
プロジェクトごと: これらの接続設定を現在のプロジェクトでのみ有効にする場合に選択します。この接続を他のプロジェクトで表示する場合は、選択を解除します。
Kafka サーバーへの接続を確立すると、この接続を含む新しいタブが Kafka ツールウィンドウに表示されます。これを使用して、データの生成と消費、トピックの作成と削除を行うことができます。スキーマレジストリに接続している場合は、スキーマを表示、作成、更新することもできます。
Kafka ツールウィンドウの任意のタブで をクリックして、接続の名前変更、削除、無効化、リフレッシュを行ったり、接続の設定を変更したりできます。
![Kafka 接続: トピック Kafka connection: topics](https://resources.jetbrains.com/help/img/idea/2024.1/bdt_kafka_configuration.png)
すべてのクラスタートピックがトピックセクションに表示されます。 をクリックしてお気に入りのトピックのみを表示するか、
をクリックして内部トピックを表示または非表示にすることができます。トピックをクリックすると、パーティション、構成、スキーマに関する情報などの詳細が表示されます。
トピックを作成する
Kafka ツールウィンドウを開きます: 。
トピックを選択し、
をクリックします (または Alt+Insert を押します)。
新しいトピックに名前を付け、パーティションの数とレプリケーション係数を指定して、「OK 」をクリックします。
トピックからレコードを削除する
Kafka ツールウィンドウを開きます: 。
トピックでトピックを右クリックし、明確なトピックを選択します (またはその左側にある
をクリックします)。OK をクリックして削除を確認します。
データの生成と消費
データを作成する
Kafka ツールウィンドウを開きます: 。
Kafka 接続を選択し、
( プロデューサーの作成 ) をクリックします。
これにより、新しいエディタータブでプロデューサーが開きます。
トピックリストで、メッセージを書き込むトピックを選択します。
キーおよび値で、メッセージのキーと値を選択します。
を使用して、選択したタイプに基づいてランダムな値を生成します。
スキーマレジストリに接続している場合は、スキーマレジストリを選択して、送信されたデータを選択したスキーマと照合してチェックできます。
ヘッダーで、カスタムヘッダーを指定します。JSON または CSV 形式で保存されている場合は、このセクションに貼り付けることができます。
Flow では、レコードフローを制御できます。
複数のレコードを同時に送信する場合は、一度に記録に数値を入力します。
レコードデータをランダムに生成する場合は、ランダムキーを生成するおよびランダムな値を生成するを選択します。
レコード送信間の間隔をミリ秒単位で設定します。
指定されたレコード数に達した場合、または指定された時間が経過した場合に、プロデューサーがメッセージの送信を停止するようにするには、停止条件を指定します。
オプションで、追加のオプションを指定します。
パーティション : レコードを送信する必要があるトピックパーティションを指定します。指定しない場合は、デフォルトのロジックが使用されます。プロデューサーは、パーティション数を法としたキーのハッシュを取得します。
圧縮 : プロデューサーによって生成されたデータの圧縮タイプを選択します: なし、Gzip、キビキビ、Lz4、または Zstd。
冪等性 : 各メッセージのコピーが 1 つだけストリームに書き込まれるようにする場合に選択します。
アックス : リーダーがローカルログにレコードを書き込み、すべてのフォロワーからの完全な確認を待たずに応答するようにする場合は、リーダーを選択します。リーダーが同期レプリカの完全なセットがレコードを確認するのを待機するには、すべてを選択します。サーバーからの確認応答を待たないように、プロデューサー用のなしを保持します。
生産をクリックします。
![Kafka でメッセージを生成する Produce messages in Kafka](https://resources.jetbrains.com/help/img/idea/2024.1/kafka_producer.png)
データタブ内の任意のレコードをクリックすると、その詳細が表示されます。 をクリックして統計を有効にすることもできます。
データを消費する
Kafka ツールウィンドウを開きます: 。
Kafka 接続を選択し、
( コンシューマーの作成 ) をクリックします。
これにより、新しいエディタータブでコンシューマーが開きます。
トピックリストで、サブスクライブするトピックを選択します。
キーおよび値で、使用するレコードのキーと値のデータ型を選択します。
範囲とフィルターを使用して、使用するデータを絞り込みます。
から始まるリストで、データを消費する期間またはオフセットを選択します。トピックからすべてのレコードを取得するには、最初からを選択します。
制限リストで、データの受信をいつ停止するかを選択します (たとえば、トピック内のレコード数が特定の数に達したとき)。
フィルターを使用して、キー、値、またはヘッダーの部分文字列によってレコードをフィルターします。
パーティションにパーティション ID または ID のコンマ区切りリストを入力して、特定のパーティションからのみレコードを取得します。
消費を開始するをクリックします。
![Kafka でメッセージを消費する Consume messages in Kafka](https://resources.jetbrains.com/help/img/idea/2024.1/kafka_consumer.png)
データタブ内の任意のレコードをクリックすると、その詳細が表示されます。 をクリックして統計を有効にすることもできます。
プロデューサーまたはコンシューマーのプリセットを保存する
同じキー、値、ヘッダー、またはその他のパラメーターを使用してデータを頻繁に生成または使用する場合は、プリセットとして保存できます。その後、プリセットを再利用して、プロデューサーまたはコンシューマーをすばやく作成できます。
Kafka ツールウィンドウで、
( プロデューサーの作成 ) または
( コンシューマーの作成 ) をクリックします。
必要なパラメーターを指定し、プロデューサーまたはコンシューマー作成フォームの上部で
( プリセットの保存 ) をクリックします。
パラメーターはプリセットとして保存され、プリセットタブで利用できます。プリセットをクリックして適用します。
スキーマレジストリの操作
プロデューサーとコンシューマーはスキーマを使用して、レコードのキーと値の一貫性を検証し、保証できます。Kafka プラグインはスキーマレジストリと統合され、Avro、Protobuf、JSON スキーマをサポートします。これにより、次のことが可能になります。
スキーマレジストリに接続する
スキーマの作成、更新、削除、クローン作成
生の形式またはツリービューでスキーマをプレビューする
スキーマのバージョンを比較する
スキーマのバージョンを削除する
スキーマレジストリに接続する
クラウドプロバイダー、カスタムサーバー、またはプロパティを使用して、Kafka ブローカーへの接続を作成します。
合流を使用する場合は、ブローカーレジストリプロパティとスキーマレジストリプロパティの両方を構成フィールドに貼り付けることができます。
それ以外の場合は、「スキーマレジストリ」セクションを展開し、プロバイダー ( 合流またはグルー ) を選択します。
URL : スキーマレジストリの URL を入力します。
構成ソース : 接続パラメーターを提供する方法を選択します。
カスタム : 認証方法を選択し、資格情報を提供します。
Kafka ブローカーの SSL 設定とは異なる SSL 設定を使用する場合は、ブローカー SSL 設定を使用するチェックボックスをオフにして、トラストストアのパスを指定します。
Properties: 提供された構成プロパティを貼り付けます。または、コード補完と DataGrip が提供する簡単なドキュメントを使用して、プロパティを手動で入力することもできます。
領域 : スキーマレジストリリージョンを選択します。
AWS 認証 : 認証方法を選択します:
デフォルトの資格情報プロバイダーチェーン : デフォルトのプロバイダーチェーンの資格情報を使用します。チェーンの詳細については、「デフォルトの資格情報プロバイダーチェーンの使用(英語)」を参照してください。
資格情報ファイルからのプロファイル : ファイルからプロファイルを選択します。
明示的なアクセスキーと秘密鍵 : 資格情報を手動で入力します。
レジストリ名 : 接続するスキーマレジストリの名前を入力するか、
をクリックしてリストから選択します。
設定を入力したら、接続のテストをクリックして、すべての構成パラメーターが正しいことを確認します。次に OK をクリックします。
スキーマの作成
Kafka ツールウィンドウを開きます: 。
スキーマレジストリを選択し、
をクリックします (または Alt+Insert を押します)。
フォーマットリストで、スキーマ形式 (Avro、Protobuf、JSON) を選択します。
戦略リストで、命名戦略(英語)を選択し、選択した戦略に応じて、名前サフィックスを設定するか、トピックを選択します。または、カスタム名を選択し、任意の名前を入力します。
![スキーマの作成 Create Schema](https://resources.jetbrains.com/help/img/idea/2024.1/bdt_kafka_create_schema.png)
スキーマをツリーおよび生のビューでプレビューできます。
![ツリービュー Tree View](https://resources.jetbrains.com/help/img/idea/2024.1/bdt_kafka_tree_view.png)
![生のビュー Raw View](https://resources.jetbrains.com/help/img/idea/2024.1/bdt_kafka_raw_view.png)
スキーマのバージョンを比較する
スキーマレジストリに接続する場合は、スキーマレジストリのスキーマを選択します。
生のビューに切り替えて、比較をクリックします。このボタンは、スキーマに複数のバージョンがある場合に使用できます。
![バージョンの比較 Compare versions](https://resources.jetbrains.com/help/img/idea/2024.1/bdt_kafka_schema_compare_versions.png)
スキーマのバージョンを削除する
スキーマに複数のバージョンがある場合は、特定のバージョンを削除できます。スキーマレジストリは、ソフト (バージョンの削除後にスキーマのメタデータと ID がレジストリから削除されない場合) とハード (スキーマ ID を含むすべてのメタデータを削除する) の 2 種類の削除(英語)をサポートします。選択できるかどうかは、Confluent と AWS Glue スキーマレジストリのどちらを使用するかによって異なります。
Confluent Schema Registry では、デフォルトで論理的な削除が使用されます。永久削除チェックボックスを選択すると、完全な削除の使用を選択できます。
AWS Glue Schema Registry は常に完全削除を使用します。
スキーマレジストリでスキーマを選択します。
その右側にある
をクリックし、バージョンの削除を選択します。
関連ページ:
![](https://pleiades.io/icons/datagrip.png)
Kerberos
Kerberos は、安全でないネットワーク上でクライアントとサーバーを認証するための安全な方法を提供するネットワーク認証プロトコルです。ビッグデータツールプラグインを使用すると、Kerberos を使用して Kafka、HDFS、Hive Metastore への接続を認証できます。Kerberos を使用して Kafka で認証する「ビッグデータツール」ウィンドウで「」をクリックし、「Kafka」を選択します。または、既存の接続を編集する場合は、それを選択してをクリックします。Kerberos...
![](https://resources.jetbrains.com/help/img/idea/2024.1/bdt_custom_spark_cluster_step_1.png)
カスタム Spark クラスター
Spark 実行構成を送信するでは、AWS EMR または Dataproc をリモートサーバーとして使用してアプリケーションを実行できます。これら 2 つのオプションに加えて、独自のカスタム Spark クラスターを構成することもできます。リモートサーバーに接続するための SSH 構成をセットアップし、オプションで Spark 履歴サーバーへの接続と SFTP 接続を構成します。カスタム Spark クラスターの作成ビッグデータツールウィンドウで、をクリックし、カスタム Spark クラスター...
![](https://resources.jetbrains.com/help/img/idea/2024.1/bdt_remote_file_systems_tool_window.png)
リモートファイルシステム
リモートファイルシステムプラグインを使用すると、リモートストレージに接続し、IDE から直接ストレージ上のデータを管理できます。リモートファイルシステムプラグインをインストールするこの機能は、インストールして有効にする必要があるリモートファイルシステムプラグインに依存しています。を押して設定を開き、を選択します。マーケットプレースタブを開き、リモートファイルシステムプラグインを見つけて、インストールをクリックします (プロンプトが表示されたら、IDE を再起動します)。このプラグインは、次のリモ...