Skip to content

Commit

Permalink
Refresh Cluster Provider API when extension are installed/uninstalled
Browse files Browse the repository at this point in the history
Fixes jlandersen#137

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr authored and fbricon committed May 12, 2021
1 parent c66fc7d commit c70728f
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to `Tools for Apache Kafka®` are documented in this file.
- Simplify snippets. See [#180](https://github.com/jlandersen/vscode-kafka/pull/180).
- Hover support in `.kafka` files. See [#149](https://github.com/jlandersen/vscode-kafka/issues/149).
- String encoding serialization support. See [#181](https://github.com/jlandersen/vscode-kafka/issues/181).
- Refresh Cluster Provider API when extensions are installed/uninstalled. See [#137](https://github.com/jlandersen/vscode-kafka/issues/137).

## [0.12.0] - 2021-04-26
### Added
Expand Down
37 changes: 32 additions & 5 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,39 +212,66 @@ class KafkaJsClient implements Client {

// Promise which returns the KafkaJsClient instance when it is ready.
private kafkaPromise: Promise<KafkaJsClient>;


private error: undefined;

constructor(public readonly cluster: Cluster, workspaceSettings: WorkspaceSettings) {
this.metadata = {
brokers: [],
topics: [],
};
// The Kafka client is created in asynchronous since external vscode extension
// can contribute to the creation of Kafka instance.
this.kafkaPromise = createKafka(cluster)
this.kafkaPromise = this.createKafkaPromise();
}

private async createKafkaPromise(): Promise<KafkaJsClient> {
return createKafka(this.cluster)
.then(result => {
this.error = undefined;
this.kafkaJsClient = result;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.kafkaProducer = this.kafkaJsClient.producer();
return this;
}, (error) => {
// Error while create of Kafka client (ex : cluster provider is not available)
this.error = error;
return this;
});
}

private async getKafkaPromise(): Promise<KafkaJsClient> {
if (this.error) {
// This case comes from when a client is created with cluster provider id which is not available
// we try to recreate the client (when the proper extension is installed, the client will able to create)
this.kafkaPromise = this.createKafkaPromise();
}
return this.kafkaPromise;
}

public get state(): ClientState {
return ClientState.disconnected;
}

private async getkafkaClient(): Promise<Kafka> {
const client = (await this.kafkaPromise).kafkaJsClient;
const promise = (await this.getKafkaPromise());
const client = promise.kafkaJsClient;
if (!client) {
if (promise.error) {
throw promise.error;
}
throw new Error('Kafka client cannot be null.');
}
return client;
}

private async getkafkaAdminClient(): Promise<Admin> {
const admin = (await this.kafkaPromise).kafkaAdminClient;
const promise = (await this.getKafkaPromise());
const admin = promise.kafkaAdminClient;
if (!admin) {
if (promise.error) {
throw promise.error;
}
throw new Error('Kafka Admin cannot be null.');
}
return admin;
Expand Down Expand Up @@ -407,7 +434,7 @@ class KafkaJsClient implements Client {
this.kafkaAdminClient.disconnect();
}
}

}

export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator(
Expand Down
23 changes: 20 additions & 3 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,32 @@ export class ClientAccessor implements Disposable {
}

const client = this.get(clusterId);
this.changeState(client, ClientState.disconnecting);
client.dispose();
this.changeState(client, ClientState.disconnected);
delete this.clientsById[clusterId];
}

public dispose(): void {
public dispose(clusterProviderIds?: string[]): void {
for (const clusterId of Object.keys(this.clientsById)) {
this.clientsById[clusterId].dispose();
delete this.clientsById[clusterId];
if (this.shouldBeDisposed(clusterId, clusterProviderIds)) {
this.remove(clusterId);
}
}
}

private shouldBeDisposed(clusterId: string, clusterProviderIds?: string[] | undefined): boolean {
if (!clusterProviderIds) {
return true;
}
if (this.has(clusterId)) {
const clusterProviderId = this.get(clusterId).cluster.clusterProviderId;
if (!clusterProviderId) {
return true;
}
return clusterProviderIds.indexOf(clusterProviderId) !== -1;
}
return true;
}

public static getInstance(): ClientAccessor {
Expand Down
9 changes: 8 additions & 1 deletion src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { SelectedClusterStatusBarItem } from "./views/selectedClusterStatusBarIt
import { NodeBase } from "./explorer/models/nodeBase";
import * as path from 'path';
import { markdownPreviewProvider } from "./docs/markdownPreviewProvider";
import { getDefaultKafkaExtensionParticipant } from "./kafka-extensions/registry";
import { getDefaultKafkaExtensionParticipant, refreshClusterProviderDefinitions } from "./kafka-extensions/registry";
import { KafkaExtensionParticipant } from "./kafka-extensions/api";
import { ProducerCollection } from "./client/producer";
import { startLanguageClient } from "./kafka-file/kafkaFileClient";
Expand Down Expand Up @@ -151,6 +151,13 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider)
);

// Refresh cluster provider participant when a vscode extension is installed/uninstalled
if (vscode.extensions.onDidChange) {// Theia doesn't support this API yet
context.subscriptions.push(vscode.extensions.onDidChange(() => {
refreshClusterProviderDefinitions();
}));
}

return getDefaultKafkaExtensionParticipant();
}

Expand Down
27 changes: 21 additions & 6 deletions src/kafka-extensions/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Cluster, ConnectionOptions, createDefaultKafkaConfig as createDefaultKa
import { ClusterSettings } from "../settings/clusters";
import { configureDefaultClusters } from "../wizards/clusters";
import { ClusterProviderParticipant, KafkaExtensionParticipant } from "./api";
import { ClientAccessor } from "../client";

/**
* Cluster provider is used to:
Expand Down Expand Up @@ -97,12 +98,12 @@ const defaultClusterProviderId = 'vscode-kafka.manual';
let providers: Map<string, ClusterProvider> = new Map();

export function getClusterProvider(clusterProviderId?: string): ClusterProvider | undefined {
intializeIfNeeded();
initializeIfNeeded();
return providers.get(clusterProviderId || defaultClusterProviderId);
}

export function getClusterProviders(): ClusterProvider[] {
intializeIfNeeded();
initializeIfNeeded();
// "Configure manually" provider must be the first
const manual = getClusterProvider(defaultClusterProviderId);
// Other providers must be sorted by name ascending
Expand All @@ -121,16 +122,30 @@ function sortByNameAscending(a: ClusterProvider, b: ClusterProvider): -1 | 0 | 1
return 0;
}

function intializeIfNeeded() {
function initializeIfNeeded() {
if (providers.size === 0) {
providers = collectClusterProviderDefinitions(vscode.extensions.all);
refreshClusterProviderDefinitions();
}
}

export interface ClusterProviderDefinition {
id: string;
name?: string;
}
export function refreshClusterProviderDefinitions() {
const oldClusterProviderIds = Array.from(providers.keys());
providers = collectClusterProviderDefinitions(vscode.extensions.all);
const newClusterProviderIds = Array.from(providers.keys());

// Disconnect all kafka client linked to a cluster provider id coming from an installed/uninstalled extension
const oldIdsToDispose = oldClusterProviderIds.filter(id => !newClusterProviderIds.includes(id));
const newIdsToDispose = newClusterProviderIds.filter(id => !oldClusterProviderIds.includes(id));
const allIdsToDispose = [...oldIdsToDispose, ...newIdsToDispose];
if (allIdsToDispose.length > 0) {
const toDispose = [...new Set(allIdsToDispose)];
ClientAccessor.getInstance().dispose(toDispose);
}
}

/**
* Collect cluster providers defined in package.json (see vscode-kafka which implements default cluster provider with 'Manual' wizard.)
Expand All @@ -141,7 +156,7 @@ export interface ClusterProviderDefinition {
* "clusterProviders": [
* {
* "id": "vscode-kafka.manual",
* "name": "Manual"
"name": "Configure manually"
* }
* ]
* }
Expand All @@ -151,7 +166,7 @@ export interface ClusterProviderDefinition {
*
* @returns the map of cluster providers.
*/
export function collectClusterProviderDefinitions(extensions: readonly vscode.Extension<any>[]): Map<string, ClusterProvider> {
function collectClusterProviderDefinitions(extensions: readonly vscode.Extension<any>[]): Map<string, ClusterProvider> {
const result: Map<string, ClusterProvider> = new Map();
if (extensions && extensions.length) {
for (const extension of extensions) {
Expand Down

0 comments on commit c70728f

Please sign in to comment.