Skip to content

Commit

Permalink
Add ability to remove a consumer
Browse files Browse the repository at this point in the history
Fixes jlandersen#26

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr authored and fbricon committed Feb 10, 2021
1 parent 9041cfe commit 8d2a1a4
Show file tree
Hide file tree
Showing 15 changed files with 169 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ All notable changes to Kafka extension will be documented in this file.
- Added SASL/SCRAM-256 and SASL/SCRAM-512 authentication support. See [#3](https://github.com/jlandersen/vscode-kafka/issues/3).
- Added the option to enable basic SSL support for clusters without authentication. See [#84](https://github.com/jlandersen/vscode-kafka/issues/84).
- The consumer view now provides a `Clear Consumer View` command. See [#84](https://github.com/jlandersen/vscode-kafka/issues/40).
- Added support for consumer group deletion. See [#26](https://github.com/jlandersen/vscode-kafka/issues/26).

### Changed
- Improved the "New cluster" and "New topic" wizards: now include validation and a back button. See [#21](https://github.com/jlandersen/vscode-kafka/issues/21).
Expand Down
18 changes: 18 additions & 0 deletions docs/Explorer.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@ The Kafka explorer shows configured clusters with their topics, brokers, consume

![Screenshot-3](assets/screen-3.png)

## Actions

### Copy Label

You can copy the label of any tree node in the clipboard with `Ctrl+C` (`Cmd+C` on Mac) or via the `Copy Label` contextual menu:

![Copy Label](assets/kafka-explorer-copylabel.png)

### Delete

You can delete clusters, topics and consumer groups with the `Delete` (`Cmd+Delete` on Mac) key, the `Trashcan` icon or `Delete` contextual menu:

![Delete Consumer Group](assets/kafka-explorer-delete-consumergroup.png)

Multiple delete is not supported for the moment. See [issue 107](https://github.com/jlandersen/vscode-kafka/issues/107).

To delete a consumer group, it first must be stopped, else the `Delete` action will report an error.

## Preferences

### `kafka.explorer.topics.sort`
Expand Down
Binary file added docs/assets/kafka-explorer-copylabel.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 8 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"onCommand:vscode-kafka.consumer.list",
"onCommand:vscode-kafka.consumer.toggle",
"onCommand:vscode-kafka.consumer.clear",
"onCommand:vscode-kafa.consumer.deletegroup",
"onCommand:vscode-kafka.explorer.copylabel",
"onCommand:vscode-kafka.explorer.deleteselected",
"onView:kafkaExplorer",
Expand Down Expand Up @@ -280,6 +281,11 @@
"light": "images/light/clear-all.svg",
"dark": "images/dark/clear-all.svg"
}
},
{
"command": "vscode-kafka.consumer.deletegroup",
"title": "Delete Consumer Group",
"category": "Kafka"
}
],
"menus": {
Expand Down Expand Up @@ -333,12 +339,12 @@
},
{
"command": "vscode-kafka.explorer.deleteselected",
"when": "view == kafkaExplorer && viewItem =~ /^cluster$|^selectedCluster$|^topic$/ && !listMultiSelection",
"when": "view == kafkaExplorer && viewItem =~ /^cluster$|^selectedCluster$|^topic$|^consumergroupitem$/ && !listMultiSelection",
"group": "inline"
},
{
"command": "vscode-kafka.explorer.deleteselected",
"when": "view == kafkaExplorer && viewItem =~ /^cluster$|^selectedCluster$|^topic$/ && !listMultiSelection",
"when": "view == kafkaExplorer && viewItem =~ /^cluster$|^selectedCluster$|^topic$|^consumergroupitem$/ && !listMultiSelection",
"group": "3_modification"
}
],
Expand Down
39 changes: 28 additions & 11 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export interface ConsumerGroupMember {
}

export interface Client extends Disposable {
cluster: Cluster;
producer: Producer;
connect(): Promise<void>;
getTopics(): Promise<Topic[]>;
Expand All @@ -87,6 +88,7 @@ export interface Client extends Disposable {
getTopicConfigs(topicId: string): Promise<ConfigEntry[]>;
getConsumerGroupIds(): Promise<string[]>;
getConsumerGroupDetails(groupId: string): Promise<ConsumerGroup>;
deleteConsumerGroups(groupIds: string[]): Promise<void>;
createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]>;
deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise<void>;
}
Expand All @@ -95,6 +97,10 @@ class EnsureConnectedDecorator implements Client {
constructor(private client: Client) {
}

get cluster(): Cluster {
return this.client.cluster;
}

get producer(): any {
return this.client.producer;
}
Expand Down Expand Up @@ -133,6 +139,11 @@ class EnsureConnectedDecorator implements Client {
return await this.client.getConsumerGroupDetails(groupId);
}

public async deleteConsumerGroups(groupIds: string[]): Promise<void> {
await this.waitUntilConnected();
return await this.client.deleteConsumerGroups(groupIds);
}

public async createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]> {
await this.waitUntilConnected();
return await this.client.createTopic(createTopicRequest);
Expand Down Expand Up @@ -174,12 +185,12 @@ class KafkaJsClient implements Client {
brokers: Broker[];
};

constructor(connectionOptions : ConnectionOptions, workspaceSettings: WorkspaceSettings) {
constructor(public readonly cluster: Cluster, workspaceSettings: WorkspaceSettings) {
this.metadata = {
brokers: [],
topics: [],
};
this.kafkaJsClient = createKafka(connectionOptions);
this.kafkaJsClient = createKafka(cluster);
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.producer = this.kafkaJsClient.producer();
Expand All @@ -190,7 +201,7 @@ class KafkaJsClient implements Client {
}

connect(): Promise<void> {
return this.kafkaAdminClient.connect();
return this.kafkaAdminClient.connect();
}

async getTopics(): Promise<Topic[]> {
Expand All @@ -199,12 +210,14 @@ class KafkaJsClient implements Client {
this.metadata = {
...this.metadata,
topics: listTopicsResponse.topics.map((t) => {
const partitions = t.partitions.reduce((prev, p) => ({...prev, [p.partitionId.toString()]: {
partition: p.partitionId.toString(),
leader: p.leader.toString(),
replicas: p.replicas.map((r) => (r.toString())),
isr: p.isr.map((r) => (r.toString())),
}}), {});
const partitions = t.partitions.reduce((prev, p) => ({
...prev, [p.partitionId.toString()]: {
partition: p.partitionId.toString(),
leader: p.leader.toString(),
replicas: p.replicas.map((r) => (r.toString())),
isr: p.isr.map((r) => (r.toString())),
}
}), {});

return {
id: t.name,
Expand Down Expand Up @@ -289,6 +302,10 @@ class KafkaJsClient implements Client {
return consumerGroup;
}

async deleteConsumerGroups(groupIds: string[]): Promise<void> {
await this.kafkaAdminClient.deleteGroups(groupIds);
}

async createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]> {
await this.kafkaAdminClient.createTopics({
validateOnly: false,
Expand Down Expand Up @@ -325,13 +342,13 @@ export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
brokers: connectionOptions.bootstrap.split(","),
ssl: true,
sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password },
});
});
} else {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
});
}
return kafkaJsClient;
}
5 changes: 2 additions & 3 deletions src/commands/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { dump } from "js-yaml";
import { Broker, ClientAccessor } from "../client";
import { BrokerItem } from "../explorer/models/brokers";
import { OutputChannelProvider } from "../providers";
import { pickBroker, pickCluster } from "./common";
import { pickBroker, pickClient, pickCluster } from "./common";
import { ClusterSettings } from "../settings";
import { KafkaExplorer } from "../explorer";
import { addClusterWizard } from "../wizards/clusters";
Expand Down Expand Up @@ -100,10 +100,9 @@ export class DumpClusterMetadataCommandHandler {
}

async execute(): Promise<void> {
const client = this.clientAccessor.getSelectedClusterClient();
const client = await pickClient(this.clientAccessor);

if (!client) {
vscode.window.showInformationMessage("No selected cluster");
return;
}

Expand Down
41 changes: 27 additions & 14 deletions src/commands/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@ import { Broker, Topic, ClientAccessor, Cluster, Client } from "../client";
import { CommonMessages } from "../constants";
import { ClusterSettings } from "../settings";

export async function pickClient(clientAccessor: ClientAccessor, clusterId?: string) : Promise<Client| undefined> {
let client: Client | undefined = undefined;

if (clusterId) {
client = clientAccessor.get(clusterId);
} else {
client = clientAccessor.getSelectedClusterClient();
}

if (!client) {
CommonMessages.showNoSelectedCluster();
}
return client;
}
export async function pickCluster(clusterSettings: ClusterSettings): Promise<Cluster | undefined> {
const clusters = clusterSettings.getAll();

Expand All @@ -19,17 +33,6 @@ export async function pickCluster(clusterSettings: ClusterSettings): Promise<Clu
return pickedCluster?.cluster;
}

export async function pickTopicFromSelectedCluster(clientAccessor: ClientAccessor): Promise<Topic | undefined> {
const client = clientAccessor.getSelectedClusterClient();

if (!client) {
CommonMessages.showNoSelectedCluster();
return;
}

return pickTopic(client);
}

export async function pickTopic(client: Client): Promise<Topic | undefined> {
const topics = await client.getTopics();
const topicQuickPickItems = topics.map((topic) => {
Expand All @@ -45,11 +48,21 @@ export async function pickTopic(client: Client): Promise<Topic | undefined> {
return pickedTopic?.topic;
}

export async function pickBroker(clientAccessor: ClientAccessor): Promise<Broker | undefined> {
const client = clientAccessor.getSelectedClusterClient();
export async function pickConsumerGroupId(client: Client): Promise<string | undefined> {
const groupIds = await client.getConsumerGroupIds();
const groupIdQuickPickItems = groupIds.map((groupId) => {
return {
label: groupId };
});

const pickedGroupId = await vscode.window.showQuickPick(groupIdQuickPickItems);

return pickedGroupId?.label;
}

export async function pickBroker(clientAccessor: ClientAccessor): Promise<Broker | undefined> {
const client = await pickClient(clientAccessor);
if (!client) {
CommonMessages.showNoSelectedCluster();
return;
}

Expand Down
60 changes: 50 additions & 10 deletions src/commands/consumers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import * as vscode from "vscode";

import { ConsumerCollection, Topic, ClientAccessor } from "../client";
import { pickTopic } from "./common";
import { ClusterSettings } from "../settings";
import { CommonMessages } from "../constants";
import { pickClient, pickConsumerGroupId, pickTopic } from "./common";
import { KafkaExplorer } from "../explorer";
import { ConsumerVirtualTextDocumentProvider } from "../providers";

Expand All @@ -15,30 +13,27 @@ export interface StartConsumerCommand {
export class StartConsumerCommandHandler {
constructor(
private clientAccessor: ClientAccessor,
private clusterSettings: ClusterSettings,
private consumerCollection: ConsumerCollection,
private explorer: KafkaExplorer
) {
}

async execute(startConsumerCommand?: StartConsumerCommand): Promise<void> {
if (!startConsumerCommand) {
const selectedCluster = this.clusterSettings.selected;

if (!selectedCluster) {
CommonMessages.showNoSelectedCluster();
const client = await pickClient(this.clientAccessor);
if (!client) {
return;
}

const topic = await pickTopic(this.clientAccessor.get(selectedCluster.id));
const topic = await pickTopic(client);

if (topic === undefined) {
return;
}

startConsumerCommand = {
topic,
clusterId: selectedCluster.id,
clusterId: client.cluster.id,
};
}

Expand Down Expand Up @@ -153,6 +148,51 @@ export class ListConsumersCommandHandler {

}

export interface DeleteConsumerGroupCommand {
clusterId: string;
consumerGroupId: string;
}

export class DeleteConsumerGroupCommandHandler {

public static COMMAND_ID = 'vscode-kafka.consumer.deletegroup';

constructor(
private clientAccessor: ClientAccessor,
private explorer: KafkaExplorer
) {
}

async execute(command?: DeleteConsumerGroupCommand): Promise<void> {
const client = await pickClient(this.clientAccessor, command?.clusterId);
if (!client) {
return;
}

const consumerGroupToDelete: string | undefined = command?.consumerGroupId || await pickConsumerGroupId(client);
if (!consumerGroupToDelete) {
return;
}
try {
const warning = `Are you sure you want to delete consumer group '${consumerGroupToDelete}'?`;
const deleteConfirmation = await vscode.window.showWarningMessage(warning, 'Cancel', 'Delete');
if (deleteConfirmation !== 'Delete') {
return;
}

await client.deleteConsumerGroups([consumerGroupToDelete]);
this.explorer.refresh();
vscode.window.showInformationMessage(`Consumer group '${consumerGroupToDelete}' deleted successfully`);
} catch (error) {
if (error.message) {
vscode.window.showErrorMessage(error.message);
} else {
vscode.window.showErrorMessage(error);
}
}
}
}

async function openDocument(uri: vscode.Uri): Promise<void> {

const visibleConsumerEditor = vscode.window.visibleTextEditors.find(te => te.document.uri.toString() === uri.toString());
Expand Down
Loading

0 comments on commit 8d2a1a4

Please sign in to comment.