Skip to content

Commit

Permalink
Filtered topics are considered invalid
Browse files Browse the repository at this point in the history
Fixes jlandersen#177

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr authored and fbricon committed May 12, 2021
1 parent 20ef6c2 commit edd4597
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 61 deletions.
38 changes: 37 additions & 1 deletion src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import * as minimatch from "minimatch";
import { Admin, ConfigResourceTypes, Consumer, ConsumerConfig, Kafka, KafkaConfig, Producer, SeekEntry } from "kafkajs";

import { Disposable } from "vscode";
import { ClientAccessor, ClientState } from ".";
import { getClusterProvider } from "../kafka-extensions/registry";
import { WorkspaceSettings } from "../settings";
import { getWorkspaceSettings, WorkspaceSettings } from "../settings";
import { TopicSortOption } from "../settings/workspace";

export interface ConnectionOptions {
clusterProviderId?: string;
Expand Down Expand Up @@ -471,3 +473,37 @@ export function addQueryParameter(query: string, name: string, value?: string):
}
return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`;
}

export function isVisible(t: Topic): boolean {
const settings = getWorkspaceSettings();
const filters = settings.topicFilters;
if (!filters) {
return true;
}
const id = t.id.toLowerCase();
return !filters.find(f => minimatch(id, f));
}

export function sortTopics(topics: Topic[]): Topic[] {
const settings = getWorkspaceSettings();
switch (settings.topicSortOption) {
case TopicSortOption.name:
topics = topics.sort(sortByNameAscending);
break;
case TopicSortOption.partitions:
topics = topics.sort(sortByPartitionsAscending);
}
return topics;
}

function sortByNameAscending(a: Topic, b: Topic): -1 | 0 | 1 {
if (a.id.toLowerCase() < b.id.toLowerCase()) { return -1; }
if (a.id.toLowerCase() > b.id.toLowerCase()) { return 1; }
return 0;
}

function sortByPartitionsAscending(a: Topic, b: Topic): -1 | 0 | 1 {
if (a.partitionCount < b.partitionCount) { return -1; }
if (a.partitionCount > b.partitionCount) { return 1; }
return 0;
}
43 changes: 6 additions & 37 deletions src/explorer/models/topics.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import * as vscode from "vscode";

import { Topic, TopicPartition } from "../../client";
import { isVisible, sortTopics, Topic, TopicPartition } from "../../client";
import { Icons } from "../../constants";
import { getWorkspaceSettings } from "../../settings";
import { TopicSortOption } from "../../settings/workspace";
import { ClusterItem } from "./cluster";
import { ConfigsItem } from "./common";
import { NodeBase } from "./nodeBase";
import * as minimatch from "minimatch";

export class TopicGroupItem extends NodeBase {
public label = "Topics";
Expand All @@ -20,46 +17,18 @@ export class TopicGroupItem extends NodeBase {

public async computeChildren(): Promise<NodeBase[]> {
const client = this.getParent().client;
const settings = getWorkspaceSettings();
let topics = await client.getTopics();
const allTopics = await client.getTopics();
//Filter topics before sorting them
topics = topics.filter(t => this.isDisplayed(t, settings.topicFilters));

switch (settings.topicSortOption) {
case TopicSortOption.name:
topics = topics.sort(this.sortByNameAscending);
break;
case TopicSortOption.partitions:
topics = topics.sort(this.sortByPartitionsAscending);
break;
}
return topics.map((topic) => {
let visibleTopics = allTopics.filter(t => isVisible(t));
visibleTopics = sortTopics(visibleTopics);
return visibleTopics.map((topic) => {
return new TopicItem(topic, this);
});
}

getParent(): ClusterItem {
return <ClusterItem>super.getParent();
}

private sortByNameAscending(a: Topic, b: Topic): -1 | 0 | 1 {
if (a.id.toLowerCase() < b.id.toLowerCase()) { return -1; }
if (a.id.toLowerCase() > b.id.toLowerCase()) { return 1; }
return 0;
}

private sortByPartitionsAscending(a: Topic, b: Topic): -1 | 0 | 1 {
if (a.partitionCount < b.partitionCount) { return -1; }
if (a.partitionCount > b.partitionCount) { return 1; }
return 0;
}

private isDisplayed(t: Topic, filters: string[]): boolean {
if (!filters) {
return true;
}
const id = t.id.toLowerCase();
return !filters.find(f => minimatch(id, f));
}
}

export class TopicItem extends NodeBase {
Expand Down
48 changes: 25 additions & 23 deletions src/kafka-file/kafkaFileClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,46 @@ import { ConsumerLaunchStateProvider, getLanguageService, LanguageService, Produ
import { runSafeAsync } from "./utils/runner";
import { ThrottledDelayer } from "./utils/async";
import { WorkspaceSettings } from "../settings";
import { ClientAccessor } from "../client";
import { ClientAccessor, isVisible, sortTopics, Topic } from "../client";
import { BrokerConfigs } from "../client/config";
import { KafkaModelProvider } from "../explorer/models/kafka";
import { ClusterItem } from "../explorer/models/cluster";

class ClusterInfo {

private topics?: TopicDetail[];
private allTopics?: Topic[];
private filteredTopics?: TopicDetail[];

private autoCreateTopicEnabled?: BrokerConfigs.AutoCreateTopicResult;

constructor(public readonly cluster?: ClusterItem, public readonly error?: any) {
}

async getTopics(): Promise<TopicDetail[]> {
if (this.topics) {
return this.topics;
}
try {
this.topics = await this.loadTopics();
}
catch (e) {
this.topics = [];
if (!this.filteredTopics) {
try {
this.allTopics = await this.loadTopics();
}
catch (e) {
this.allTopics = [];
}
const visibleTopics = this.allTopics
.filter(topic => isVisible(topic));
this.filteredTopics = sortTopics(visibleTopics);
}
return this.topics;
return this.filteredTopics;
}

private async loadTopics(): Promise<TopicDetail[]> {
private async loadTopics(): Promise<Topic[]> {
if (this.cluster) {
return (await this.cluster.getTopics())
.map(child => (child.topic));
return await this.cluster.client.getTopics();
}
return [];
}

async getTopic(topicId: string): Promise<TopicDetail | undefined> {
const topics = await this.getTopics();
return topics.find(topic => topic.id === topicId);
await this.getTopics();
const topics = this.allTopics;
return topics?.find(topic => topic.id === topicId);
}

async getAutoCreateTopicEnabled(): Promise<BrokerConfigs.AutoCreateTopicResult> {
Expand All @@ -61,7 +63,7 @@ class ClusterInfo {
if (this.cluster) {
return await BrokerConfigs.getAutoCreateTopicEnabled(this.cluster.client);
}
return {type : "unknown"};
return { type: "unknown" };
}
}

Expand Down Expand Up @@ -112,7 +114,7 @@ class DataModelTopicProvider implements TopicProvider {

export function startLanguageClient(
clusterSettings: ClusterSettings,
clientAccessor : ClientAccessor,
clientAccessor: ClientAccessor,
workspaceSettings: WorkspaceSettings,
producerCollection: ProducerCollection,
consumerCollection: ConsumerCollection,
Expand Down Expand Up @@ -199,7 +201,7 @@ export function startLanguageClient(
};
}

function createLanguageService(clusterSettings: ClusterSettings, clientAccessor : ClientAccessor, producerCollection: ProducerCollection, consumerCollection: ConsumerCollection, modelProvider: KafkaModelProvider): LanguageService {
function createLanguageService(clusterSettings: ClusterSettings, clientAccessor: ClientAccessor, producerCollection: ProducerCollection, consumerCollection: ConsumerCollection, modelProvider: KafkaModelProvider): LanguageService {
const producerLaunchStateProvider = {
getProducerLaunchState(uri: vscode.Uri): ProducerLaunchState {
const producer = producerCollection.get(uri);
Expand All @@ -217,7 +219,7 @@ function createLanguageService(clusterSettings: ClusterSettings, clientAccessor
const selectedClusterProvider = {
getSelectedCluster() {
const selected = clusterSettings.selected;
const clusterId = selected?.id;
const clusterId = selected?.id;
const clusterState = clusterId ? clientAccessor.getState(clusterId) : undefined;
return {
clusterId,
Expand Down Expand Up @@ -298,7 +300,7 @@ class KafkaFileDiagnostics extends AbstractKafkaFileFeature implements vscode.Di
kafkaFileDocuments: LanguageModelCache<KafkaFileDocument>,
languageService: LanguageService,
clusterSettings: ClusterSettings,
clientAccessor : ClientAccessor,
clientAccessor: ClientAccessor,
modelProvider: KafkaModelProvider,
settings: WorkspaceSettings
) {
Expand Down Expand Up @@ -371,7 +373,7 @@ class KafkaFileHoverProvider extends AbstractKafkaFileFeature implements vscode.
return runSafeAsync(async () => {
const kafkaFileDocument = this.getKafkaFileDocument(document);
return this.languageService.doHover(document, kafkaFileDocument, position);
}, null, `Error while computing hover for ${document.uri}`, token);
}, null, `Error while computing hover for ${document.uri}`, token);
}

}

0 comments on commit edd4597

Please sign in to comment.