From c70728f0c209b4176c9b8cead34d29a4212134de Mon Sep 17 00:00:00 2001 From: angelozerr Date: Thu, 6 May 2021 20:26:00 +0200 Subject: [PATCH] Refresh Cluster Provider API when extension are installed/uninstalled Fixes #137 Signed-off-by: azerr --- CHANGELOG.md | 1 + src/client/client.ts | 37 +++++++++++++++++++++++++++----- src/client/index.ts | 23 +++++++++++++++++--- src/extension.ts | 9 +++++++- src/kafka-extensions/registry.ts | 27 +++++++++++++++++------ 5 files changed, 82 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51b1c988..d8210684 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ All notable changes to `Tools for Apache Kafka®` are documented in this file. - Simplify snippets. See [#180](https://github.com/jlandersen/vscode-kafka/pull/180). - Hover support in `.kafka` files. See [#149](https://github.com/jlandersen/vscode-kafka/issues/149). - String encoding serialization support. See [#181](https://github.com/jlandersen/vscode-kafka/issues/181). +- Refresh Cluster Provider API when extensions are installed/uninstalled. See [#137](https://github.com/jlandersen/vscode-kafka/issues/137). ## [0.12.0] - 2021-04-26 ### Added diff --git a/src/client/client.ts b/src/client/client.ts index c56c0667..809eb3af 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -212,7 +212,9 @@ class KafkaJsClient implements Client { // Promise which returns the KafkaJsClient instance when it is ready. private kafkaPromise: Promise; - + + private error: undefined; + constructor(public readonly cluster: Cluster, workspaceSettings: WorkspaceSettings) { this.metadata = { brokers: [], @@ -220,31 +222,56 @@ class KafkaJsClient implements Client { }; // The Kafka client is created in asynchronous since external vscode extension // can contribute to the creation of Kafka instance. - this.kafkaPromise = createKafka(cluster) + this.kafkaPromise = this.createKafkaPromise(); + } + + private async createKafkaPromise(): Promise { + return createKafka(this.cluster) .then(result => { + this.error = undefined; this.kafkaJsClient = result; this.kafkaAdminClient = this.kafkaJsClient.admin(); this.kafkaProducer = this.kafkaJsClient.producer(); return this; + }, (error) => { + // Error while create of Kafka client (ex : cluster provider is not available) + this.error = error; + return this; }); } + private async getKafkaPromise(): Promise { + if (this.error) { + // This case comes from when a client is created with cluster provider id which is not available + // we try to recreate the client (when the proper extension is installed, the client will able to create) + this.kafkaPromise = this.createKafkaPromise(); + } + return this.kafkaPromise; + } public get state(): ClientState { return ClientState.disconnected; } private async getkafkaClient(): Promise { - const client = (await this.kafkaPromise).kafkaJsClient; + const promise = (await this.getKafkaPromise()); + const client = promise.kafkaJsClient; if (!client) { + if (promise.error) { + throw promise.error; + } throw new Error('Kafka client cannot be null.'); } return client; } private async getkafkaAdminClient(): Promise { - const admin = (await this.kafkaPromise).kafkaAdminClient; + const promise = (await this.getKafkaPromise()); + const admin = promise.kafkaAdminClient; if (!admin) { + if (promise.error) { + throw promise.error; + } throw new Error('Kafka Admin cannot be null.'); } return admin; @@ -407,7 +434,7 @@ class KafkaJsClient implements Client { this.kafkaAdminClient.disconnect(); } } - + } export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator( diff --git a/src/client/index.ts b/src/client/index.ts index aed5ebad..c78ee386 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -79,15 +79,32 @@ export class ClientAccessor implements Disposable { } const client = this.get(clusterId); + this.changeState(client, ClientState.disconnecting); client.dispose(); + this.changeState(client, ClientState.disconnected); delete this.clientsById[clusterId]; } - public dispose(): void { + public dispose(clusterProviderIds?: string[]): void { for (const clusterId of Object.keys(this.clientsById)) { - this.clientsById[clusterId].dispose(); - delete this.clientsById[clusterId]; + if (this.shouldBeDisposed(clusterId, clusterProviderIds)) { + this.remove(clusterId); + } + } + } + + private shouldBeDisposed(clusterId: string, clusterProviderIds?: string[] | undefined): boolean { + if (!clusterProviderIds) { + return true; + } + if (this.has(clusterId)) { + const clusterProviderId = this.get(clusterId).cluster.clusterProviderId; + if (!clusterProviderId) { + return true; + } + return clusterProviderIds.indexOf(clusterProviderId) !== -1; } + return true; } public static getInstance(): ClientAccessor { diff --git a/src/extension.ts b/src/extension.ts index f08ecbc1..b9c3a717 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -33,7 +33,7 @@ import { SelectedClusterStatusBarItem } from "./views/selectedClusterStatusBarIt import { NodeBase } from "./explorer/models/nodeBase"; import * as path from 'path'; import { markdownPreviewProvider } from "./docs/markdownPreviewProvider"; -import { getDefaultKafkaExtensionParticipant } from "./kafka-extensions/registry"; +import { getDefaultKafkaExtensionParticipant, refreshClusterProviderDefinitions } from "./kafka-extensions/registry"; import { KafkaExtensionParticipant } from "./kafka-extensions/api"; import { ProducerCollection } from "./client/producer"; import { startLanguageClient } from "./kafka-file/kafkaFileClient"; @@ -151,6 +151,13 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider) ); + // Refresh cluster provider participant when a vscode extension is installed/uninstalled + if (vscode.extensions.onDidChange) {// Theia doesn't support this API yet + context.subscriptions.push(vscode.extensions.onDidChange(() => { + refreshClusterProviderDefinitions(); + })); + } + return getDefaultKafkaExtensionParticipant(); } diff --git a/src/kafka-extensions/registry.ts b/src/kafka-extensions/registry.ts index b6e98023..0b019d95 100644 --- a/src/kafka-extensions/registry.ts +++ b/src/kafka-extensions/registry.ts @@ -4,6 +4,7 @@ import { Cluster, ConnectionOptions, createDefaultKafkaConfig as createDefaultKa import { ClusterSettings } from "../settings/clusters"; import { configureDefaultClusters } from "../wizards/clusters"; import { ClusterProviderParticipant, KafkaExtensionParticipant } from "./api"; +import { ClientAccessor } from "../client"; /** * Cluster provider is used to: @@ -97,12 +98,12 @@ const defaultClusterProviderId = 'vscode-kafka.manual'; let providers: Map = new Map(); export function getClusterProvider(clusterProviderId?: string): ClusterProvider | undefined { - intializeIfNeeded(); + initializeIfNeeded(); return providers.get(clusterProviderId || defaultClusterProviderId); } export function getClusterProviders(): ClusterProvider[] { - intializeIfNeeded(); + initializeIfNeeded(); // "Configure manually" provider must be the first const manual = getClusterProvider(defaultClusterProviderId); // Other providers must be sorted by name ascending @@ -121,9 +122,9 @@ function sortByNameAscending(a: ClusterProvider, b: ClusterProvider): -1 | 0 | 1 return 0; } -function intializeIfNeeded() { +function initializeIfNeeded() { if (providers.size === 0) { - providers = collectClusterProviderDefinitions(vscode.extensions.all); + refreshClusterProviderDefinitions(); } } @@ -131,6 +132,20 @@ export interface ClusterProviderDefinition { id: string; name?: string; } +export function refreshClusterProviderDefinitions() { + const oldClusterProviderIds = Array.from(providers.keys()); + providers = collectClusterProviderDefinitions(vscode.extensions.all); + const newClusterProviderIds = Array.from(providers.keys()); + + // Disconnect all kafka client linked to a cluster provider id coming from an installed/uninstalled extension + const oldIdsToDispose = oldClusterProviderIds.filter(id => !newClusterProviderIds.includes(id)); + const newIdsToDispose = newClusterProviderIds.filter(id => !oldClusterProviderIds.includes(id)); + const allIdsToDispose = [...oldIdsToDispose, ...newIdsToDispose]; + if (allIdsToDispose.length > 0) { + const toDispose = [...new Set(allIdsToDispose)]; + ClientAccessor.getInstance().dispose(toDispose); + } +} /** * Collect cluster providers defined in package.json (see vscode-kafka which implements default cluster provider with 'Manual' wizard.) @@ -141,7 +156,7 @@ export interface ClusterProviderDefinition { * "clusterProviders": [ * { * "id": "vscode-kafka.manual", - * "name": "Manual" + "name": "Configure manually" * } * ] * } @@ -151,7 +166,7 @@ export interface ClusterProviderDefinition { * * @returns the map of cluster providers. */ -export function collectClusterProviderDefinitions(extensions: readonly vscode.Extension[]): Map { +function collectClusterProviderDefinitions(extensions: readonly vscode.Extension[]): Map { const result: Map = new Map(); if (extensions && extensions.length) { for (const extension of extensions) {