Skip to content

Commit

Permalink
YARN-11663. [Federation] Add Cache Entity Nums Limit. (#6662) Contrib…
Browse files Browse the repository at this point in the history
…uted by Shilun Fan.

Reviewed-by: Dinesh Chitlangia <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
slfan1989 authored Apr 1, 2024
1 parent f7d1ec2 commit 5f3eb44
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4031,6 +4031,10 @@ public static boolean isAclEnabled(Configuration conf) {
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;

public static final String FEDERATION_CACHE_ENTITY_NUMS =
FEDERATION_PREFIX + "cache-entity.nums";
public static final int DEFAULT_FEDERATION_CACHE_ENTITY_NUMS = 1000;

public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR =
FEDERATION_PREFIX + "flush-cache-for-rm-addr";
public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3787,6 +3787,15 @@
<value>300</value>
</property>

<property>
<description>
The number of entries in the Federation cache.
default is 1000.
</description>
<name>yarn.federation.cache-entity.nums</name>
<value>1000</value>
</property>

<property>
<description>The registry base directory for federation.</description>
<name>yarn.federation.registry.base-dir</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,20 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class FederationGuavaCache extends FederationCache {

private static final Logger LOG = LoggerFactory.getLogger(FederationCache.class);

private Cache<String, CacheRequest<String, ?>> cache;

private int cacheTimeToLive;
private long cacheEntityNums;

private String className = this.getClass().getSimpleName();

Expand All @@ -52,15 +57,20 @@ public void initCache(Configuration pConf, FederationStateStore pStateStore) {
// no conflict or pick up a specific one in the future.
cacheTimeToLive = pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
cacheEntityNums = pConf.getLong(YarnConfiguration.FEDERATION_CACHE_ENTITY_NUMS,
YarnConfiguration.DEFAULT_FEDERATION_CACHE_ENTITY_NUMS);
if (cacheTimeToLive <= 0) {
isCachingEnabled = false;
return;
}
this.setStateStore(pStateStore);

// Initialize Cache.
LOG.info("Creating a JCache Manager with name {}. " +
"Cache TTL Time = {} secs. Cache Entity Nums = {}.", className, cacheTimeToLive,
cacheEntityNums);
cache = CacheBuilder.newBuilder().expireAfterWrite(cacheTimeToLive,
TimeUnit.MILLISECONDS).build();
TimeUnit.SECONDS).maximumSize(cacheEntityNums).build();
isCachingEnabled = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,31 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.ehcache.Cache;
import org.ehcache.CacheManager;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.ExpiryPolicyBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.expiry.ExpiryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.spi.CachingProvider;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class FederationJCache extends FederationCache {

private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class);

private Cache<String, CacheRequest<String, ?>> cache;
private Cache<String, CacheRequest> cache;

private int cacheTimeToLive;
private long cacheEntityNums;

private boolean isCachingEnabled = false;

private String className = this.getClass().getSimpleName();
private final String className = this.getClass().getSimpleName();

@Override
public boolean isCachingEnabled() {
Expand All @@ -64,33 +63,35 @@ public void initCache(Configuration pConf, FederationStateStore pStateStore) {
// no conflict or pick up a specific one in the future
cacheTimeToLive = pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
cacheEntityNums = pConf.getLong(YarnConfiguration.FEDERATION_CACHE_ENTITY_NUMS,
YarnConfiguration.DEFAULT_FEDERATION_CACHE_ENTITY_NUMS);
if (cacheTimeToLive <= 0) {
isCachingEnabled = false;
return;
}
this.setStateStore(pStateStore);
CachingProvider jcacheProvider = Caching.getCachingProvider();
CacheManager jcacheManager = jcacheProvider.getCacheManager();
this.cache = jcacheManager.getCache(className);
CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder().build(true);

if (this.cache == null) {
LOG.info("Creating a JCache Manager with name {}.", className);
Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryPolicySingletonFactory =
new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry));
MutableConfiguration<String, CacheRequest<String, ?>> configuration =
new MutableConfiguration<>();
configuration.setStoreByValue(false);
configuration.setExpiryPolicyFactory(expiryPolicySingletonFactory);
this.cache = jcacheManager.createCache(className, configuration);
LOG.info("Creating a JCache Manager with name {}. " +
"Cache TTL Time = {} secs. Cache Entity Nums = {}.", className, cacheTimeToLive,
cacheEntityNums);
// Set the number of caches
ResourcePoolsBuilder poolsBuilder = ResourcePoolsBuilder.heap(cacheEntityNums);
ExpiryPolicy expiryPolicy = ExpiryPolicyBuilder.timeToLiveExpiration(
Duration.ofSeconds(cacheTimeToLive));
CacheConfigurationBuilder<String, CacheRequest> configurationBuilder =
CacheConfigurationBuilder.newCacheConfigurationBuilder(
String.class, CacheRequest.class, poolsBuilder)
.withExpiry(expiryPolicy);
cache = cacheManager.createCache(className, configurationBuilder);
}
isCachingEnabled = true;
}

@Override
public void clearCache() {
CachingProvider jcacheProvider = Caching.getCachingProvider();
CacheManager jcacheManager = jcacheProvider.getCacheManager();
jcacheManager.destroyCache(className);

this.cache = null;
}

Expand Down Expand Up @@ -142,13 +143,12 @@ public void removeSubCluster(boolean flushCache) {
}

@VisibleForTesting
public Cache<String, CacheRequest<String, ?>> getCache() {
public Cache<String, CacheRequest> getCache() {
return cache;
}

@VisibleForTesting
public String getAppHomeSubClusterCacheKey(ApplicationId appId)
throws YarnException {
public String getAppHomeSubClusterCacheKey(ApplicationId appId) {
return buildCacheKey(className, GET_APPLICATION_HOME_SUBCLUSTER_CACHEID,
appId.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import javax.cache.Cache;
import org.ehcache.Cache;

/**
* Unit tests for FederationStateStoreFacade.
Expand Down Expand Up @@ -245,7 +245,7 @@ public void testGetApplicationHomeSubClusterCache() throws Exception {
assert fedCache instanceof FederationJCache;
FederationJCache jCache = (FederationJCache) fedCache;
String cacheKey = jCache.getAppHomeSubClusterCacheKey(appId);
Cache<String, CacheRequest<String, ?>> cache = jCache.getCache();
Cache<String, CacheRequest> cache = jCache.getCache();
CacheRequest<String, ?> cacheRequest = cache.get(cacheKey);
ApplicationHomeSubClusterCacheResponse response =
ApplicationHomeSubClusterCacheResponse.class.cast(cacheRequest.getValue());
Expand Down

0 comments on commit 5f3eb44

Please sign in to comment.