diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index f4732cad38040..b55c9490242e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -159,7 +159,7 @@ protected CompletableFuture internalCreateNamespace(Policies policies) { return CompletableFuture.completedFuture(null); }) .thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies)) - .thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName)); + .thenAccept(__ -> log.info("[{}] Created namespace {} with policies {}", clientAppId(), namespaceName, policies)); } protected CompletableFuture> internalGetListOfTopics(Policies policies, @@ -215,6 +215,8 @@ protected CompletableFuture> internalGetNonPersistentTopics(Policie } private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTimes, @Nonnull CompletableFuture callback) { + log.info("internalRetryableDeleteNamespaceAsync0: namespace={}, force={}, retryTimes={}", + namespaceName, force, retryTimes); precheckWhenDeleteNamespace(namespaceName, force) .thenCompose(policies -> { final CompletableFuture> topicsFuture; @@ -267,13 +269,16 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime } if (!force) { if (hasNonSystemTopic) { - throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace"); + log.info("Namespace {} has non-system topics, force delete is required. User topics: {}, user partitioned topics: {}", + namespaceName, allUserCreatedTopics, allUserCreatedPartitionTopics); + throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace " + namespaceName); } } final CompletableFuture markDeleteFuture; if (policies != null && policies.deleted) { markDeleteFuture = CompletableFuture.completedFuture(null); } else { + log.info("Marking the namespace {} as deleted", namespaceName); markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> { old.deleted = true; return old; @@ -298,6 +303,8 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime .thenCompose(bundles -> FutureUtil.waitForAll(bundles.getBundles().stream() .map(bundle -> pulsar().getNamespaceService().checkOwnershipPresentAsync(bundle) .thenCompose(present -> { + log.info("Delete namespace {} check bundle {} ownership present: {}", + namespaceName, bundle, present); // check if the bundle is owned by any broker, // if not then we do not need to delete the bundle if (present) { @@ -309,10 +316,15 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime clientAppId(), ex); return FutureUtil.failedFuture(ex); } + log.info("Deleting namespace bundle {}/{} (force: {})", + namespaceName, bundle.getBundleRange(), force); return admin.namespaces().deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange(), force); + } else { + log.info("Bundle {}/{} is not owned by any broker, no need to delete", + namespaceName, bundle); + return CompletableFuture.completedFuture(null); } - return CompletableFuture.completedFuture(null); }) ).collect(Collectors.toList()))) .thenCompose(ignore -> internalClearZkSources()) @@ -321,8 +333,11 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime final Throwable rc = FutureUtil.unwrapCompletionException(error); if (rc instanceof MetadataStoreException) { if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) { - log.info("[{}] There are in-flight topics created during the namespace deletion, " - + "retry to delete the namespace again.", namespaceName); + KeeperException.NotEmptyException notEmptyException = (KeeperException.NotEmptyException) rc.getCause(); + log.warn("[{}] There are in-flight topics created during the namespace deletion or some bundle is still owned, " + + "retry to delete the namespace again. Path is {}", namespaceName, + notEmptyException.getPath()); + final int next = retryTimes - 1; if (next > 0) { // async recursive @@ -330,7 +345,7 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime } else { callback.completeExceptionally( new RestException(Status.CONFLICT, "The broker still have in-flight topics" - + " created during namespace deletion, please try again.")); + + " created during namespace deletion or some bundle is still owned, please try again.")); // drop out recursive } return; @@ -363,6 +378,7 @@ private CompletableFuture internalDeletePartitionedTopicsAsync(Set } private CompletableFuture internalDeleteTopicsAsync(Set topicNames) { + log.info("internalDeleteTopicsAsync: {}", topicNames); if (CollectionUtils.isEmpty(topicNames)) { return CompletableFuture.completedFuture(null); } @@ -453,6 +469,7 @@ private CompletableFuture precheckWhenDeleteNamespace(NamespaceName ns // clear zk-node resources for deleting namespace protected CompletableFuture internalClearZkSources() { + log.info("internalClearZkSources: {}", namespaceName); // clear resource of `/namespace/{namespaceName}` for zk-node return namespaceResources().deleteNamespaceAsync(namespaceName) .thenCompose(ignore -> namespaceResources().getPartitionedTopicResources() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 0033abf36c78c..8ebc0db876fdf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -208,11 +208,11 @@ public CompletableFuture tryAcquiringOwnership(Namespace */ public CompletableFuture removeOwnership(NamespaceBundle bundle) { ResourceLock lock = locallyAcquiredLocks.remove(bundle); + log.info("Removing ownership of {} (current lock is {})", bundle, lock); if (lock == null) { // We don't own the specified bundle anymore return CompletableFuture.completedFuture(null); } - return lock.release(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 71f78e21f938f..6fb4a8233931b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -315,7 +315,16 @@ public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle name return CompletableFuture.completedFuture(null); } synchronized (this) { - if (readerCaches.get(namespace) != null) { + CompletableFuture> readerCompletableFuture = + readerCaches.get(namespace); + boolean readerCachedAndNotFailed = readerCompletableFuture != null + && readerCompletableFuture.isDone() + && !readerCompletableFuture.isCompletedExceptionally(); + boolean completedExceptionally = readerCompletableFuture != null + && readerCompletableFuture.isCompletedExceptionally(); + log.info("[{}] addOwnedNamespaceBundleAsync, readerCachedAndNotFailed: {}, completedExceptionally: {}", namespace, + readerCachedAndNotFailed, completedExceptionally); + if (readerCachedAndNotFailed) { ownedBundlesCountPerNamespace.get(namespace).incrementAndGet(); return CompletableFuture.completedFuture(null); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 2f437962002a3..1aa9346fc1070 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -898,10 +898,9 @@ public static CompletableFuture checkLocalOrGetPeerReplicationC if (policiesResult.isPresent()) { Policies policies = policiesResult.get(); if (!allowDeletedNamespace && policies.deleted) { - String msg = String.format("Namespace %s is deleted", namespace.toString()); + String msg = String.format("Namespace %s is marked as deleted", namespace.toString()); log.warn(msg); - validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, - "Namespace is deleted")); + validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, msg)); } else if (policies.replication_clusters.isEmpty()) { String msg = String.format( "Namespace does not have any clusters configured : local_cluster=%s ns=%s", diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index b9051a7dc7df4..1f11e67be51b6 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -91,6 +91,7 @@ public CompletableFuture>> asyncReload( if (store instanceof AbstractMetadataStore && ((AbstractMetadataStore) store).isConnected()) { return readValueFromStore(key); } else { + log.error("Cannot refresh cache item for key {} because we're not connected to the metadata store", key); // Do not try to refresh the cache item if we know that we're not connected to the // metadata store return CompletableFuture.completedFuture(oldValue); @@ -216,6 +217,7 @@ public CompletableFuture create(String path, T value) { } CompletableFuture future = new CompletableFuture<>(); + log.info("Creating path {} with value {}", path, value); store.put(path, content, Optional.of(-1L)) .thenAccept(stat -> { // Make sure we have the value cached before the operation is completed diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java index 93c994b2436b9..59bd3f43d88ff 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java @@ -77,6 +77,8 @@ public synchronized CompletableFuture updateValue(T newValue) { return sequencer.sequential(() -> { synchronized (ResourceLockImpl.this) { if (state != State.Valid) { + log.error("Cannot update value on lock at {} because it's not in valid state: {}", path, state, + new Exception("invalid lock state " + state + " at " + path).fillInStackTrace()); return CompletableFuture.failedFuture( new IllegalStateException("Lock was not in valid state: " + state)); } @@ -210,12 +212,17 @@ synchronized CompletableFuture revalidateIfNeededAfterReconnection() { * This method is thread-safe and it will perform multiple re-validation operations in turn. */ synchronized CompletableFuture silentRevalidateOnce() { + if (state == State.Releasing) { + log.info("Lock on resource {} is being released. Skip revalidation", path); + return CompletableFuture.completedFuture(null); + } return sequencer.sequential(() -> revalidate(value)) .thenRun(() -> log.info("Successfully revalidated the lock on {}", path)) .exceptionally(ex -> { synchronized (ResourceLockImpl.this) { Throwable realCause = FutureUtil.unwrapCompletionException(ex); - if (realCause instanceof BadVersionException || realCause instanceof LockBusyException) { + if (realCause instanceof BadVersionException + || realCause instanceof LockBusyException) { log.warn("Failed to revalidate the lock at {}. Marked as expired. {}", path, realCause.getMessage()); state = State.Released; @@ -237,7 +244,7 @@ private synchronized CompletableFuture revalidate(T newValue) { // Since the distributed lock has been expired, we don't need to revalidate it. if (state != State.Valid && state != State.Init) { return CompletableFuture.failedFuture( - new IllegalStateException("Lock was not in valid state: " + state)); + new IllegalStateException("Lock for " + path + " was not in valid state: " + state)); } if (log.isDebugEnabled()) { log.debug("doRevalidate with newValue={}, version={}", newValue, version);