Commit c1a2d39

Update data stream lifecycle telemetry to track global retention (elastic#112451)
Currently, the data stream lifecycle telemetry has the following
structure:

```
{
....
  "data_lifecycle" : {
    "available": true,
    "enabled": true,
    "count": 0,
    "default_rollover_used": true,
    "retention": {
        "minimum_millis": 0,
        "maximum_millis": 0,
        "average_millis": 0.0
    }
  }....
```

In the snippet above you can see that we track:

- The number of data streams managed by the data stream lifecycle (`count`).
- Whether the default rollover has been overridden (`default_rollover_used`).
- The min, max, and average of the `data_retention` configured at the data stream level.
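The min/max/average aggregation above can be sketched as follows. This is a minimal illustration, not the actual Elasticsearch telemetry code; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.LongSummaryStatistics;

// Hypothetical sketch: collapsing per-data-stream retention values (in millis)
// into the minimum_millis / maximum_millis / average_millis telemetry fields.
public class RetentionStats {
    // Returns {min, max, avg}; all zeros when no retention is configured anywhere.
    static double[] summarize(List<Long> retentionsMillis) {
        if (retentionsMillis.isEmpty()) {
            return new double[] { 0, 0, 0.0 };
        }
        LongSummaryStatistics stats = retentionsMillis.stream()
            .mapToLong(Long::longValue)
            .summaryStatistics();
        return new double[] { stats.getMin(), stats.getMax(), stats.getAverage() };
    }

    public static void main(String[] args) {
        double[] s = summarize(List.of(1000L, 3000L, 5000L));
        System.out.println(s[0] + " " + s[1] + " " + s[2]); // prints 1000.0 5000.0 3000.0
    }
}
```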

In this PR we propose the following extension:

```
....
  "data_lifecycle" : {
    "available": true,
    "enabled": true,
    "count": 0,
    "default_rollover_used": true,
    "effective_retention": { #elastic/dev#2537
        "retained_data_streams": 5,
    "minimum_millis": 0, # Only if retained data streams > 0
        "maximum_millis": 0,
        "average_millis": 0.0
    },
    "data_retention": {
        "configured_data_streams": 5,
    "minimum_millis": 0, # Only if configured data streams > 0
        "maximum_millis": 0,
        "average_millis": 0.0
    },
    "global_retention": {
      "default": {
         "defined": true/false,
         "affected_data_streams": 0,
         "millis": 0
      },
      "max": {
         "defined": true/false,
         "affected_data_streams": 0,
         "millis": 0
      }
    }
  }
```

With this extension we are tracking:

- The number of data streams managed by the data stream lifecycle (`count`).
- Whether the default rollover has been overridden (`default_rollover_used`).
- The min, max, and average of the `data_retention` configured at the data stream level, plus the number of data streams that have it configured. We add the min, max, and avg only if at least one data stream has data retention configured, to avoid skewing the stats in a dashboard.
- The min, max, and average of the `effective_retention`, plus the number of data streams that are retained. We add the min, max, and avg only if at least one data stream is retained, to avoid skewing the stats in a dashboard.
- Global retention stats: whether each setting is defined, the number of affected data streams, and the actual value.
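The way the effective retention is resolved from these three sources can be sketched as below. This is a simplified model of the behavior the PR description implies, not the actual Elasticsearch resolution code; the class and method names are hypothetical.

```java
// Hypothetical sketch of effective retention resolution: the data stream's own
// data retention wins, falling back to the default global retention, and the
// max global retention caps whatever was resolved.
public class EffectiveRetention {
    // Any argument may be null, meaning "not configured". All values in millis.
    static Long resolve(Long dataRetention, Long defaultRetention, Long maxRetention) {
        // Per-stream data retention takes precedence over the global default.
        Long retention = dataRetention != null ? dataRetention : defaultRetention;
        // The max global retention caps the result, and applies even when
        // neither data retention nor default retention is configured.
        if (maxRetention != null && (retention == null || retention > maxRetention)) {
            retention = maxRetention;
        }
        return retention; // null means the stream's data is kept indefinitely
    }

    public static void main(String[] args) {
        System.out.println(resolve(5_000L, 1_000L, 3_000L)); // capped by max: prints 3000
        System.out.println(resolve(null, 1_000L, 3_000L));   // default applies: prints 1000
        System.out.println(resolve(null, null, null));       // nothing configured: prints null
    }
}
```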

The above metrics allow us to answer questions like:

- How many data streams are affected by global retention?
- How big is the difference between the longest configured data retention and the max global retention?
- How much does the effective retention diverge from the data retention? This shows the impact of global retention.
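For example, the first question can be answered directly from a parsed usage response. The sketch below assumes the proposed field names above and a generic `Map` representation of the JSON; the class and method names are hypothetical, and summing the two counters is an approximation of "affected".

```java
import java.util.Map;

// Hypothetical sketch: counting data streams affected by global retention
// from the `global_retention` section of the usage telemetry.
public class TelemetryQuestions {
    // Sums `affected_data_streams` across the global retention settings
    // that are actually defined.
    @SuppressWarnings("unchecked")
    static long affectedByGlobalRetention(Map<String, Object> dataLifecycle) {
        Map<String, Object> global = (Map<String, Object>) dataLifecycle.get("global_retention");
        long affected = 0;
        for (String key : new String[] { "default", "max" }) {
            Map<String, Object> entry = (Map<String, Object>) global.get(key);
            if (Boolean.TRUE.equals(entry.get("defined"))) {
                affected += ((Number) entry.get("affected_data_streams")).longValue();
            }
        }
        return affected;
    }

    public static void main(String[] args) {
        Map<String, Object> usage = Map.<String, Object>of(
            "global_retention", Map.<String, Object>of(
                "default", Map.<String, Object>of("defined", true, "affected_data_streams", 3),
                "max", Map.<String, Object>of("defined", false)
            )
        );
        System.out.println(affectedByGlobalRetention(usage)); // prints 3
    }
}
```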
gmarouli authored Sep 11, 2024
1 parent 9bd49f0 commit c1a2d39
Showing 10 changed files with 560 additions and 187 deletions.
16 changes: 9 additions & 7 deletions docs/changelog/111972.yaml
@@ -5,11 +5,13 @@ type: feature
issues: []
highlight:
title: Add global retention in data stream lifecycle
body: "Data stream lifecycle now supports configuring retention on a cluster level,\
\ namely global retention. Global retention \nallows us to configure two different\
\ retentions:\n\n- `data_streams.lifecycle.retention.default` is applied to all\
\ data streams managed by the data stream lifecycle that do not have retention\n\
defined on the data stream level.\n- `data_streams.lifecycle.retention.max` is\
\ applied to all data streams managed by the data stream lifecycle and it allows\
\ any data stream \ndata to be deleted after the `max_retention` has passed."
body: |-
Data stream lifecycle now supports configuring retention on a cluster level,
namely global retention. Global retention allows us to configure two different
retentions:
- `data_streams.lifecycle.retention.default` is applied to all data streams managed
by the data stream lifecycle that do not have retention defined on the data stream level.
- `data_streams.lifecycle.retention.max` is applied to all data streams managed by the
data stream lifecycle and it allows any data stream data to be deleted after the `max_retention` has passed.
notable: true
29 changes: 29 additions & 0 deletions docs/changelog/112451.yaml
@@ -0,0 +1,29 @@
pr: 112451
summary: Update data stream lifecycle telemetry to track global retention
area: Data streams
type: breaking
issues: []
breaking:
title: Update data stream lifecycle telemetry to track global retention
area: REST API
details: |-
In this release we introduced global retention settings that apply to data streams which fulfil the following criteria:
- the data stream is managed by the data stream lifecycle,
- the data stream is not an internal data stream.
As a result, we defined different types of retention:
- **data retention**: the retention configured on data stream level by the data stream user or owner
- **default global retention:** the retention configured by an admin on a cluster level and applied to any
data stream that doesn't have data retention and fulfils the criteria.
- **max global retention:** the retention configured by an admin to guard against having long retention periods.
Any data stream that fulfills the criteria will adhere to the data retention unless it exceeds the max retention,
in which case the max global retention applies.
- **effective retention:** the retention that applies to data streams that fulfill the criteria at a given moment
in time. It takes all the retention types above into consideration and resolves them to the retention that will take effect.
Considering the above changes, having a field named `retention` in the usage API was confusing. For this reason, we
renamed it to `data_retention` and added telemetry about the other configurations too.
impact: Users of the field `data_lifecycle.retention` should switch to `data_lifecycle.data_retention`.
notable: false
17 changes: 13 additions & 4 deletions docs/reference/rest-api/usage.asciidoc
@@ -377,10 +377,19 @@ GET /_xpack/usage
"enabled": true,
"count": 0,
"default_rollover_used": true,
"retention": {
"minimum_millis": 0,
"maximum_millis": 0,
"average_millis": 0.0
"data_retention": {
"configured_data_streams": 0
},
"effective_retention": {
"retained_data_streams": 0
},
"global_retention": {
"default": {
"defined": false
},
"max": {
"defined": false
}
}
},
"data_tiers" : {
@@ -206,8 +206,9 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_ADD_INDEX_MODE_CONCRETE_INDICES = def(8_736_00_0);
public static final TransportVersion UNASSIGNED_PRIMARY_COUNT_ON_CLUSTER_HEALTH = def(8_737_00_0);
public static final TransportVersion ESQL_AGGREGATE_EXEC_TRACKS_INTERMEDIATE_ATTRS = def(8_738_00_0);

public static final TransportVersion CCS_TELEMETRY_STATS = def(8_739_00_0);
public static final TransportVersion GLOBAL_RETENTION_TELEMETRY = def(8_740_00_0);

/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
@@ -14,12 +14,14 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
@@ -34,11 +36,13 @@
import org.elasticsearch.xpack.core.XPackClientPlugin;
import org.junit.After;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

@@ -64,44 +68,84 @@ private void cleanup() throws Exception {
return clusterStateBuilder.build();
});
updateClusterSettings(
Settings.builder().put(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING.getKey(), (String) null)
Settings.builder()
.putNull(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING.getKey())
.putNull(DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING.getKey())
);
}

@SuppressWarnings("unchecked")
public void testAction() throws Exception {
assertUsageResults(0, 0, 0, 0.0, true);
AtomicLong totalCount = new AtomicLong(0);
AtomicLong countLifecycleWithRetention = new AtomicLong(0);
// test empty results
{
Map<String, Object> map = getLifecycleUsage();
assertThat(map.get("available"), equalTo(true));
assertThat(map.get("enabled"), equalTo(true));
assertThat(map.get("count"), equalTo(0));
assertThat(map.get("default_rollover_used"), equalTo(true));

Map<String, Object> dataRetentionMap = (Map<String, Object>) map.get("data_retention");
assertThat(dataRetentionMap.size(), equalTo(1));
assertThat(dataRetentionMap.get("configured_data_streams"), equalTo(0));

Map<String, Object> effectiveRetentionMap = (Map<String, Object>) map.get("effective_retention");
assertThat(effectiveRetentionMap.size(), equalTo(1));
assertThat(effectiveRetentionMap.get("retained_data_streams"), equalTo(0));

Map<String, Object> globalRetentionMap = (Map<String, Object>) map.get("global_retention");
assertThat(globalRetentionMap.get("max"), equalTo(Map.of("defined", false)));
assertThat(globalRetentionMap.get("default"), equalTo(Map.of("defined", false)));
}

// Keep track of the data streams created
AtomicInteger dataStreamsWithLifecycleCount = new AtomicInteger(0);
AtomicInteger dataStreamsWithRetentionCount = new AtomicInteger(0);
AtomicInteger dataStreamsWithDefaultRetentionCount = new AtomicInteger(0);

AtomicLong totalRetentionTimes = new AtomicLong(0);
AtomicLong minRetention = new AtomicLong(Long.MAX_VALUE);
AtomicLong maxRetention = new AtomicLong(Long.MIN_VALUE);

boolean useDefaultRolloverConfig = randomBoolean();
if (useDefaultRolloverConfig == false) {
updateClusterSettings(
Settings.builder().put(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING.getKey(), "min_docs=33")
);
}
TimeValue defaultRetention = TimeValue.timeValueDays(10);
boolean useDefaultRetention = randomBoolean();
if (useDefaultRetention) {
updateClusterSettings(
Settings.builder()
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING.getKey(), defaultRetention.getStringRep())
);
}
/*
* We now add a number of simulated data streams to the cluster state. Some have lifecycles, some don't. The ones with lifecycles
* have varying retention periods. After adding them, we make sure the numbers add up.
*/
updateClusterState(clusterState -> {
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
Map<String, DataStream> dataStreamMap = new HashMap<>();
for (int dataStreamCount = 0; dataStreamCount < randomInt(200); dataStreamCount++) {
boolean hasLifecycle = randomBoolean();
boolean atLeastOne = false;
for (int dataStreamCount = 0; dataStreamCount < randomIntBetween(1, 200); dataStreamCount++) {
boolean hasLifecycle = randomBoolean() || atLeastOne == false;
DataStreamLifecycle lifecycle;
boolean systemDataStream = rarely();
if (hasLifecycle) {
if (randomBoolean()) {
lifecycle = new DataStreamLifecycle(null, null, null);
totalCount.incrementAndGet();
dataStreamsWithLifecycleCount.incrementAndGet();
if (useDefaultRetention && systemDataStream == false) {
dataStreamsWithDefaultRetentionCount.incrementAndGet();
}
atLeastOne = true;
} else {
long retentionMillis = randomLongBetween(1000, 100000);
boolean isEnabled = randomBoolean();
boolean isEnabled = randomBoolean() || atLeastOne == false;
if (isEnabled) {
totalCount.incrementAndGet();
countLifecycleWithRetention.incrementAndGet();
dataStreamsWithLifecycleCount.incrementAndGet();
dataStreamsWithRetentionCount.incrementAndGet();
totalRetentionTimes.addAndGet(retentionMillis);

if (retentionMillis < minRetention.get()) {
@@ -110,6 +154,7 @@ public void testAction() throws Exception {
if (retentionMillis > maxRetention.get()) {
maxRetention.set(retentionMillis);
}
atLeastOne = true;
}
lifecycle = DataStreamLifecycle.newBuilder().dataRetention(retentionMillis).enabled(isEnabled).build();
}
@@ -121,7 +166,6 @@ public void testAction() throws Exception {
Index index = new Index(randomAlphaOfLength(60), randomAlphaOfLength(60));
indices.add(index);
}
boolean systemDataStream = randomBoolean();
boolean replicated = randomBoolean();
DataStream dataStream = new DataStream(
randomAlphaOfLength(50),
@@ -147,28 +191,59 @@ public void testAction() throws Exception {
clusterStateBuilder.metadata(metadataBuilder);
return clusterStateBuilder.build();
});
int expectedMinimumRetention = minRetention.get() == Long.MAX_VALUE ? 0 : minRetention.intValue();
int expectedMaximumRetention = maxRetention.get() == Long.MIN_VALUE ? 0 : maxRetention.intValue();
double expectedAverageRetention = countLifecycleWithRetention.get() == 0

int retainedDataStreams = dataStreamsWithRetentionCount.get() + dataStreamsWithDefaultRetentionCount.get();

int expectedMinimumDataRetention = minRetention.get() == Long.MAX_VALUE ? 0 : minRetention.intValue();
int expectedMinimumEffectiveRetention = dataStreamsWithDefaultRetentionCount.get() > 0
? (int) Math.min(minRetention.get(), defaultRetention.getMillis())
: expectedMinimumDataRetention;

int expectedMaximumDataRetention = maxRetention.get() == Long.MIN_VALUE ? 0 : maxRetention.intValue();
int expectedMaximumEffectiveRetention = dataStreamsWithDefaultRetentionCount.get() > 0
? (int) Math.max(maxRetention.get(), defaultRetention.getMillis())
: expectedMaximumDataRetention;

double expectedAverageDataRetention = dataStreamsWithRetentionCount.get() == 0
? 0.0
: totalRetentionTimes.doubleValue() / countLifecycleWithRetention.get();
assertUsageResults(
totalCount.intValue(),
expectedMinimumRetention,
expectedMaximumRetention,
expectedAverageRetention,
useDefaultRolloverConfig
);
: totalRetentionTimes.doubleValue() / dataStreamsWithRetentionCount.get();
double expectedAverageEffectiveRetention = dataStreamsWithDefaultRetentionCount.get() > 0
? (totalRetentionTimes.doubleValue() + dataStreamsWithDefaultRetentionCount.get() * defaultRetention.getMillis())
/ retainedDataStreams
: expectedAverageDataRetention;

Map<String, Object> map = getLifecycleUsage();
assertThat(map.get("available"), equalTo(true));
assertThat(map.get("enabled"), equalTo(true));
assertThat(map.get("count"), equalTo(dataStreamsWithLifecycleCount.intValue()));
assertThat(map.get("default_rollover_used"), equalTo(useDefaultRolloverConfig));

Map<String, Object> dataRetentionMap = (Map<String, Object>) map.get("data_retention");
assertThat(dataRetentionMap.get("configured_data_streams"), equalTo(dataStreamsWithRetentionCount.get()));
if (dataStreamsWithRetentionCount.get() > 0) {
assertThat(dataRetentionMap.get("minimum_millis"), equalTo(expectedMinimumDataRetention));
assertThat(dataRetentionMap.get("maximum_millis"), equalTo(expectedMaximumDataRetention));
assertThat(dataRetentionMap.get("average_millis"), equalTo(expectedAverageDataRetention));
}

Map<String, Object> effectiveRetentionMap = (Map<String, Object>) map.get("effective_retention");
assertThat(effectiveRetentionMap.get("retained_data_streams"), equalTo(retainedDataStreams));
if (retainedDataStreams > 0) {
assertThat(effectiveRetentionMap.get("minimum_millis"), equalTo(expectedMinimumEffectiveRetention));
assertThat(effectiveRetentionMap.get("maximum_millis"), equalTo(expectedMaximumEffectiveRetention));
assertThat(effectiveRetentionMap.get("average_millis"), equalTo(expectedAverageEffectiveRetention));
}

Map<String, Map<String, Object>> globalRetentionMap = (Map<String, Map<String, Object>>) map.get("global_retention");
assertThat(globalRetentionMap.get("max").get("defined"), equalTo(false));
assertThat(globalRetentionMap.get("default").get("defined"), equalTo(useDefaultRetention));
if (useDefaultRetention) {
assertThat(globalRetentionMap.get("default").get("affected_data_streams"), equalTo(dataStreamsWithDefaultRetentionCount.get()));
assertThat(globalRetentionMap.get("default").get("retention_millis"), equalTo((int) defaultRetention.getMillis()));
}
}

@SuppressWarnings("unchecked")
private void assertUsageResults(
int count,
int minimumRetention,
int maximumRetention,
double averageRetention,
boolean defaultRolloverUsed
) throws Exception {
private Map<String, Object> getLifecycleUsage() throws IOException {
XPackUsageFeatureResponse response = safeGet(client().execute(DATA_STREAM_LIFECYCLE, new XPackUsageRequest(SAFE_AWAIT_TIMEOUT)));
XContentBuilder builder = XContentFactory.jsonBuilder();
builder = response.getUsage().toXContent(builder, ToXContent.EMPTY_PARAMS);
@@ -177,17 +252,7 @@ private void assertUsageResults(
true,
XContentType.JSON
);

Map<String, Object> map = tuple.v2();
assertThat(map.get("available"), equalTo(true));
assertThat(map.get("enabled"), equalTo(true));
assertThat(map.get("count"), equalTo(count));
assertThat(map.get("default_rollover_used"), equalTo(defaultRolloverUsed));
Map<String, Object> retentionMap = (Map<String, Object>) map.get("retention");
assertThat(retentionMap.size(), equalTo(3));
assertThat(retentionMap.get("minimum_millis"), equalTo(minimumRetention));
assertThat(retentionMap.get("maximum_millis"), equalTo(maximumRetention));
assertThat(retentionMap.get("average_millis"), equalTo(averageRetention));
return tuple.v2();
}

/*