Commit cdc1fac
NIFI-4664, NIFI-4662, NIFI-4660, NIFI-4659 moved tests which are timing/threading/network dependent and brittle to integration tests and un-ignored tests that are IT. Updated travis to reduce impact on infra and appveyor now skips test runs so is just to prove build works on windows. This closes apache#2319

squash
joewitt authored and mcgilman committed Dec 6, 2017
1 parent c730f80 commit cdc1fac
Showing 17 changed files with 1,401 additions and 1,111 deletions.
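
For context: renaming these classes to an IT prefix moves them out of the default unit-test run, since Maven's Surefire plugin picks up *Test classes while the Failsafe plugin's default includes (IT*.java, *IT.java, *ITCase.java) run only during the integration-test phase of mvn verify. A minimal sketch of the convention; the class below is illustrative and not part of this commit:

// Skipped by "mvn test" (Surefire), executed by "mvn verify" once the
// Failsafe plugin is bound, because the name matches IT*.java.
public class ITExampleNetworkDependent {

    @org.junit.Test
    public void exercisesTimingSensitiveBehavior() throws Exception {
        // timing/threading/network-dependent assertions belong here,
        // outside the fast unit-test cycle
    }
}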
4 changes: 1 addition & 3 deletions .travis.yml
@@ -19,8 +19,6 @@ env:
- USER_LANGUAGE=en USER_REGION=US'
- USER_LANGUAGE=fr USER_REGION=FR'
- USER_LANGUAGE=ja USER_REGION=JP'
- USER_LANGUAGE=pt USER_REGION=BR'
- USER_LANGUAGE=default USER_REGION=default

os:
- linux
@@ -54,4 +52,4 @@ install:
# Note: The reason the sed is done as part of script is to ensure the pom hack
# won't affect the 'clean install' above
- bash .travis.sh
- mvn -T 2C -Pcontrib-check -Ddir-only clean install
- mvn -T 2C -Pcontrib-check -Ddir-only clean install
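
(For reference: Maven's -T 2C flag builds with two threads per available CPU core, and NiFi's contrib-check profile adds contributor checks such as checkstyle and RAT license auditing, so the trimmed language matrix above is what reduces the load on Travis infrastructure.)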
4 changes: 1 addition & 3 deletions appveyor.yml
@@ -32,9 +32,7 @@ install:
- cmd: SET MAVEN_OPTS=-XX:MaxPermSize=2g -Xmx4g
- cmd: SET JAVA_OPTS=-XX:MaxPermSize=2g -Xmx4g
build_script:
- mvn -q clean package --batch-mode -DskipTests
test_script:
- mvn -q clean install --batch-mode -Pcontrib-check
- mvn clean package --batch-mode -DskipTests -Ddir-only
cache:
- C:\maven\
- C:\Users\appveyor\.m2
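
(Since -DskipTests compiles but does not execute the test sources, the AppVeyor job now only proves, per the commit message, that the build works on Windows.)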

Two large file diffs are not rendered by default.

@@ -18,17 +18,10 @@

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
@@ -99,97 +92,6 @@ public void validatePropertiesValidation() throws Exception {
        }
    }

    @Test
    public void validateGetAllMessages() throws Exception {
        String groupName = "validateGetAllMessages";

        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
        when(mockLease.commit()).thenReturn(Boolean.TRUE);

        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
            @Override
            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
                return mockConsumerPool;
            }
        };
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
        runner.run(1, false);

        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
        verify(mockLease, times(3)).continuePolling();
        verify(mockLease, times(2)).poll();
        verify(mockLease, times(1)).commit();
        verify(mockLease, times(1)).close();
        verifyNoMoreInteractions(mockConsumerPool);
        verifyNoMoreInteractions(mockLease);
    }

    @Test
    public void validateGetAllMessagesPattern() throws Exception {
        String groupName = "validateGetAllMessagesPattern";

        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
        when(mockLease.commit()).thenReturn(Boolean.TRUE);

        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
            @Override
            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
                return mockConsumerPool;
            }
        };
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)");
        runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern");
        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
        runner.run(1, false);

        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
        verify(mockLease, times(3)).continuePolling();
        verify(mockLease, times(2)).poll();
        verify(mockLease, times(1)).commit();
        verify(mockLease, times(1)).close();
        verifyNoMoreInteractions(mockConsumerPool);
        verifyNoMoreInteractions(mockLease);
    }

    @Test
    public void validateGetErrorMessages() throws Exception {
        String groupName = "validateGetErrorMessages";

        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
        when(mockLease.continuePolling()).thenReturn(true, false);
        when(mockLease.commit()).thenReturn(Boolean.FALSE);

        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
            @Override
            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
                return mockConsumerPool;
            }
        };
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
        runner.run(1, false);

        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
        verify(mockLease, times(2)).continuePolling();
        verify(mockLease, times(1)).poll();
        verify(mockLease, times(1)).commit();
        verify(mockLease, times(1)).close();
        verifyNoMoreInteractions(mockConsumerPool);
        verifyNoMoreInteractions(mockLease);
    }

    @Test
    public void testJaasConfiguration() throws Exception {
        ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10();
@@ -0,0 +1,135 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.kafka.pubsub;

import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;

public class ITConsumeKafka {

    ConsumerLease mockLease = null;
    ConsumerPool mockConsumerPool = null;

    @Before
    public void setup() {
        // Fresh mocks for every test so interaction counts do not leak between cases.
        mockLease = mock(ConsumerLease.class);
        mockConsumerPool = mock(ConsumerPool.class);
    }

    @Test
    public void validateGetAllMessages() throws Exception {
        String groupName = "validateGetAllMessages";

        // The pool hands out the mocked lease; the lease allows two polls before stopping.
        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
        when(mockLease.commit()).thenReturn(Boolean.TRUE);

        // Override the protected factory method so the processor uses the mocked
        // pool instead of connecting to a real broker.
        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
            @Override
            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
                return mockConsumerPool;
            }
        };
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
        runner.run(1, false);

        // One lease obtained, polled twice, committed and closed exactly once.
        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
        verify(mockLease, times(3)).continuePolling();
        verify(mockLease, times(2)).poll();
        verify(mockLease, times(1)).commit();
        verify(mockLease, times(1)).close();
        verifyNoMoreInteractions(mockConsumerPool);
        verifyNoMoreInteractions(mockLease);
    }

    @Test
    public void validateGetAllMessagesPattern() throws Exception {
        String groupName = "validateGetAllMessagesPattern";

        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
        when(mockLease.commit()).thenReturn(Boolean.TRUE);

        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
            @Override
            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
                return mockConsumerPool;
            }
        };
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        // Subscribe by regular expression rather than a fixed topic list.
        runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)");
        runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern");
        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
        runner.run(1, false);

        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
        verify(mockLease, times(3)).continuePolling();
        verify(mockLease, times(2)).poll();
        verify(mockLease, times(1)).commit();
        verify(mockLease, times(1)).close();
        verifyNoMoreInteractions(mockConsumerPool);
        verifyNoMoreInteractions(mockLease);
    }

    @Test
    public void validateGetErrorMessages() throws Exception {
        String groupName = "validateGetErrorMessages";

        // The failure path: polling stops after one round and the commit fails.
        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
        when(mockLease.continuePolling()).thenReturn(true, false);
        when(mockLease.commit()).thenReturn(Boolean.FALSE);

        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
            @Override
            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
                return mockConsumerPool;
            }
        };
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
        runner.run(1, false);

        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
        verify(mockLease, times(2)).continuePolling();
        verify(mockLease, times(1)).poll();
        verify(mockLease, times(1)).commit();
        verify(mockLease, times(1)).close();
        verifyNoMoreInteractions(mockConsumerPool);
        verifyNoMoreInteractions(mockLease);
    }

}
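
A note on the pattern used throughout ITConsumeKafka: overriding the protected createConsumerPool(...) factory is the seam that lets each test inject a Mockito-mocked ConsumerPool, so runner.run(1, false) drives the processor's normal trigger path while the verify(...) calls pin down the full lease lifecycle (obtain, poll while continuePolling() is true, commit, close) without a live Kafka broker.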
@@ -18,17 +18,10 @@

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
@@ -99,97 +92,6 @@ public void validatePropertiesValidation() throws Exception {
        }
    }

    @Test
    public void validateGetAllMessages() throws Exception {
        String groupName = "validateGetAllMessages";

        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
        when(mockLease.commit()).thenReturn(Boolean.TRUE);

        ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
            @Override
            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
                return mockConsumerPool;
            }
        };
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar");
        runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
        runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST);
        runner.run(1, false);

        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
        verify(mockLease, times(3)).continuePolling();
        verify(mockLease, times(2)).poll();
        verify(mockLease, times(1)).commit();
        verify(mockLease, times(1)).close();
        verifyNoMoreInteractions(mockConsumerPool);
        verifyNoMoreInteractions(mockLease);
    }

    @Test
    public void validateGetAllMessagesPattern() throws Exception {
        String groupName = "validateGetAllMessagesPattern";

        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
        when(mockLease.commit()).thenReturn(Boolean.TRUE);

        ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
            @Override
            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
                return mockConsumerPool;
            }
        };
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        runner.setProperty(ConsumeKafka_0_11.TOPICS, "(fo.*)|(ba)");
        runner.setProperty(ConsumeKafka_0_11.TOPIC_TYPE, "pattern");
        runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
        runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST);
        runner.run(1, false);

        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
        verify(mockLease, times(3)).continuePolling();
        verify(mockLease, times(2)).poll();
        verify(mockLease, times(1)).commit();
        verify(mockLease, times(1)).close();
        verifyNoMoreInteractions(mockConsumerPool);
        verifyNoMoreInteractions(mockLease);
    }

    @Test
    public void validateGetErrorMessages() throws Exception {
        String groupName = "validateGetErrorMessages";

        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
        when(mockLease.continuePolling()).thenReturn(true, false);
        when(mockLease.commit()).thenReturn(Boolean.FALSE);

        ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
            @Override
            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
                return mockConsumerPool;
            }
        };
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar");
        runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
        runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST);
        runner.run(1, false);

        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
        verify(mockLease, times(2)).continuePolling();
        verify(mockLease, times(1)).poll();
        verify(mockLease, times(1)).commit();
        verify(mockLease, times(1)).close();
        verifyNoMoreInteractions(mockConsumerPool);
        verifyNoMoreInteractions(mockLease);
    }

    @Test
    public void testJaasConfiguration() throws Exception {
        ConsumeKafka_0_11 consumeKafka = new ConsumeKafka_0_11();
