diff --git a/docs/changelog/128111.yaml b/docs/changelog/128111.yaml new file mode 100644 index 0000000000000..d3b113a682d4a --- /dev/null +++ b/docs/changelog/128111.yaml @@ -0,0 +1,5 @@ +pr: 128111 +summary: Fix union types in CCS +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java index 26e6ec81584cf..76b52708b4fac 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java @@ -67,6 +67,12 @@ public static org.elasticsearch.Version remoteClusterVersion() { return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT; } + public static org.elasticsearch.Version bwcVersion() { + org.elasticsearch.Version local = localClusterVersion(); + org.elasticsearch.Version remote = remoteClusterVersion(); + return local.before(remote) ? local : remote; + } + private static Version distributionVersion(String key) { final String val = System.getProperty(key); return val != null ? Version.fromString(val) : Version.CURRENT; diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 71fd9da8d03b3..f056e2294e63f 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -76,6 +76,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { private static TestFeatureService remoteFeaturesService; private static RestClient remoteClusterClient; + private static DataLocation dataLocation = null; @ParametersFactory(argumentFormatting = "%2$s.%3$s") public static List readScriptSpec() throws Exception { @@ -188,8 +189,9 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw */ static RestClient twoClients(RestClient localClient, RestClient remoteClient) throws IOException { RestClient twoClients = mock(RestClient.class); + assertNotNull("data location was set", dataLocation); // write to a single cluster for now due to the precision of some functions such as avg and tests related to updates - final RestClient bulkClient = randomFrom(localClient, remoteClient); + final RestClient bulkClient = dataLocation == DataLocation.REMOTE_ONLY ? remoteClient : randomFrom(localClient, remoteClient); when(twoClients.performRequest(any())).then(invocation -> { Request request = invocation.getArgument(0); String endpoint = request.getEndpoint(); @@ -214,6 +216,11 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th return twoClients; } + enum DataLocation { + REMOTE_ONLY, + ANY_CLUSTER + } + static Request[] cloneRequests(Request orig, int numClones) throws IOException { Request[] clones = new Request[numClones]; for (int i = 0; i < clones.length; i++) { @@ -238,6 +245,9 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException { * Convert FROM employees ... => FROM *:employees,employees */ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) { + if (dataLocation == null) { + dataLocation = randomFrom(DataLocation.values()); + } String query = testCase.query; String[] commands = query.split("\\|"); String first = commands[0].trim(); @@ -245,19 +255,23 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas String[] parts = commands[0].split("(?i)metadata"); assert parts.length >= 1 : parts; String fromStatement = parts[0]; - - String[] localIndices = fromStatement.substring("FROM ".length()).split(","); - String remoteIndices = Arrays.stream(localIndices) - .map(index -> "*:" + index.trim() + "," + index.trim()) - .collect(Collectors.joining(",")); - var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length()); + String index = fromStatement.substring("FROM ".length()); + if (canUseRemoteIndicesOnly() && randomBoolean()) { + index = remoteIndices(index); + } else { + index = index + "," + remoteIndices(index); + } + var newFrom = "FROM " + index + " " + commands[0].substring(fromStatement.length()); testCase.query = newFrom + query.substring(first.length()); } if (commands[0].toLowerCase(Locale.ROOT).startsWith("ts ")) { String[] parts = commands[0].split("\\s+"); assert parts.length >= 2 : commands[0]; - String[] indices = parts[1].split(","); - parts[1] = Arrays.stream(indices).map(index -> "*:" + index + "," + index).collect(Collectors.joining(",")); + if (canUseRemoteIndicesOnly() && randomBoolean()) { + parts[1] = remoteIndices(parts[1]); + } else { + parts[1] = parts[1] + "," + remoteIndices(parts[1]); + } String newNewMetrics = String.join(" ", parts); testCase.query = newNewMetrics + query.substring(first.length()); } @@ -274,6 +288,15 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas return testCase; } + static String remoteIndices(String localIndices) { + String[] parts = localIndices.split(","); + return Arrays.stream(parts).map(index -> "*:" + index.trim()).collect(Collectors.joining(",")); + } + + static boolean canUseRemoteIndicesOnly() { + return dataLocation == DataLocation.REMOTE_ONLY && Clusters.bwcVersion().onOrAfter(Version.V_9_1_0); + } + static boolean hasIndexMetadata(String query) { String[] commands = query.split("\\|"); if (commands[0].trim().toLowerCase(Locale.ROOT).startsWith("from")) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java index fe5c36aff07e7..0b62f3c4930a5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java @@ -991,4 +991,28 @@ void populateRemoteIndicesFail(String clusterAlias, String indexName, int numSha remoteClient.admin().indices().prepareRefresh(indexName).get(); } + public void testMultiTypes() throws Exception { + Client remoteClient = client(REMOTE_CLUSTER_1); + int totalDocs = 0; + for (String type : List.of("integer", "long")) { + String index = "conflict-index-" + type; + assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type)); + int numDocs = between(1, 10); + for (int i = 0; i < numDocs; i++) { + remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get(); + } + remoteClient.admin().indices().prepareRefresh(index).get(); + totalDocs += numDocs; + } + for (String castFunction : List.of("TO_LONG", "TO_INT")) { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)"); + try (EsqlQueryResponse resp = runQuery(request)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(1)); + assertThat(values.get(0), hasSize(1)); + assertThat(values.get(0).get(0), equalTo((long) totalDocs)); + } + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 0f348e268ef69..31a859d0c1951 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -157,7 +157,8 @@ private BlockLoader getBlockLoaderFor(int shardId, Attribute attr, MappedFieldTy BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference); MultiTypeEsField unionTypes = findUnionTypes(attr); if (unionTypes != null) { - String indexName = shardContext.ctx.index().getName(); + // Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix + String indexName = shardContext.ctx.getFullyQualifiedIndex().getName(); Expression conversion = unionTypes.getConversionExpressionForIndex(indexName); return conversion == null ? BlockLoader.CONSTANT_NULLS