Skip to content

Commit

Permalink
Merge branch 'master' into new_offset_committing_bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m authored Aug 16, 2024
2 parents 67ebd39 + d863e8f commit 9bf86d6
Show file tree
Hide file tree
Showing 47 changed files with 417 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;

import java.util.function.ToDoubleFunction;


public class ConsistencyMetrics {
private final MeterRegistry meterRegistry;

ConsistencyMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;

}

public <T> void registerStorageConsistencyGauge(T stateObject, ToDoubleFunction<T> valueFunction) {
meterRegistry.gauge("storage.consistency", stateObject, valueFunction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class MetricsFacade {
private final OffsetCommitsMetrics offsetCommitsMetrics;
private final MaxRateMetrics maxRateMetrics;
private final BrokerMetrics brokerMetrics;
private final ConsistencyMetrics consistencyMetrics;

public MetricsFacade(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
Expand All @@ -45,6 +46,7 @@ public MetricsFacade(MeterRegistry meterRegistry) {
this.offsetCommitsMetrics = new OffsetCommitsMetrics(meterRegistry);
this.maxRateMetrics = new MaxRateMetrics(meterRegistry);
this.brokerMetrics = new BrokerMetrics(meterRegistry);
this.consistencyMetrics = new ConsistencyMetrics(meterRegistry);
}

public TopicMetrics topics() {
Expand Down Expand Up @@ -107,6 +109,10 @@ public BrokerMetrics broker() {
return brokerMetrics;
}

public ConsistencyMetrics consistency() {
return consistencyMetrics;
}

public void unregisterAllMetricsRelatedTo(SubscriptionName subscription) {
Collection<Meter> meters = Search.in(meterRegistry)
.tags(subscriptionTags(subscription))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,6 +20,7 @@
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public abstract class ZookeeperBasedRepository {

Expand Down Expand Up @@ -75,6 +78,13 @@ protected List<String> childrenOf(String path) {
}
}

protected List<String> childrenPathsOf(String path) {
List<String> childNodes = childrenOf(path);
return childNodes.stream()
.map(child -> ZKPaths.makePath(path, child))
.collect(Collectors.toList());
}

@SuppressWarnings("unchecked")
protected byte[] readFrom(String path) {
return readWithStatFrom(path, bytes -> bytes, (t, stat) -> {}, false).get();
Expand Down Expand Up @@ -156,6 +166,20 @@ protected void createInTransaction(String path, Object value, String childPath)
.commit();
}

protected void deleteInTransaction(List<String> paths) throws Exception {
if (paths.isEmpty()) {
throw new InternalProcessingException("Attempting to remove empty set of paths from ZK");
}
ensureConnected();
CuratorTransactionFinal transaction = zookeeper.inTransaction().delete().forPath(paths.get(0)).and();

for (int i = 1; i < paths.size(); i++) {
transaction = transaction.delete().forPath(paths.get(i)).and();
}

transaction.commit();
}

protected void create(String path, Object value) throws Exception {
ensureConnected();
zookeeper.create().forPath(path, mapper.writeValueAsBytes(value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Group;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.domain.group.GroupAlreadyExistsException;
import pl.allegro.tech.hermes.domain.group.GroupNotEmptyException;
Expand Down Expand Up @@ -65,14 +66,23 @@ public void updateGroup(Group group) {
}
}

/**
* Atomic removal of <code>group</code> and <code>group/topics</code>
* nodes is required to prevent lengthy loop during removal, see:
* {@link pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperTopicRepository#removeTopic(TopicName)}.
*/
@Override
public void removeGroup(String groupName) {
ensureGroupExists(groupName);
ensureGroupIsEmpty(groupName);

logger.info("Removing group: {}", groupName);
List<String> pathsToDelete = List.of(
paths.topicsPath(groupName),
paths.groupPath(groupName)
);
try {
remove(paths.groupPath(groupName));
deleteInTransaction(pathsToDelete);
} catch (Exception e) {
throw new InternalProcessingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ZookeeperMessagePreviewRepository(CuratorFramework zookeeper, ObjectMappe
@Override
public List<MessagePreview> loadPreview(TopicName topicName) {
try {
return Optional.of(paths.topicPath(topicName, ZookeeperPaths.PREVIEW_PATH))
return Optional.of(paths.topicPreviewPath(topicName))
.filter(this::pathExists)
.flatMap(p -> readFrom(p, new TypeReference<List<MessagePreview>>() {}, true))
.orElseGet(ArrayList::new);
Expand All @@ -50,7 +50,7 @@ private void persistMessage(TopicName topic, List<MessagePreview> messages) {
logger.debug("Persisting {} messages for preview of topic: {}", messages.size(), topic.qualifiedName());
try {
if (pathExists(paths.topicPath(topic))) {
String previewPath = paths.topicPath(topic, ZookeeperPaths.PREVIEW_PATH);
String previewPath = paths.topicPreviewPath(topic);
ensurePathExists(previewPath);
overwrite(previewPath, messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public String topicPath(TopicName topicName, String... tail) {
return Joiner.on(URL_SEPARATOR).join(topicsPath(topicName.getGroupName()), topicName.getName(), (Object[]) tail);
}

public String topicPreviewPath(TopicName topicName) {
return topicPath(topicName, ZookeeperPaths.PREVIEW_PATH);
}

public String topicMetricsPath(TopicName topicName) {
return topicPath(topicName, METRICS_PATH);
}

public String subscriptionPath(TopicName topicName, String subscriptionName, String... tail) {
return Joiner.on(URL_SEPARATOR).join(subscriptionsPath(topicName), subscriptionName, (Object[]) tail);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -77,12 +78,67 @@ public void createTopic(Topic topic) {
}
}

/**
* To remove topic node, we must remove topic node and its children. The tree looks like this:
* <ul>
* <li>- topic
* <li>----- /subscriptions (required)
* <li>----- /preview (optional)
* <li>----- /metrics (optional)
* <li>--------------- /volume
* <li>--------------- /published
* </ul>
*
* <p>One way to remove the whole tree for topic that would be to use <code>deletingChildrenIfNeeded()</code>:
* e.g. <code>zookeeper.delete().deletingChildrenIfNeeded().forPath(topicPath)</code>.
* However, <code>deletingChildrenIfNeeded</code> is not atomic. It first tries to remove the node <code>topic</code>
* and upon receiving <code>KeeperException.NotEmptyException</code> it tries to remove children recursively
* and then retries the node removal. This means that there is a potentially large time gap between
* removal of <code>topic/subscriptions</code> node and <code>topic</code> node, especially when topic removal is being done
* in remote DC.
*
* <p>It turns out that <code>PathChildrenCache</code> used by <code>HierarchicalCacheLevel</code> in
* Consumers and Frontend listens for <code>topics/subscriptions</code> changes and recreates that node when deleted.
* If the recreation happens between the <code>topic/subscriptions</code> and <code>topic</code> node removal
* than the whole removal process must be repeated resulting in a lengthy loop that may even result in <code>StackOverflowException</code>.
* Example of that scenario would be
* <ol>
* <li> DELETE <code>topic</code> - issued by management, fails with KeeperException.NotEmptyException
* <li> DELETE <code>topic/subscriptions</code> - issued by management, succeeds
* <li> CREATE <code>topic/subscriptions</code> - issued by frontend, succeeds
* <li> DELETE <code>topic</code> - issued by management, fails with KeeperException.NotEmptyException
* <li> [...]
* </ol>
*
* <p>To solve this we must remove <code>topic</code> and <code>topic/subscriptions</code> atomically. However, we must also remove
* other <code>topic</code> children. Transaction API does not allow for optional deletes so we:
* <ol>
* <li> find all children paths
* <li> delete all children in one transaction
* </ol>
*/
@Override
public void removeTopic(TopicName topicName) {
ensureTopicExists(topicName);
logger.info("Removing topic: " + topicName);

List<String> pathsForRemoval = new ArrayList<>();
String topicMetricsPath = paths.topicMetricsPath(topicName);
if (pathExists(topicMetricsPath)) {
pathsForRemoval.addAll(childrenPathsOf(topicMetricsPath));
pathsForRemoval.add(topicMetricsPath);
}

String topicPreviewPath = paths.topicPreviewPath(topicName);
if (pathExists(topicPreviewPath)) {
pathsForRemoval.add(topicPreviewPath);
}

pathsForRemoval.add(paths.subscriptionsPath(topicName));
pathsForRemoval.add(paths.topicPath(topicName));

try {
remove(paths.topicPath(topicName));
deleteInTransaction(pathsForRemoval);
} catch (Exception e) {
throw new InternalProcessingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,16 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
!repository.topicExists(new TopicName(GROUP, 'remove'))
}

def "should remove topic with metrics but without subscriptions"() {
def "should remove topic with metrics and without preview"() {
given:
def topicName = "topicWithMetrics"

repository.createTopic(topic(GROUP, topicName).build())
wait.untilTopicCreated(GROUP, topicName)

def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.SUBSCRIPTION_DELIVERED, pathContext()
def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.TOPIC_VOLUME_COUNTER, pathContext()
.withGroup(GROUP)
.withTopic(topicName)
.withSubscription("sample")
.build())
zookeeper().create().creatingParentsIfNeeded().forPath(path, '1'.bytes)
wait.untilZookeeperPathIsCreated(path)
Expand All @@ -207,6 +206,29 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
!repository.topicExists(new TopicName(GROUP, topicName))
}

def "should remove topic with metrics and preview"() {
given: "a topic"
Topic topic = topic(GROUP, "topicWithMetricsAndPreview").build()
repository.createTopic(topic)
wait.untilTopicCreated(GROUP, topic.getName().getName())

and: "volume metric in zk for that topic"
String metricPath = paths.topicMetricPath(topic.getName(), "volume")
zookeeper().create().creatingParentsIfNeeded().forPath(metricPath, '1'.bytes)
wait.untilZookeeperPathIsCreated(metricPath)

and: "preview in zk for that topic"
String previewPath = paths.topicPreviewPath(topic.getName())
zookeeper().create().creatingParentsIfNeeded().forPath(previewPath , '1'.bytes)
wait.untilZookeeperPathIsCreated(previewPath)

when:
repository.removeTopic(topic.getName())

then:
!repository.topicExists(topic.getName())
}

def "should not throw exception on malformed topic when reading list of all topics"() {
given:
zookeeper().create().forPath(paths.topicPath(new TopicName(GROUP, 'malformed')), ''.bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public void tearDown() {
@Override
public void updateSubscription(Subscription subscription) {
this.subscription = subscription;
receiver.updateSubscription(subscription);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

import org.springframework.boot.context.properties.ConfigurationProperties;

import java.time.Duration;

@ConfigurationProperties(prefix = "consistency-checker")
public class ConsistencyCheckerProperties {

private int threadPoolSize = 2;
private boolean periodicCheckEnabled = false;
private Duration refreshInterval = Duration.ofMinutes(15);
private Duration initialRefreshDelay = Duration.ofMinutes(2);

public int getThreadPoolSize() {
return threadPoolSize;
Expand All @@ -14,4 +19,30 @@ public int getThreadPoolSize() {
public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}


public boolean isPeriodicCheckEnabled() {
return periodicCheckEnabled;
}

public void setPeriodicCheckEnabled(boolean periodicCheckEnabled) {
this.periodicCheckEnabled = periodicCheckEnabled;
}


public Duration getRefreshInterval() {
return refreshInterval;
}

public void setRefreshInterval(Duration refreshInterval) {
this.refreshInterval = refreshInterval;
}

public Duration getInitialRefreshDelay() {
return initialRefreshDelay;
}

public void setInitialRefreshDelay(Duration initialRefreshDelay) {
this.initialRefreshDelay = initialRefreshDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, Sch
AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties);
BrokerStorage storage = brokersStorage(brokerAdminClient);
BrokerTopicManagement brokerTopicManagement =
new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper);
new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper, kafkaProperties.getDatacenter());
KafkaConsumerPool consumerPool = kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBrokerList());
KafkaRawMessageReader kafkaRawMessageReader =
new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void execute(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> ho
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> holder, Exception exception) {
holder.getRepository().remove(qualifiedTopicName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void execute(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> ho
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> holder, Exception exception) {
if (exists) {
holder.getRepository().add(qualifiedTopicName);
}
Expand Down
Loading

0 comments on commit 9bf86d6

Please sign in to comment.